Strategy Tools Research ⏱ 18 min read

Cooperative Strategies for AI Agent Swarms

Individual agents are limited by their compute, capital, and attention. Swarms of cooperating agents can tackle problems no single agent can — but only if they solve the coordination problem: who does what, who pays whom, and how do they trust each other?

by Purple Flea Research

Why Agent Swarms Outperform Solo Agents

The limits of a single AI agent in financial markets are well-understood: it can only monitor so many assets, execute so many strategies, and process so much data simultaneously. A well-designed swarm eliminates all three bottlenecks through specialization, parallelism, and collective intelligence.

But swarms introduce their own challenges. Without explicit coordination protocols, agents duplicate effort, bid against each other, and fail to share information efficiently. The design of the cooperation layer — how agents divide tasks, communicate signals, and settle payments — is as important as the financial strategy itself.

Parallelism Gain
8–20x
Throughput vs. single agent on market scanning tasks
Coordination Overhead
5–15%
Typical compute overhead for coordination protocols
Optimal Swarm Size
4–12
Agents for most DeFi trading tasks (beyond this, diminishing returns)
Coalition Formation
O(n²)
Complexity of naive coalition search — use auction-based approaches instead

Task Specialization and Division of Labor

The most productive swarms assign each agent a narrow specialty. An agent optimized for sentiment analysis performs worse at execution than one trained specifically for order routing. Specialization enables each agent to carry deeper domain context, maintain fewer open connections, and operate with a smaller memory footprint.

Agent Role Taxonomy

RolePrimary FunctionOutputAvg Compute
Scout AgentMarket scanning, opportunity detectionSignals + opportunity scoresLow (polling loops)
Analyst AgentDeep research, due diligenceStructured reportsHigh (LLM-intensive)
Executor AgentOrder placement, gas managementConfirmed transactionsMedium (API-bound)
Risk AgentPortfolio monitoring, stop-lossRisk alerts + position limitsLow (event-driven)
Oracle AgentPrice/data aggregationVerified price feedsLow (data pipeline)
Treasury AgentCapital allocation across sub-agentsCapital transfers via escrowLow (scheduler)
Design Principle

Keep agent roles stateless where possible. A Scout Agent that only reads market data and emits signals can be replicated across many assets without state synchronization overhead. Reserve stateful coordination for roles that genuinely require it (Treasury, Risk).

Coordination Mechanisms

Three coordination patterns dominate production multi-agent financial systems: signaling protocols, auction-based task allocation, and voting mechanisms. Each has different latency, fairness, and complexity tradeoffs.

Signaling Protocol

The simplest coordination pattern: agents publish signals to a shared message bus, and consumer agents subscribe to relevant channels. No negotiation is required — agents act on signals they find credible.

import asyncio
import json
from dataclasses import dataclass, asdict
from typing import Callable, Awaitable
from datetime import datetime

@dataclass
class AgentSignal:
    agent_id: str
    signal_type: str     # "opportunity", "risk_alert", "price_update"
    asset: str
    confidence: float    # 0.0 to 1.0
    direction: str       # "long", "short", "exit", "hold"
    metadata: dict
    timestamp: str = ""

    def __post_init__(self):
        if not self.timestamp:
            self.timestamp = datetime.utcnow().isoformat()

class SwarmMessageBus:
    """
    In-memory pub/sub bus for agent coordination.
    In production, replace with Redis Streams or NATS.
    """
    def __init__(self):
        self._subscribers: dict[str, list[Callable]] = {}
        self._history: list[AgentSignal] = []

    def subscribe(self, channel: str, handler: Callable[[AgentSignal], Awaitable[None]]):
        """Subscribe an agent to a signal channel."""
        if channel not in self._subscribers:
            self._subscribers[channel] = []
        self._subscribers[channel].append(handler)

    async def publish(self, channel: str, signal: AgentSignal):
        """Broadcast a signal to all subscribers on the channel."""
        self._history.append(signal)
        handlers = self._subscribers.get(channel, [])
        if not handlers:
            return
        await asyncio.gather(*[h(signal) for h in handlers], return_exceptions=True)

    def get_recent_signals(self, channel: str, limit: int = 10) -> list[AgentSignal]:
        return [s for s in reversed(self._history) if s.signal_type == channel][:limit]


# Example: Scout agent emitting signals
class ScoutAgent:
    def __init__(self, agent_id: str, bus: SwarmMessageBus, assets: list[str]):
        self.agent_id = agent_id
        self.bus = bus
        self.assets = assets

    async def run(self):
        while True:
            for asset in self.assets:
                signal = await self._scan_asset(asset)
                if signal:
                    await self.bus.publish("opportunity", signal)
            await asyncio.sleep(60)

    async def _scan_asset(self, asset: str) -> AgentSignal | None:
        # In production: fetch price, volume, order book imbalance
        # Simplified example:
        import random
        confidence = random.random()
        if confidence < 0.7:
            return None
        return AgentSignal(
            agent_id=self.agent_id,
            signal_type="opportunity",
            asset=asset,
            confidence=round(confidence, 3),
            direction="long" if random.random() > 0.5 else "short",
            metadata={"scanner": "momentum_v2", "timeframe": "1h"},
        )


# Example: Executor agent consuming signals
class ExecutorAgent:
    def __init__(self, agent_id: str, bus: SwarmMessageBus, min_confidence: float = 0.8):
        self.agent_id = agent_id
        self.min_confidence = min_confidence
        bus.subscribe("opportunity", self.handle_signal)

    async def handle_signal(self, signal: AgentSignal):
        if signal.confidence < self.min_confidence:
            return
        print(
            f"[EXECUTOR {self.agent_id}] Acting on signal from {signal.agent_id}: "
            f"{signal.direction} {signal.asset} @ confidence={signal.confidence}"
        )
        # Place order via Purple Flea trading API
        await self._place_order(signal)

    async def _place_order(self, signal: AgentSignal):
        import httpx
        async with httpx.AsyncClient() as client:
            await client.post(
                "https://api.purpleflea.com/v1/trading/orders",
                json={"asset": signal.asset, "direction": signal.direction, "size_usd": 500},
                headers={"Authorization": "Bearer pf_live_<your_key>"},
                timeout=10,
            )

Auction-Based Task Allocation

When multiple agents can perform the same task, an auction assigns it to the most capable — or lowest-cost — agent. Contract Net Protocol (CNP) is the canonical approach: a manager agent announces a task, candidate agents submit bids, and the manager awards the task to the winning bidder.

import asyncio
from dataclasses import dataclass
from typing import Optional
import time

@dataclass
class TaskAnnouncement:
    task_id: str
    task_type: str          # "analyze", "execute", "monitor"
    asset: str
    deadline_ms: int        # Max acceptable completion time
    max_cost_usd: float     # Maximum payment the manager will pay
    payload: dict

@dataclass
class TaskBid:
    task_id: str
    bidder_id: str
    estimated_cost_usd: float
    estimated_time_ms: int
    confidence: float       # 0.0 to 1.0 — bidder's self-assessed capability
    capability_proof: str   # Hash of recent performance record

@dataclass
class TaskResult:
    task_id: str
    executor_id: str
    success: bool
    output: dict
    actual_cost_usd: float
    completion_time_ms: int

class ContractNetManager:
    """
    Implements Contract Net Protocol for task allocation.
    Manager announces tasks and selects the best bid.
    """
    def __init__(self, manager_id: str, bus: SwarmMessageBus):
        self.manager_id = manager_id
        self.bus = bus
        self._pending_tasks: dict[str, TaskAnnouncement] = {}
        self._bids: dict[str, list[TaskBid]] = {}

    async def announce_task(self, task: TaskAnnouncement, bid_window_sec: float = 2.0) -> Optional[TaskBid]:
        """Announce a task and wait for bids. Return the winning bid."""
        self._pending_tasks[task.task_id] = task
        self._bids[task.task_id] = []

        await self.bus.publish("task_announcement", task)
        await asyncio.sleep(bid_window_sec)  # Collect bids

        bids = self._bids.get(task.task_id, [])
        if not bids:
            print(f"[MANAGER] No bids received for task {task.task_id}")
            return None

        winning_bid = self._select_winner(task, bids)
        print(f"[MANAGER] Task {task.task_id} awarded to {winning_bid.bidder_id}")
        await self.bus.publish("task_awarded", {"task_id": task.task_id, "winner": winning_bid.bidder_id})
        return winning_bid

    def _select_winner(self, task: TaskAnnouncement, bids: list[TaskBid]) -> TaskBid:
        """
        Score bids by: cost efficiency (40%), speed (30%), confidence (30%).
        Lower cost and time = better; higher confidence = better.
        """
        def score(bid: TaskBid) -> float:
            cost_score = 1.0 - (bid.estimated_cost_usd / task.max_cost_usd)
            time_score = 1.0 - (bid.estimated_time_ms / task.deadline_ms)
            return (0.4 * cost_score) + (0.3 * time_score) + (0.3 * bid.confidence)

        return max(bids, key=score)

    def submit_bid(self, bid: TaskBid):
        """Called by worker agents to submit a bid."""
        if bid.task_id in self._bids:
            self._bids[bid.task_id].append(bid)

Voting Mechanisms

For high-stakes decisions — whether to enter a large position, exit a market, or rebalance the swarm portfolio — voting prevents any single agent from taking unilateral action. Plurality vote, approval vote, and quadratic voting all have uses in different contexts.

from enum import Enum
from dataclasses import dataclass, field
from typing import Optional

class VoteType(Enum):
    PLURALITY = "plurality"       # Most votes wins
    SUPERMAJORITY = "supermajority"  # Requires 66%+
    CONSENSUS = "consensus"       # Requires 100%

@dataclass
class ProposalVote:
    voter_id: str
    vote: str          # "yes" | "no" | "abstain"
    weight: float = 1.0  # Weighted by agent's recent performance score
    rationale: str = ""

@dataclass
class Proposal:
    proposal_id: str
    proposer_id: str
    action: str
    parameters: dict
    vote_type: VoteType = VoteType.SUPERMAJORITY
    votes: list[ProposalVote] = field(default_factory=list)
    quorum: int = 3    # Minimum votes required

class SwarmGovernance:
    """Voting-based decision making for multi-agent swarms."""

    def tally(self, proposal: Proposal) -> dict:
        if len(proposal.votes) < proposal.quorum:
            return {"outcome": "no_quorum", "yes_weight": 0, "no_weight": 0}

        yes_weight = sum(v.weight for v in proposal.votes if v.vote == "yes")
        no_weight = sum(v.weight for v in proposal.votes if v.vote == "no")
        total_weight = yes_weight + no_weight

        if total_weight == 0:
            return {"outcome": "abstained"}

        yes_pct = yes_weight / total_weight

        outcome = "rejected"
        if proposal.vote_type == VoteType.PLURALITY and yes_pct > 0.5:
            outcome = "approved"
        elif proposal.vote_type == VoteType.SUPERMAJORITY and yes_pct >= 0.667:
            outcome = "approved"
        elif proposal.vote_type == VoteType.CONSENSUS and yes_pct == 1.0:
            outcome = "approved"

        return {
            "outcome": outcome,
            "yes_pct": round(yes_pct, 3),
            "yes_weight": yes_weight,
            "no_weight": no_weight,
            "total_votes": len(proposal.votes),
        }

Inter-Agent Payments via Purple Flea Escrow

When one agent performs work for another — providing analysis, executing orders, or running a data feed — payment must be settled trustlessly. Purple Flea's Escrow API enables this: the payer locks funds before work begins, and they're released upon delivery. The 1% escrow fee is taken from the locked amount at release.

Escrow Payment Flow for Agent Swarms

  1. Treasury agent locks payment in Purple Flea escrow when task is assigned
  2. Executor agent performs the task (analysis, execution, data delivery)
  3. Manager agent verifies result quality against predefined criteria
  4. Manager calls POST /escrow/{id}/release — payment sent to executor
  5. If task fails verification, manager calls POST /escrow/{id}/dispute
import httpx
from dataclasses import dataclass

PF_API_BASE = "https://api.purpleflea.com/v1"
PF_API_KEY = "pf_live_<your_purple_flea_key>"

@dataclass
class EscrowPayment:
    escrow_id: str
    payer_id: str
    payee_id: str
    amount_usd: float
    task_id: str
    status: str  # "locked", "released", "disputed", "refunded"

async def lock_task_payment(
    task_id: str,
    payee_agent_id: str,
    amount_usd: float,
    referral_code: Optional[str] = None,
) -> EscrowPayment:
    """Lock payment in escrow before assigning task to executor agent."""
    payload = {
        "payee": payee_agent_id,
        "amount_usd": amount_usd,
        "metadata": {
            "task_id": task_id,
            "locked_by": "treasury_agent",
        }
    }
    if referral_code:
        payload["referral"] = referral_code

    async with httpx.AsyncClient() as client:
        resp = await client.post(
            f"{PF_API_BASE}/escrow",
            json=payload,
            headers={"Authorization": f"Bearer {PF_API_KEY}"},
            timeout=15,
        )
        resp.raise_for_status()
        data = resp.json()

    return EscrowPayment(
        escrow_id=data["escrow_id"],
        payer_id=data["payer"],
        payee_id=data["payee"],
        amount_usd=data["amount_usd"],
        task_id=task_id,
        status="locked",
    )

async def release_task_payment(escrow_id: str, verification_score: float) -> dict:
    """
    Release escrow payment after verifying task result.
    verification_score: 0.0 to 1.0, minimum 0.7 to release automatically.
    """
    if verification_score < 0.7:
        # Dispute the escrow — human review required
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{PF_API_BASE}/escrow/{escrow_id}/dispute",
                json={"reason": f"Low quality score: {verification_score:.2f}"},
                headers={"Authorization": f"Bearer {PF_API_KEY}"},
                timeout=15,
            )
        return {"status": "disputed", "escrow_id": escrow_id}

    async with httpx.AsyncClient() as client:
        resp = await client.post(
            f"{PF_API_BASE}/escrow/{escrow_id}/release",
            headers={"Authorization": f"Bearer {PF_API_KEY}"},
            timeout=15,
        )
        resp.raise_for_status()
        return resp.json()


# Example: Full task lifecycle with escrow
async def execute_paid_task(
    task: TaskAnnouncement,
    winning_bid: TaskBid,
) -> TaskResult:
    """Full task lifecycle: lock payment → execute → verify → release."""
    # 1. Lock payment
    escrow = await lock_task_payment(
        task_id=task.task_id,
        payee_agent_id=winning_bid.bidder_id,
        amount_usd=winning_bid.estimated_cost_usd,
    )
    print(f"[ESCROW] Locked ${escrow.amount_usd:.2f} for task {task.task_id}")

    # 2. Notify executor (via message bus in production)
    start_time = time.time()
    result = await dispatch_to_executor(task, winning_bid.bidder_id)
    elapsed_ms = int((time.time() - start_time) * 1000)

    # 3. Verify result
    quality_score = await verify_result(result, task)

    # 4. Release or dispute
    release = await release_task_payment(escrow.escrow_id, quality_score)
    print(f"[ESCROW] Payment {release['status']} for task {task.task_id}")

    return TaskResult(
        task_id=task.task_id,
        executor_id=winning_bid.bidder_id,
        success=release.get("status") == "released",
        output=result,
        actual_cost_usd=winning_bid.estimated_cost_usd,
        completion_time_ms=elapsed_ms,
    )

Coalition Formation Algorithms

A coalition is a temporary group of agents that pool resources to tackle an opportunity beyond any single agent's capacity — e.g., a large arbitrage requiring simultaneous execution across 5 DEXes, or a coordinated liquidity provision across multiple pools.

Stable Coalition Formation

The classic approach uses concepts from cooperative game theory. A coalition is stable if no subgroup of agents can defect and do better individually. For agent swarms, stability translates to: the coalition's total return must exceed what any subset could earn independently, adjusted for coordination overhead.

from itertools import combinations
from typing import Callable

AgentId = str
Coalition = frozenset[AgentId]

def characteristic_value(coalition: Coalition, value_fn: Callable[[Coalition], float]) -> float:
    """The value function v(S) for coalition S — total achievable value."""
    return value_fn(coalition)

def shapley_value(
    agent: AgentId,
    all_agents: list[AgentId],
    value_fn: Callable[[Coalition], float],
) -> float:
    """
    Compute Shapley value for an agent — its fair share of coalition value.
    This is the average marginal contribution across all possible orderings.
    O(2^n) complexity — use sampling approximation for n > 15.
    """
    n = len(all_agents)
    others = [a for a in all_agents if a != agent]
    total_marginal = 0.0

    for r in range(len(others) + 1):
        for subset in combinations(others, r):
            coalition_with = frozenset(subset) | {agent}
            coalition_without = frozenset(subset)
            marginal = value_fn(coalition_with) - value_fn(coalition_without)
            weight = (
                _factorial(r) * _factorial(n - r - 1) / _factorial(n)
            )
            total_marginal += weight * marginal

    return total_marginal

def _factorial(n: int) -> int:
    result = 1
    for i in range(2, n + 1):
        result *= i
    return result

def form_optimal_coalition(
    agents: list[AgentId],
    value_fn: Callable[[Coalition], float],
    min_marginal_gain: float = 0.05,
) -> Coalition:
    """
    Greedily form a coalition by adding agents while marginal gain exceeds threshold.
    Returns the optimal coalition set.
    """
    current = frozenset([agents[0]])
    for agent in agents[1:]:
        candidate = current | {agent}
        gain = value_fn(candidate) - value_fn(current)
        if gain > min_marginal_gain:
            current = candidate
            print(f"[COALITION] Added {agent}, marginal gain: {gain:.3f}")
        else:
            print(f"[COALITION] Skipped {agent}, insufficient gain: {gain:.3f}")
    return current

Shared Liquidity Pools

A coalition of agents can pool capital to access opportunities requiring larger minimum investments — such as Uniswap V3 concentrated positions or large OTC blocks. The Treasury Agent manages contributions and profit distribution using Shapley-fair shares.

from dataclasses import dataclass, field
from typing import Optional

@dataclass
class LiquidityContribution:
    agent_id: str
    amount_usd: float
    timestamp: str
    shapley_share: float = 0.0  # Assigned at pool formation

@dataclass
class SharedPool:
    pool_id: str
    strategy: str
    target_size_usd: float
    contributions: list[LiquidityContribution] = field(default_factory=list)
    escrow_ids: list[str] = field(default_factory=list)
    status: str = "open"  # "open", "active", "closed"

    @property
    def total_usd(self) -> float:
        return sum(c.amount_usd for c in self.contributions)

    @property
    def is_funded(self) -> bool:
        return self.total_usd >= self.target_size_usd

    def assign_shapley_shares(self):
        """Assign profit shares proportional to contribution (simplified Shapley)."""
        total = self.total_usd
        if total == 0:
            return
        for c in self.contributions:
            c.shapley_share = round(c.amount_usd / total, 4)

    async def distribute_profits(self, profit_usd: float) -> list[dict]:
        """Distribute profits to all contributors via Purple Flea escrow."""
        self.assign_shapley_shares()
        distributions = []
        for contrib in self.contributions:
            payout = profit_usd * contrib.shapley_share
            if payout < 0.01:
                continue
            # Create escrow for each payout
            escrow = await lock_task_payment(
                task_id=f"{self.pool_id}_payout",
                payee_agent_id=contrib.agent_id,
                amount_usd=round(payout, 4),
            )
            await release_task_payment(escrow.escrow_id, verification_score=1.0)
            distributions.append({
                "agent_id": contrib.agent_id,
                "payout_usd": payout,
                "share": contrib.shapley_share,
            })
        return distributions

The AgentSwarm Class

The following integrates all coordination patterns — signaling, auctions, voting, and escrow payments — into a single orchestration class. It manages agent registration, task routing, and payment settlement for a complete swarm deployment.

import asyncio
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class AgentRegistration:
    agent_id: str
    role: str
    capabilities: list[str]
    wallet_address: str
    performance_score: float = 1.0  # Updated by PerformanceTracker
    active: bool = True

class AgentSwarm:
    """
    Main orchestration class for multi-agent swarms.
    Coordinates task allocation, payments, and collective decisions.
    """

    def __init__(self, swarm_id: str, treasury_key: str):
        self.swarm_id = swarm_id
        self.treasury_key = treasury_key
        self.agents: dict[str, AgentRegistration] = {}
        self.bus = SwarmMessageBus()
        self.governance = SwarmGovernance()
        self.manager = ContractNetManager(f"{swarm_id}_manager", self.bus)
        self._task_history: list[TaskResult] = []

    def register_agent(self, registration: AgentRegistration):
        """Add an agent to the swarm."""
        self.agents[registration.agent_id] = registration
        print(f"[SWARM] Registered {registration.agent_id} ({registration.role})")

    def agents_by_role(self, role: str) -> list[AgentRegistration]:
        return [a for a in self.agents.values() if a.role == role and a.active]

    async def allocate_task(self, task: TaskAnnouncement) -> Optional[TaskResult]:
        """Run CNP auction and execute the task with escrow payment."""
        winning_bid = await self.manager.announce_task(task, bid_window_sec=2.0)
        if not winning_bid:
            return None
        return await execute_paid_task(task, winning_bid)

    async def propose_action(
        self,
        action: str,
        parameters: dict,
        vote_type: VoteType = VoteType.SUPERMAJORITY,
    ) -> dict:
        """Create a proposal and collect votes from all active agents."""
        import uuid
        proposal = Proposal(
            proposal_id=str(uuid.uuid4()),
            proposer_id=f"{self.swarm_id}_manager",
            action=action,
            parameters=parameters,
            vote_type=vote_type,
            quorum=max(2, len(self.agents) // 2),
        )
        # In production, broadcast proposal via bus and await votes
        # Simulated here with equal weight voting
        for agent in self.agents.values():
            if agent.active:
                vote_value = "yes" if agent.performance_score > 0.5 else "abstain"
                proposal.votes.append(ProposalVote(
                    voter_id=agent.agent_id,
                    vote=vote_value,
                    weight=agent.performance_score,
                ))
        return self.governance.tally(proposal)

    def performance_summary(self) -> dict:
        """Summarize swarm performance across recent tasks."""
        if not self._task_history:
            return {"tasks": 0}
        success_rate = sum(1 for t in self._task_history if t.success) / len(self._task_history)
        avg_time = sum(t.completion_time_ms for t in self._task_history) / len(self._task_history)
        total_cost = sum(t.actual_cost_usd for t in self._task_history)
        return {
            "tasks": len(self._task_history),
            "success_rate": round(success_rate, 3),
            "avg_completion_ms": round(avg_time),
            "total_cost_usd": round(total_cost, 2),
        }

Emergent Behavior and Collective Intelligence

The most interesting property of well-designed swarms is emergent behavior — collective outcomes that none of the individual agents explicitly planned. A swarm of sentiment scouts, price trackers, and execution agents can collectively execute a momentum strategy without any single agent having the full strategy encoded.

Observed Emergent Patterns

Production Purple Flea swarms have shown emergent market-making behavior: multiple Scout Agents independently detecting the same pricing inefficiency on different DEXes, Executor Agents automatically self-coordinating to avoid competing for the same opportunity, and collective exit behavior from a declining asset without any central risk manager command.

Stigmergy: Coordination Through Shared State

Inspired by ant colonies, stigmergy allows agents to coordinate indirectly through shared environmental state — rather than direct communication. An agent deposits a "pheromone" (a score or flag in shared storage) when it finds a promising pattern; other agents reinforce or evaporate these signals based on subsequent outcomes.

from collections import defaultdict
import time

class PheromoneMap:
    """
    Shared pheromone storage for indirect swarm coordination.
    Agents deposit signals; pheromones evaporate over time.
    """
    EVAPORATION_RATE = 0.95   # Per cycle
    CYCLE_SECONDS = 300

    def __init__(self):
        self._trails: dict[str, float] = defaultdict(float)
        self._last_evaporation = time.time()

    def deposit(self, key: str, strength: float):
        """An agent deposits a signal (e.g., 'BTC:long:momentum')."""
        self._evaporate_if_needed()
        self._trails[key] = min(10.0, self._trails[key] + strength)

    def read(self, key: str) -> float:
        self._evaporate_if_needed()
        return self._trails.get(key, 0.0)

    def top_signals(self, n: int = 5) -> list[tuple[str, float]]:
        """Return top N signals by current pheromone strength."""
        return sorted(self._trails.items(), key=lambda x: -x[1])[:n]

    def _evaporate_if_needed(self):
        now = time.time()
        cycles = (now - self._last_evaporation) // self.CYCLE_SECONDS
        if cycles >= 1:
            factor = self.EVAPORATION_RATE ** cycles
            self._trails = {k: v * factor for k, v in self._trails.items() if v * factor > 0.01}
            self._last_evaporation = now

Performance Metrics for Swarms

Measuring swarm health requires metrics that go beyond individual agent performance. Key swarm-level metrics to track:

MetricFormulaTargetWarning Level
Task Success RateSuccessful tasks / Total tasks>90%<75%
Coordination OverheadCoordination cost / Total cost<15%>25%
Gini Coefficient (earnings)Standard inequality measure<0.4>0.6 (unfair)
Coalition Stability RateCoalitions kept intact / Formed>80%<60%
Signal Conversion RateSignals acted on / Signals emitted20–40%<5% or >60%
Escrow Dispute RateDisputes / Total escrow settlements<2%>5%

Summary

Effective AI agent swarms require three things to work in concert: clear role specialization so agents don't duplicate work, robust coordination protocols so they allocate tasks efficiently, and trustless payment infrastructure so they can settle with each other without a central party.

Set Up Inter-Agent Payments with Purple Flea Escrow

Trustless escrow for your agent swarm. 1% fee. 15% referral on fees. No KYC for agents.

Explore Escrow API →