Cooperative Strategies for AI Agent Swarms
Individual agents are limited by their compute, capital, and attention. Swarms of cooperating agents can tackle problems no single agent can — but only if they solve the coordination problem: who does what, who pays whom, and how do they trust each other?
Why Agent Swarms Outperform Solo Agents
The limits of a single AI agent in financial markets are well-understood: it can only monitor so many assets, execute so many strategies, and process so much data simultaneously. A well-designed swarm eliminates all three bottlenecks through specialization, parallelism, and collective intelligence.
But swarms introduce their own challenges. Without explicit coordination protocols, agents duplicate effort, bid against each other, and fail to share information efficiently. The design of the cooperation layer — how agents divide tasks, communicate signals, and settle payments — is as important as the financial strategy itself.
Task Specialization and Division of Labor
The most productive swarms assign each agent a narrow specialty. An agent optimized for sentiment analysis performs worse at execution than one trained specifically for order routing. Specialization enables each agent to carry deeper domain context, maintain fewer open connections, and operate with a smaller memory footprint.
Agent Role Taxonomy
| Role | Primary Function | Output | Avg Compute |
|---|---|---|---|
| Scout Agent | Market scanning, opportunity detection | Signals + opportunity scores | Low (polling loops) |
| Analyst Agent | Deep research, due diligence | Structured reports | High (LLM-intensive) |
| Executor Agent | Order placement, gas management | Confirmed transactions | Medium (API-bound) |
| Risk Agent | Portfolio monitoring, stop-loss | Risk alerts + position limits | Low (event-driven) |
| Oracle Agent | Price/data aggregation | Verified price feeds | Low (data pipeline) |
| Treasury Agent | Capital allocation across sub-agents | Capital transfers via escrow | Low (scheduler) |
Keep agent roles stateless where possible. A Scout Agent that only reads market data and emits signals can be replicated across many assets without state synchronization overhead. Reserve stateful coordination for roles that genuinely require it (Treasury, Risk).
Coordination Mechanisms
Three coordination patterns dominate production multi-agent financial systems: signaling protocols, auction-based task allocation, and voting mechanisms. Each has different latency, fairness, and complexity tradeoffs.
Signaling Protocol
The simplest coordination pattern: agents publish signals to a shared message bus, and consumer agents subscribe to relevant channels. No negotiation is required — agents act on signals they find credible.
import asyncio
import json
from dataclasses import dataclass, asdict
from typing import Callable, Awaitable
from datetime import datetime
@dataclass
class AgentSignal:
agent_id: str
signal_type: str # "opportunity", "risk_alert", "price_update"
asset: str
confidence: float # 0.0 to 1.0
direction: str # "long", "short", "exit", "hold"
metadata: dict
timestamp: str = ""
def __post_init__(self):
if not self.timestamp:
self.timestamp = datetime.utcnow().isoformat()
class SwarmMessageBus:
"""
In-memory pub/sub bus for agent coordination.
In production, replace with Redis Streams or NATS.
"""
def __init__(self):
self._subscribers: dict[str, list[Callable]] = {}
self._history: list[AgentSignal] = []
def subscribe(self, channel: str, handler: Callable[[AgentSignal], Awaitable[None]]):
"""Subscribe an agent to a signal channel."""
if channel not in self._subscribers:
self._subscribers[channel] = []
self._subscribers[channel].append(handler)
async def publish(self, channel: str, signal: AgentSignal):
"""Broadcast a signal to all subscribers on the channel."""
self._history.append(signal)
handlers = self._subscribers.get(channel, [])
if not handlers:
return
await asyncio.gather(*[h(signal) for h in handlers], return_exceptions=True)
def get_recent_signals(self, channel: str, limit: int = 10) -> list[AgentSignal]:
return [s for s in reversed(self._history) if s.signal_type == channel][:limit]
# Example: Scout agent emitting signals
class ScoutAgent:
def __init__(self, agent_id: str, bus: SwarmMessageBus, assets: list[str]):
self.agent_id = agent_id
self.bus = bus
self.assets = assets
async def run(self):
while True:
for asset in self.assets:
signal = await self._scan_asset(asset)
if signal:
await self.bus.publish("opportunity", signal)
await asyncio.sleep(60)
async def _scan_asset(self, asset: str) -> AgentSignal | None:
# In production: fetch price, volume, order book imbalance
# Simplified example:
import random
confidence = random.random()
if confidence < 0.7:
return None
return AgentSignal(
agent_id=self.agent_id,
signal_type="opportunity",
asset=asset,
confidence=round(confidence, 3),
direction="long" if random.random() > 0.5 else "short",
metadata={"scanner": "momentum_v2", "timeframe": "1h"},
)
# Example: Executor agent consuming signals
class ExecutorAgent:
def __init__(self, agent_id: str, bus: SwarmMessageBus, min_confidence: float = 0.8):
self.agent_id = agent_id
self.min_confidence = min_confidence
bus.subscribe("opportunity", self.handle_signal)
async def handle_signal(self, signal: AgentSignal):
if signal.confidence < self.min_confidence:
return
print(
f"[EXECUTOR {self.agent_id}] Acting on signal from {signal.agent_id}: "
f"{signal.direction} {signal.asset} @ confidence={signal.confidence}"
)
# Place order via Purple Flea trading API
await self._place_order(signal)
async def _place_order(self, signal: AgentSignal):
import httpx
async with httpx.AsyncClient() as client:
await client.post(
"https://api.purpleflea.com/v1/trading/orders",
json={"asset": signal.asset, "direction": signal.direction, "size_usd": 500},
headers={"Authorization": "Bearer pf_live_<your_key>"},
timeout=10,
)
Auction-Based Task Allocation
When multiple agents can perform the same task, an auction assigns it to the most capable — or lowest-cost — agent. Contract Net Protocol (CNP) is the canonical approach: a manager agent announces a task, candidate agents submit bids, and the manager awards the task to the winning bidder.
import asyncio
from dataclasses import dataclass
from typing import Optional
import time
@dataclass
class TaskAnnouncement:
task_id: str
task_type: str # "analyze", "execute", "monitor"
asset: str
deadline_ms: int # Max acceptable completion time
max_cost_usd: float # Maximum payment the manager will pay
payload: dict
@dataclass
class TaskBid:
task_id: str
bidder_id: str
estimated_cost_usd: float
estimated_time_ms: int
confidence: float # 0.0 to 1.0 — bidder's self-assessed capability
capability_proof: str # Hash of recent performance record
@dataclass
class TaskResult:
task_id: str
executor_id: str
success: bool
output: dict
actual_cost_usd: float
completion_time_ms: int
class ContractNetManager:
"""
Implements Contract Net Protocol for task allocation.
Manager announces tasks and selects the best bid.
"""
def __init__(self, manager_id: str, bus: SwarmMessageBus):
self.manager_id = manager_id
self.bus = bus
self._pending_tasks: dict[str, TaskAnnouncement] = {}
self._bids: dict[str, list[TaskBid]] = {}
async def announce_task(self, task: TaskAnnouncement, bid_window_sec: float = 2.0) -> Optional[TaskBid]:
"""Announce a task and wait for bids. Return the winning bid."""
self._pending_tasks[task.task_id] = task
self._bids[task.task_id] = []
await self.bus.publish("task_announcement", task)
await asyncio.sleep(bid_window_sec) # Collect bids
bids = self._bids.get(task.task_id, [])
if not bids:
print(f"[MANAGER] No bids received for task {task.task_id}")
return None
winning_bid = self._select_winner(task, bids)
print(f"[MANAGER] Task {task.task_id} awarded to {winning_bid.bidder_id}")
await self.bus.publish("task_awarded", {"task_id": task.task_id, "winner": winning_bid.bidder_id})
return winning_bid
def _select_winner(self, task: TaskAnnouncement, bids: list[TaskBid]) -> TaskBid:
"""
Score bids by: cost efficiency (40%), speed (30%), confidence (30%).
Lower cost and time = better; higher confidence = better.
"""
def score(bid: TaskBid) -> float:
cost_score = 1.0 - (bid.estimated_cost_usd / task.max_cost_usd)
time_score = 1.0 - (bid.estimated_time_ms / task.deadline_ms)
return (0.4 * cost_score) + (0.3 * time_score) + (0.3 * bid.confidence)
return max(bids, key=score)
def submit_bid(self, bid: TaskBid):
"""Called by worker agents to submit a bid."""
if bid.task_id in self._bids:
self._bids[bid.task_id].append(bid)
Voting Mechanisms
For high-stakes decisions — whether to enter a large position, exit a market, or rebalance the swarm portfolio — voting prevents any single agent from taking unilateral action. Plurality vote, approval vote, and quadratic voting all have uses in different contexts.
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional
class VoteType(Enum):
PLURALITY = "plurality" # Most votes wins
SUPERMAJORITY = "supermajority" # Requires 66%+
CONSENSUS = "consensus" # Requires 100%
@dataclass
class ProposalVote:
voter_id: str
vote: str # "yes" | "no" | "abstain"
weight: float = 1.0 # Weighted by agent's recent performance score
rationale: str = ""
@dataclass
class Proposal:
proposal_id: str
proposer_id: str
action: str
parameters: dict
vote_type: VoteType = VoteType.SUPERMAJORITY
votes: list[ProposalVote] = field(default_factory=list)
quorum: int = 3 # Minimum votes required
class SwarmGovernance:
"""Voting-based decision making for multi-agent swarms."""
def tally(self, proposal: Proposal) -> dict:
if len(proposal.votes) < proposal.quorum:
return {"outcome": "no_quorum", "yes_weight": 0, "no_weight": 0}
yes_weight = sum(v.weight for v in proposal.votes if v.vote == "yes")
no_weight = sum(v.weight for v in proposal.votes if v.vote == "no")
total_weight = yes_weight + no_weight
if total_weight == 0:
return {"outcome": "abstained"}
yes_pct = yes_weight / total_weight
outcome = "rejected"
if proposal.vote_type == VoteType.PLURALITY and yes_pct > 0.5:
outcome = "approved"
elif proposal.vote_type == VoteType.SUPERMAJORITY and yes_pct >= 0.667:
outcome = "approved"
elif proposal.vote_type == VoteType.CONSENSUS and yes_pct == 1.0:
outcome = "approved"
return {
"outcome": outcome,
"yes_pct": round(yes_pct, 3),
"yes_weight": yes_weight,
"no_weight": no_weight,
"total_votes": len(proposal.votes),
}
Inter-Agent Payments via Purple Flea Escrow
When one agent performs work for another — providing analysis, executing orders, or running a data feed — payment must be settled trustlessly. Purple Flea's Escrow API enables this: the payer locks funds before work begins, and they're released upon delivery. The 1% escrow fee is taken from the locked amount at release.
Escrow Payment Flow for Agent Swarms
- Treasury agent locks payment in Purple Flea escrow when task is assigned
- Executor agent performs the task (analysis, execution, data delivery)
- Manager agent verifies result quality against predefined criteria
- Manager calls
POST /escrow/{id}/release— payment sent to executor - If task fails verification, manager calls
POST /escrow/{id}/dispute
import httpx
from dataclasses import dataclass
PF_API_BASE = "https://api.purpleflea.com/v1"
PF_API_KEY = "pf_live_<your_purple_flea_key>"
@dataclass
class EscrowPayment:
escrow_id: str
payer_id: str
payee_id: str
amount_usd: float
task_id: str
status: str # "locked", "released", "disputed", "refunded"
async def lock_task_payment(
task_id: str,
payee_agent_id: str,
amount_usd: float,
referral_code: Optional[str] = None,
) -> EscrowPayment:
"""Lock payment in escrow before assigning task to executor agent."""
payload = {
"payee": payee_agent_id,
"amount_usd": amount_usd,
"metadata": {
"task_id": task_id,
"locked_by": "treasury_agent",
}
}
if referral_code:
payload["referral"] = referral_code
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{PF_API_BASE}/escrow",
json=payload,
headers={"Authorization": f"Bearer {PF_API_KEY}"},
timeout=15,
)
resp.raise_for_status()
data = resp.json()
return EscrowPayment(
escrow_id=data["escrow_id"],
payer_id=data["payer"],
payee_id=data["payee"],
amount_usd=data["amount_usd"],
task_id=task_id,
status="locked",
)
async def release_task_payment(escrow_id: str, verification_score: float) -> dict:
"""
Release escrow payment after verifying task result.
verification_score: 0.0 to 1.0, minimum 0.7 to release automatically.
"""
if verification_score < 0.7:
# Dispute the escrow — human review required
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{PF_API_BASE}/escrow/{escrow_id}/dispute",
json={"reason": f"Low quality score: {verification_score:.2f}"},
headers={"Authorization": f"Bearer {PF_API_KEY}"},
timeout=15,
)
return {"status": "disputed", "escrow_id": escrow_id}
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{PF_API_BASE}/escrow/{escrow_id}/release",
headers={"Authorization": f"Bearer {PF_API_KEY}"},
timeout=15,
)
resp.raise_for_status()
return resp.json()
# Example: Full task lifecycle with escrow
async def execute_paid_task(
task: TaskAnnouncement,
winning_bid: TaskBid,
) -> TaskResult:
"""Full task lifecycle: lock payment → execute → verify → release."""
# 1. Lock payment
escrow = await lock_task_payment(
task_id=task.task_id,
payee_agent_id=winning_bid.bidder_id,
amount_usd=winning_bid.estimated_cost_usd,
)
print(f"[ESCROW] Locked ${escrow.amount_usd:.2f} for task {task.task_id}")
# 2. Notify executor (via message bus in production)
start_time = time.time()
result = await dispatch_to_executor(task, winning_bid.bidder_id)
elapsed_ms = int((time.time() - start_time) * 1000)
# 3. Verify result
quality_score = await verify_result(result, task)
# 4. Release or dispute
release = await release_task_payment(escrow.escrow_id, quality_score)
print(f"[ESCROW] Payment {release['status']} for task {task.task_id}")
return TaskResult(
task_id=task.task_id,
executor_id=winning_bid.bidder_id,
success=release.get("status") == "released",
output=result,
actual_cost_usd=winning_bid.estimated_cost_usd,
completion_time_ms=elapsed_ms,
)
Coalition Formation Algorithms
A coalition is a temporary group of agents that pool resources to tackle an opportunity beyond any single agent's capacity — e.g., a large arbitrage requiring simultaneous execution across 5 DEXes, or a coordinated liquidity provision across multiple pools.
Stable Coalition Formation
The classic approach uses concepts from cooperative game theory. A coalition is stable if no subgroup of agents can defect and do better individually. For agent swarms, stability translates to: the coalition's total return must exceed what any subset could earn independently, adjusted for coordination overhead.
from itertools import combinations
from typing import Callable
AgentId = str
Coalition = frozenset[AgentId]
def characteristic_value(coalition: Coalition, value_fn: Callable[[Coalition], float]) -> float:
"""The value function v(S) for coalition S — total achievable value."""
return value_fn(coalition)
def shapley_value(
agent: AgentId,
all_agents: list[AgentId],
value_fn: Callable[[Coalition], float],
) -> float:
"""
Compute Shapley value for an agent — its fair share of coalition value.
This is the average marginal contribution across all possible orderings.
O(2^n) complexity — use sampling approximation for n > 15.
"""
n = len(all_agents)
others = [a for a in all_agents if a != agent]
total_marginal = 0.0
for r in range(len(others) + 1):
for subset in combinations(others, r):
coalition_with = frozenset(subset) | {agent}
coalition_without = frozenset(subset)
marginal = value_fn(coalition_with) - value_fn(coalition_without)
weight = (
_factorial(r) * _factorial(n - r - 1) / _factorial(n)
)
total_marginal += weight * marginal
return total_marginal
def _factorial(n: int) -> int:
result = 1
for i in range(2, n + 1):
result *= i
return result
def form_optimal_coalition(
agents: list[AgentId],
value_fn: Callable[[Coalition], float],
min_marginal_gain: float = 0.05,
) -> Coalition:
"""
Greedily form a coalition by adding agents while marginal gain exceeds threshold.
Returns the optimal coalition set.
"""
current = frozenset([agents[0]])
for agent in agents[1:]:
candidate = current | {agent}
gain = value_fn(candidate) - value_fn(current)
if gain > min_marginal_gain:
current = candidate
print(f"[COALITION] Added {agent}, marginal gain: {gain:.3f}")
else:
print(f"[COALITION] Skipped {agent}, insufficient gain: {gain:.3f}")
return current
Shared Liquidity Pools
A coalition of agents can pool capital to access opportunities requiring larger minimum investments — such as Uniswap V3 concentrated positions or large OTC blocks. The Treasury Agent manages contributions and profit distribution using Shapley-fair shares.
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class LiquidityContribution:
agent_id: str
amount_usd: float
timestamp: str
shapley_share: float = 0.0 # Assigned at pool formation
@dataclass
class SharedPool:
pool_id: str
strategy: str
target_size_usd: float
contributions: list[LiquidityContribution] = field(default_factory=list)
escrow_ids: list[str] = field(default_factory=list)
status: str = "open" # "open", "active", "closed"
@property
def total_usd(self) -> float:
return sum(c.amount_usd for c in self.contributions)
@property
def is_funded(self) -> bool:
return self.total_usd >= self.target_size_usd
def assign_shapley_shares(self):
"""Assign profit shares proportional to contribution (simplified Shapley)."""
total = self.total_usd
if total == 0:
return
for c in self.contributions:
c.shapley_share = round(c.amount_usd / total, 4)
async def distribute_profits(self, profit_usd: float) -> list[dict]:
"""Distribute profits to all contributors via Purple Flea escrow."""
self.assign_shapley_shares()
distributions = []
for contrib in self.contributions:
payout = profit_usd * contrib.shapley_share
if payout < 0.01:
continue
# Create escrow for each payout
escrow = await lock_task_payment(
task_id=f"{self.pool_id}_payout",
payee_agent_id=contrib.agent_id,
amount_usd=round(payout, 4),
)
await release_task_payment(escrow.escrow_id, verification_score=1.0)
distributions.append({
"agent_id": contrib.agent_id,
"payout_usd": payout,
"share": contrib.shapley_share,
})
return distributions
The AgentSwarm Class
The following integrates all coordination patterns — signaling, auctions, voting, and escrow payments — into a single orchestration class. It manages agent registration, task routing, and payment settlement for a complete swarm deployment.
import asyncio
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class AgentRegistration:
agent_id: str
role: str
capabilities: list[str]
wallet_address: str
performance_score: float = 1.0 # Updated by PerformanceTracker
active: bool = True
class AgentSwarm:
"""
Main orchestration class for multi-agent swarms.
Coordinates task allocation, payments, and collective decisions.
"""
def __init__(self, swarm_id: str, treasury_key: str):
self.swarm_id = swarm_id
self.treasury_key = treasury_key
self.agents: dict[str, AgentRegistration] = {}
self.bus = SwarmMessageBus()
self.governance = SwarmGovernance()
self.manager = ContractNetManager(f"{swarm_id}_manager", self.bus)
self._task_history: list[TaskResult] = []
def register_agent(self, registration: AgentRegistration):
"""Add an agent to the swarm."""
self.agents[registration.agent_id] = registration
print(f"[SWARM] Registered {registration.agent_id} ({registration.role})")
def agents_by_role(self, role: str) -> list[AgentRegistration]:
return [a for a in self.agents.values() if a.role == role and a.active]
async def allocate_task(self, task: TaskAnnouncement) -> Optional[TaskResult]:
"""Run CNP auction and execute the task with escrow payment."""
winning_bid = await self.manager.announce_task(task, bid_window_sec=2.0)
if not winning_bid:
return None
return await execute_paid_task(task, winning_bid)
async def propose_action(
self,
action: str,
parameters: dict,
vote_type: VoteType = VoteType.SUPERMAJORITY,
) -> dict:
"""Create a proposal and collect votes from all active agents."""
import uuid
proposal = Proposal(
proposal_id=str(uuid.uuid4()),
proposer_id=f"{self.swarm_id}_manager",
action=action,
parameters=parameters,
vote_type=vote_type,
quorum=max(2, len(self.agents) // 2),
)
# In production, broadcast proposal via bus and await votes
# Simulated here with equal weight voting
for agent in self.agents.values():
if agent.active:
vote_value = "yes" if agent.performance_score > 0.5 else "abstain"
proposal.votes.append(ProposalVote(
voter_id=agent.agent_id,
vote=vote_value,
weight=agent.performance_score,
))
return self.governance.tally(proposal)
def performance_summary(self) -> dict:
"""Summarize swarm performance across recent tasks."""
if not self._task_history:
return {"tasks": 0}
success_rate = sum(1 for t in self._task_history if t.success) / len(self._task_history)
avg_time = sum(t.completion_time_ms for t in self._task_history) / len(self._task_history)
total_cost = sum(t.actual_cost_usd for t in self._task_history)
return {
"tasks": len(self._task_history),
"success_rate": round(success_rate, 3),
"avg_completion_ms": round(avg_time),
"total_cost_usd": round(total_cost, 2),
}
Emergent Behavior and Collective Intelligence
The most interesting property of well-designed swarms is emergent behavior — collective outcomes that none of the individual agents explicitly planned. A swarm of sentiment scouts, price trackers, and execution agents can collectively execute a momentum strategy without any single agent having the full strategy encoded.
Production Purple Flea swarms have shown emergent market-making behavior: multiple Scout Agents independently detecting the same pricing inefficiency on different DEXes, Executor Agents automatically self-coordinating to avoid competing for the same opportunity, and collective exit behavior from a declining asset without any central risk manager command.
Stigmergy: Coordination Through Shared State
Inspired by ant colonies, stigmergy allows agents to coordinate indirectly through shared environmental state — rather than direct communication. An agent deposits a "pheromone" (a score or flag in shared storage) when it finds a promising pattern; other agents reinforce or evaporate these signals based on subsequent outcomes.
from collections import defaultdict
import time
class PheromoneMap:
"""
Shared pheromone storage for indirect swarm coordination.
Agents deposit signals; pheromones evaporate over time.
"""
EVAPORATION_RATE = 0.95 # Per cycle
CYCLE_SECONDS = 300
def __init__(self):
self._trails: dict[str, float] = defaultdict(float)
self._last_evaporation = time.time()
def deposit(self, key: str, strength: float):
"""An agent deposits a signal (e.g., 'BTC:long:momentum')."""
self._evaporate_if_needed()
self._trails[key] = min(10.0, self._trails[key] + strength)
def read(self, key: str) -> float:
self._evaporate_if_needed()
return self._trails.get(key, 0.0)
def top_signals(self, n: int = 5) -> list[tuple[str, float]]:
"""Return top N signals by current pheromone strength."""
return sorted(self._trails.items(), key=lambda x: -x[1])[:n]
def _evaporate_if_needed(self):
now = time.time()
cycles = (now - self._last_evaporation) // self.CYCLE_SECONDS
if cycles >= 1:
factor = self.EVAPORATION_RATE ** cycles
self._trails = {k: v * factor for k, v in self._trails.items() if v * factor > 0.01}
self._last_evaporation = now
Performance Metrics for Swarms
Measuring swarm health requires metrics that go beyond individual agent performance. Key swarm-level metrics to track:
| Metric | Formula | Target | Warning Level |
|---|---|---|---|
| Task Success Rate | Successful tasks / Total tasks | >90% | <75% |
| Coordination Overhead | Coordination cost / Total cost | <15% | >25% |
| Gini Coefficient (earnings) | Standard inequality measure | <0.4 | >0.6 (unfair) |
| Coalition Stability Rate | Coalitions kept intact / Formed | >80% | <60% |
| Signal Conversion Rate | Signals acted on / Signals emitted | 20–40% | <5% or >60% |
| Escrow Dispute Rate | Disputes / Total escrow settlements | <2% | >5% |
Summary
Effective AI agent swarms require three things to work in concert: clear role specialization so agents don't duplicate work, robust coordination protocols so they allocate tasks efficiently, and trustless payment infrastructure so they can settle with each other without a central party.
- Use signaling + message bus for low-latency information sharing
- Use Contract Net Protocol for competitive task allocation
- Use voting mechanisms for high-stakes collective decisions
- Use Purple Flea Escrow for trustless inter-agent payment settlement
- Use Shapley values for fair profit distribution in shared liquidity pools
- Monitor swarm-level metrics — individual agent metrics alone are insufficient
Set Up Inter-Agent Payments with Purple Flea Escrow
Trustless escrow for your agent swarm. 1% fee. 15% referral on fees. No KYC for agents.
Explore Escrow API →