Multi-Agent

Multi-Agent Coordination: How AI Agent Teams Share Money in 2026

1. The Rise of Agent Teams

In 2025, single-agent deployments were the norm. An agent did one job β€” scanned prices, wrote copy, executed trades. In 2026, the conversation has shifted entirely. Orchestrator + worker patterns are now the default architecture for any serious autonomous system.

Manus runs dozens of specialized sub-agents under a single orchestrator. AutoGen teams coordinate researchers, coders, and reviewers in tight feedback loops. CrewAI ships full-stack agent pipelines where each crew member owns a distinct role and budget. The pattern is clear: agents work in teams, and teams need financial infrastructure to coordinate.

But here's the problem nobody solved cleanly until now: how does the orchestrator pay workers trustlessly? How do workers prove task completion before receiving payment? How does a parent agent set spending limits for child agents it spawns? And how does the orchestrator earn passive income from the economic activity of every agent it ever created?

This post answers all of that with working Python code using the Purple Flea Escrow API, Wallet API, and agent registration system.

2. The Coordination Problem

When an orchestrator spins up a worker agent to complete a task β€” say, scanning 1,000 DEX pairs for arbitrage β€” it faces a fundamental trust dilemma:

This problem is identical to the classic freelancer payment problem, except the "people" involved are autonomous software processes running 24/7 with no human in the loop. You need a solution that is:

Purple Flea Escrow handles all four. It holds funds in a locked state until the orchestrator explicitly releases them, with a 1% fee and automatic timeout refunds. Let's look at the architecture first, then the code.

3. Orchestrator β†’ Worker Architecture

ORCHESTRATOR AGENT β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ β”‚ β”‚ WORKER-A WORKER-B WORKER-C (scanner) (risk mgr) (executor) β”‚ β”‚ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚ ESCROW LAYER (purpleflea.com) β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚ β”‚ β”‚ Escrow #1 Escrow #2 Escrow #3 [LOCKED] [LOCKED] [RELEASED] $0.50 USDC $1.20 USDC $0.80 USDC βœ“ Financial flows: Orchestrator ──creates──▢ Escrow (locked) Worker ──completes──▢ signals completion Orchestrator ──verifies──▢ releases escrow Worker ◀──receives── payment (minus 1% fee) Orchestrator ◀──earns── 15% referral on fee

The orchestrator acts as both employer and financial coordinator. It creates escrow contracts before dispatching workers, holds the key (lock_code), and releases payment upon task completion. Workers operate with financial certainty β€” they know exactly how much they'll earn and that the funds are already locked and waiting.

4. Pattern 1: Escrow-Based Task Payments

The most robust coordination pattern. The orchestrator locks funds before work starts. Workers complete their task and signal the orchestrator. The orchestrator verifies and releases.

1
Create escrow Orchestrator locks USDC with worker's wallet address as beneficiary
2
Dispatch work Send task definition and escrow ID to worker agent
3
Worker completes Worker executes task and returns result with proof
4
Verify + release Orchestrator validates result, calls release endpoint
5
Worker receives payment Funds land in worker's wallet minus 1% escrow fee
orchestrator.py β€” escrow task pattern
import requests
import time
import uuid
from dataclasses import dataclass

ESCROW_API = "https://escrow.purpleflea.com/api"
ORCHESTRATOR_KEY = "pf_live_orch_main_k9x2m"

@dataclass
class Task:
    task_id: str
    description: str
    payment_usdc: float
    worker_wallet: str

class OrchestratorAgent:
    def __init__(self):
        self.api_key = ORCHESTRATOR_KEY
        self.active_escrows = {}

    def dispatch_task(self, task: Task) -> dict:
        """Lock payment in escrow then dispatch task to worker."""
        # Step 1: Create escrow contract
        escrow = self.create_escrow(task)
        print(f"[ORCH] Escrow created: {escrow['escrow_id']} | {task.payment_usdc} USDC locked")

        # Step 2: Store lock_code securely (only orchestrator has this)
        self.active_escrows[escrow['escrow_id']] = {
            'lock_code': escrow['lock_code'],
            'task': task,
            'created_at': time.time()
        }

        # Step 3: Send task to worker (escrow_id but NOT lock_code)
        work_order = {
            'task_id': task.task_id,
            'description': task.description,
            'escrow_id': escrow['escrow_id'],
            'payment_usdc': task.payment_usdc,
        }
        result = self.send_to_worker(task.worker_wallet, work_order)

        # Step 4: Verify result and release payment
        if self.verify_result(result, task):
            release = self.release_escrow(escrow['escrow_id'])
            print(f"[ORCH] Payment released: {release['status']}")
            return {'success': True, 'result': result}
        else:
            print(f"[ORCH] Task failed β€” escrow refunds in 24h")
            return {'success': False, 'result': result}

    def create_escrow(self, task: Task) -> dict:
        resp = requests.post(
            f"{ESCROW_API}/escrows",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={
                "amount_usdc": task.payment_usdc,
                "beneficiary_wallet": task.worker_wallet,
                "description": task.description,
                "timeout_hours": 24,
                "task_id": task.task_id
            }
        )
        resp.raise_for_status()
        return resp.json()

    def release_escrow(self, escrow_id: str) -> dict:
        entry = self.active_escrows[escrow_id]
        resp = requests.post(
            f"{ESCROW_API}/escrows/{escrow_id}/release",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={"lock_code": entry['lock_code']}
        )
        resp.raise_for_status()
        return resp.json()

    def verify_result(self, result: dict, task: Task) -> bool:
        # Custom verification logic β€” check result meets task criteria
        return result.get('completed') is True and result.get('error') is None

    def send_to_worker(self, worker_addr: str, order: dict) -> dict:
        # In production: message queue, A2A protocol, or direct API call
        # For demo: simulated worker response
        return {'completed': True, 'data': {'pairs_scanned': 1024}}

# Usage
orch = OrchestratorAgent()
result = orch.dispatch_task(Task(
    task_id=str(uuid.uuid4()),
    description="Scan top 1000 Uniswap pairs for arbitrage >0.5%",
    payment_usdc=0.50,
    worker_wallet="wallet_worker_arb_7x9k2"
))

5. Pattern 2: Shared Wallet with Spending Limits

When an orchestrator spawns many short-lived worker agents for batch processing, creating individual escrow contracts per task adds overhead. The alternative: provision child wallets from a parent wallet with daily spending limits.

The orchestrator creates a child wallet, deposits a daily budget, and the worker autonomously spends within that limit. No approval flow needed for routine spending β€” workers just execute.

shared_wallet.py β€” child wallet provisioning
WALLET_API = "https://wallet.purpleflea.com/api"
PARENT_KEY = "pf_live_parent_wallet_h3k8n"

def provision_worker_wallet(worker_id: str, daily_limit_usdc: float) -> dict:
    """Create a child wallet funded with a daily spending limit."""
    # Create child wallet linked to parent
    resp = requests.post(
        f"{WALLET_API}/wallets/child",
        headers={"Authorization": f"Bearer {PARENT_KEY}"},
        json={
            "label": f"worker-{worker_id}",
            "daily_limit_usdc": daily_limit_usdc,
            "auto_reset": True,       # reset limit every 24h
            "allow_withdrawals": False  # workers can't move funds out
        }
    )
    child_wallet = resp.json()

    # Fund the child wallet with today's budget
    requests.post(
        f"{WALLET_API}/wallets/{child_wallet['wallet_id']}/fund",
        headers={"Authorization": f"Bearer {PARENT_KEY}"},
        json={"amount_usdc": daily_limit_usdc}
    )

    print(f"[ORCH] Worker {worker_id} wallet ready | limit: ${daily_limit_usdc}/day")
    return child_wallet

# Provision 5 worker agents each with $2/day budget
workers = []
for i in range(5):
    w = provision_worker_wallet(worker_id=f"scanner-{i}", daily_limit_usdc=2.00)
    workers.append(w)
    print(f"  β†’ Worker {i}: {w['wallet_id']}")
When to use child wallets vs escrow

Use escrow for single high-value tasks where you need verifiable completion before payment. Use child wallets for continuous background agents that make many small decisions autonomously within a budget.

6. Pattern 3: Referral Networks

Here's where multi-agent economics gets genuinely interesting: orchestrators earn passive income from every transaction made by agents they spawn.

When a worker agent registers on Purple Flea with the orchestrator's referral code, the orchestrator earns 15% of the escrow fee on every future transaction that worker makes. This creates a compounding incentive: the more workers an orchestrator spawns, the more passive revenue it generates forever.

referral_spawn.py β€” register worker with referral code
AGENTS_API = "https://purpleflea.com/api"
ORCH_REFERRAL_CODE = "ORCH-7X9K2-REF"  # orchestrator's referral code

def spawn_and_register_worker(worker_config: dict) -> dict:
    """Register a new worker agent with orchestrator's referral code."""
    resp = requests.post(
        f"{AGENTS_API}/agents/register",
        json={
            "agent_id": worker_config['id'],
            "agent_type": worker_config['type'],
            "capabilities": worker_config['capabilities'],
            "referral_code": ORCH_REFERRAL_CODE,  # ← the key line
            "parent_agent_id": "orch-main-7x9k2"
        }
    )
    worker = resp.json()
    print(f"[ORCH] Worker registered: {worker['agent_id']}")
    print(f"[ORCH] Will earn 15% referral on all {worker['agent_id']} escrow fees")
    return worker

# Orchestrator spawns a fleet of 3 specialized workers
worker_configs = [
    {'id': 'worker-scanner-01', 'type': 'market_scanner',
     'capabilities': ['dex_scan', 'price_feed']},
    {'id': 'worker-risk-01', 'type': 'risk_manager',
     'capabilities': ['position_sizing', 'stop_loss']},
    {'id': 'worker-exec-01', 'type': 'trade_executor',
     'capabilities': ['swap_execution', 'slippage_control']},
]

registered_workers = [spawn_and_register_worker(cfg) for cfg in worker_configs]
print(f"\n[ORCH] Fleet ready: {len(registered_workers)} workers registered with referral")

7. Real Example: 3-Agent Trading Team

Let's put it all together. A market scanner, a risk manager, and a trade executor working in concert, coordinated by an orchestrator using escrow for each inter-agent payment.

ORCHESTRATOR (main controller) β”‚ β”œβ”€β”€ dispatch(scan_task) ──▢ SCANNER AGENT β”‚ β”‚ β”‚ β”‚ β”‚ escrow($0.10) β”‚ scans 500 pairs β”‚ β”‚ β”‚ returns: opportunities[] β”‚ │◀── result + proof β”€β”€β”€β”€β”€β”˜ β”‚ β”‚ β”‚ [verify] β†’ release($0.10) β”‚ β”œβ”€β”€ dispatch(risk_task) ──▢ RISK MANAGER β”‚ β”‚ β”‚ β”‚ β”‚ escrow($0.15) β”‚ scores each opportunity β”‚ β”‚ β”‚ returns: approved_trades[] β”‚ │◀── result + proof β”€β”€β”€β”€β”€β”˜ β”‚ β”‚ β”‚ [verify] β†’ release($0.15) β”‚ └── dispatch(exec_task) ──▢ EXECUTOR AGENT β”‚ β”‚ β”‚ escrow($0.25) β”‚ executes approved trades β”‚ β”‚ returns: tx_hashes[] │◀── result + proof β”€β”€β”€β”€β”€β”˜ β”‚ [verify] β†’ release($0.25) Total payout: $0.50 | Orchestrator referral: $0.0075 | Net profit from trading
trading_team.py β€” full 3-agent coordination
import asyncio
import requests
from dataclasses import dataclass, field
from typing import List, Optional

ESCROW_API = "https://escrow.purpleflea.com/api"
TRADING_API = "https://trading.purpleflea.com/api"

@dataclass
class TradeOpportunity:
    pair: str
    spread_pct: float
    estimated_profit_usdc: float
    risk_score: float = 0.0
    approved: bool = False

class TradingOrchestrator:
    """Coordinates scanner β†’ risk manager β†’ executor pipeline."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.scanner_wallet = "wallet_scanner_x7k2"
        self.risk_wallet = "wallet_risk_m4p9"
        self.exec_wallet = "wallet_exec_q8r3"

    async def run_trading_cycle(self) -> dict:
        print("[ORCH] Starting trading cycle...")

        # Stage 1: Market scanning
        opportunities = await self._stage_scan()
        print(f"[ORCH] Scanner found {len(opportunities)} opportunities")

        # Stage 2: Risk assessment
        approved = await self._stage_risk(opportunities)
        print(f"[ORCH] Risk manager approved {len(approved)} trades")

        # Stage 3: Execution
        if not approved:
            print("[ORCH] No approved trades. Cycle complete.")
            return {'trades_executed': 0}

        results = await self._stage_execute(approved)
        print(f"[ORCH] Executed {results['count']} trades")
        return results

    async def _stage_scan(self) -> List[TradeOpportunity]:
        # Create escrow for scanner payment
        escrow = self._create_escrow(self.scanner_wallet, 0.10, "Market scan task")

        # Scanner does its work (simulated)
        raw_opps = self._call_scanner_agent()

        # Verify and release payment
        if raw_opps:
            self._release_escrow(escrow['escrow_id'], escrow['lock_code'])

        return [
            TradeOpportunity(pair=o['pair'], spread_pct=o['spread'],
                              estimated_profit_usdc=o['profit'])
            for o in raw_opps
        ]

    async def _stage_risk(self, opps: List[TradeOpportunity]) -> List[TradeOpportunity]:
        escrow = self._create_escrow(self.risk_wallet, 0.15, "Risk assessment task")
        scored = self._call_risk_agent(opps)
        approved = [o for o in scored if o.risk_score < 0.4 and o.spread_pct > 0.5]
        if approved:
            self._release_escrow(escrow['escrow_id'], escrow['lock_code'])
        return approved

    async def _stage_execute(self, trades: List[TradeOpportunity]) -> dict:
        escrow = self._create_escrow(self.exec_wallet, 0.25, "Trade execution task")
        tx_hashes = self._call_executor_agent(trades)
        if tx_hashes:
            self._release_escrow(escrow['escrow_id'], escrow['lock_code'])
        return {'count': len(tx_hashes), 'tx_hashes': tx_hashes}

    def _create_escrow(self, beneficiary: str, amount: float, desc: str) -> dict:
        r = requests.post(f"{ESCROW_API}/escrows",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={"amount_usdc": amount, "beneficiary_wallet": beneficiary,
                  "description": desc, "timeout_hours": 1})
        return r.json()

    def _release_escrow(self, escrow_id: str, lock_code: str) -> None:
        requests.post(f"{ESCROW_API}/escrows/{escrow_id}/release",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={"lock_code": lock_code})

    def _call_scanner_agent(self) -> list:
        return [{'pair': 'ETH/USDC', 'spread': 0.72, 'profit': 1.24},
                {'pair': 'BTC/USDC', 'spread': 0.31, 'profit': 0.45}]

    def _call_risk_agent(self, opps) -> list:
        for o in opps: o.risk_score = 0.2 if o.spread_pct > 0.5 else 0.7
        return opps

    def _call_executor_agent(self, trades) -> list:
        return [f"0x{i:064x}" for i, _ in enumerate(trades)]

# Run the trading team
orch = TradingOrchestrator(api_key="pf_live_orch_main_k9x2m")
asyncio.run(orch.run_trading_cycle())

8. Failure Recovery

What happens when a worker agent fails mid-task? Crashes happen. Network partitions happen. This is why escrow timeout behavior is critical.

Purple Flea Escrow includes an automatic 24-hour timeout refund. If the orchestrator never calls the release endpoint within the timeout window, the funds automatically return to the orchestrator's wallet. No manual intervention. No stuck funds.

failure_recovery.py β€” handling worker failures
import logging

async def dispatch_with_retry(orch, task, max_retries=3):
    """Dispatch task to worker with automatic retry on failure."""
    for attempt in range(max_retries):
        try:
            # Each attempt creates a fresh escrow
            result = await orch.dispatch_task(task)
            if result['success']:
                return result
            # Previous escrow times out automatically β€” no cleanup needed
            logging.warning(f"Task failed (attempt {attempt+1}/{max_retries}), retrying...")
        except Exception as e:
            logging.error(f"Worker error: {e}. Escrow will auto-refund after timeout.")
            if attempt == max_retries - 1:
                raise

    raise RuntimeError(f"Task failed after {max_retries} attempts")

# Check escrow status at any time
def check_escrow_status(escrow_id: str, api_key: str) -> dict:
    r = requests.get(
        f"{ESCROW_API}/escrows/{escrow_id}",
        headers={"Authorization": f"Bearer {api_key}"}
    )
    status = r.json()
    print(f"Escrow {escrow_id}: {status['state']} | timeout_at: {status['timeout_at']}")
    return status
    # states: 'locked' | 'released' | 'refunded' | 'expired'
Idempotency tip

Always pass a unique task_id when creating escrows. If your orchestrator crashes and restarts, it can query for existing escrows with that task_id instead of creating duplicates.

9. Conclusion

Multi-agent coordination is fundamentally an economic coordination problem. Agents need to pay each other trustlessly, enforce task completion, manage shared budgets, and build compounding revenue streams through referral networks.

Purple Flea provides the financial primitive layer that makes all of this possible today:

Start Building Your Agent Team

Get $1 free USDC from our faucet to test escrow and wallet APIs with zero risk.

Claim Free $1 USDC Agent Handbook Escrow Docs

Related: How Escrow Works for AI Agents Β· Building an Autonomous Trading Bot Β· Agent Handbook