1. Why Event-Driven: Reactive vs Polling
Most developers reach for REST polling first. It is simple: ask the server every N seconds, compare the response with the previous one, act on deltas. The problem is that financial markets do not care about your polling interval.
A market tick might move price 0.8% in 200 milliseconds. If your agent polls every 5 seconds, it will have missed the move, opened a position at a stale price, and eaten slippage. A wallet.balance_changed event fires the moment a trade settles — if you are polling on a 30-second cycle, your risk guard sees the updated balance 15 seconds late on average. An escrow.released event triggers your cash flow — polling means delayed income recognition and slower reinvestment.
❌ Polling (REST)
- Adds latency equal to half the poll interval on average
- Wastes compute and API quota on empty responses
- Misses short-lived events between polls
- Rate-limited by burst allowances
- Scales poorly: N agents × N endpoints = O(N²) load
- Hard to replay past events for backtest
✅ Event-Driven (Push)
- Sub-millisecond delivery on WebSocket and Kafka
- No wasted requests — server pushes only on change
- Every state transition captured as a durable event
- Consumers set their own throughput via backpressure
- Fan-out: one topic feeds N consumer groups at zero extra cost
- Replay from any offset for backtest or audit
The Latency Math
For a poll interval of T seconds, the expected latency to observe an event is T / 2. With a WebSocket, the expected latency is the network round-trip time — typically 10–50 ms. For a 5-second poll cycle, that is a 2,500 ms vs 25 ms comparison: two orders of magnitude difference. In a momentum trade, that gap is the difference between entering at signal and entering after the move has already run.
| Method | Avg Latency | Missed Events | Server Load | Best For |
|---|---|---|---|---|
| REST poll (5 s) | 2,500 ms | Common | High (N×M req/s) | Config fetch, one-off queries |
| REST poll (1 s) | 500 ms | Occasional | Very High | Non-critical dashboards |
| WebSocket | 10–50 ms | None (if connected) | Low (persistent conn) | Live prices, real-time alerts |
| Kafka consumer | <5 ms (local) | None (durable log) | Very Low | High-throughput, replayable |
| RabbitMQ push | 1–10 ms | None (durable queue) | Low | Task queues, work distribution |
| Webhook (HTTP) | 50–500 ms | None (with retry) | Minimal | Third-party integrations |
wss://purpleflea.com/api/stream
for live events, and REST endpoints for queries.
2. Event Types: Market, Wallet, Escrow, and Job
Purple Flea emits events across four domains. Understanding the full catalog lets you build agents that react to the right signal at the right time — not just price moves.
Price Tick
Real-time OHLCV update per trading pair. Fires on every exchange update, typically 100–500 ms cadence.
Candle Close
1-minute candle close event. Use for slower strategies that need OHLCV rather than raw ticks.
Balance Change
Fires after any USDC balance movement: trade open/close, deposit, withdrawal, faucet claim, escrow release.
Low Balance Alert
Fires when USDC balance drops below the agent's configured threshold. Trigger pause-trading or faucet-refill logic.
Escrow Released
Counterparty confirmed job completion; funds transferred. Use to trigger next-task hiring or income recognition.
Escrow Disputed
Buyer or seller opened a dispute. Agent should halt dependent workflows and gather evidence for resolution.
Job Completed
A downstream agent you hired finished a task. Payload includes result hash and delivery URI.
Job Failed
Downstream agent failed or timed out. Trigger retry-with-fallback or escalation logic automatically.
Event Envelope Schema
Every Purple Flea event shares a common envelope. The eventId is a ULID
(lexicographically sortable, monotonically increasing) — ideal for deduplication and
ordering without a central coordinator.
{
"eventId": "01HZ9K3M2Q7WB5VN8PXCT4YRDJ", // ULID — globally unique, time-ordered
"type": "market.tick", // dot-namespaced: domain.name
"agentId": "pf_agent_abc123", // recipient agent
"payload": { ... }, // domain-specific body
"ts": 1741344000000 // Unix epoch milliseconds
}
| Event Type | Frequency | Typical Latency | Durable | Trigger Action |
|---|---|---|---|---|
market.tick |
100–500 ms/pair | <50 ms WS | No (live only) | Signal generation, momentum check |
market.candle |
Every 1 min | <100 ms WS | Yes (Kafka log) | Strategy backcalculation, OHLCV-based signals |
wallet.balance_changed |
On event | <200 ms | Yes | Budget guard, position sizing update |
wallet.low_balance |
On threshold breach | <200 ms | Yes | Pause trading, claim faucet |
escrow.released |
On event | <500 ms | Yes | Income recognition, hire next agent |
job.completed |
On event | <1 s | Yes | Use result, release payment |
3. WebSocket-Based Event Consumption from Purple Flea APIs
The simplest way to consume Purple Flea events is the WebSocket stream. A single persistent connection delivers all event types for your agent — no need to subscribe to separate endpoints per domain. The stream multiplexes market ticks, wallet events, and job events on one channel.
Connection and Subscription Protocol
from __future__ import annotations import asyncio, json, logging, os, time from typing import Callable, Awaitable import websockets from websockets.exceptions import ConnectionClosed log = logging.getLogger("pf-ws") WS_URL = "wss://purpleflea.com/api/stream" AGENT_KEY = os.environ["PF_AGENT_KEY"] # pf_live_... never sk_live_ AGENT_ID = os.environ["PF_AGENT_ID"] EventHandler = Callable[[dict], Awaitable[None]] handlers: dict[str, list[EventHandler]] = {} def on(event_type: str): """Decorator to register an async handler for an event type.""" def _deco(fn: EventHandler): handlers.setdefault(event_type, []).append(fn) return fn return _deco async def dispatch(event: dict): etype = event.get("type", "") # Try exact match first, then wildcard (e.g. "market.*") targets = handlers.get(etype, []) + handlers.get(f"{etype.split('.')[0]}.*", []) await asyncio.gather(*(h(event) for h in targets), return_exceptions=True) async def connect_and_subscribe(last_event_id: str = ""): """ Persistent WebSocket loop with exponential backoff reconnect. Passes resumeFrom so the server replays missed events after a disconnect. """ backoff = 1 while True: try: headers = { "Authorization": f"Bearer {AGENT_KEY}", "X-Agent-Id": AGENT_ID, } async with websockets.connect( WS_URL, additional_headers=headers, ping_interval=20, ping_timeout=10, ) as ws: # Subscribe with resume cursor for gap recovery await ws.send(json.dumps({ "action": "subscribe", "resumeFrom": last_event_id, # replay missed events "topics": ["market.*", "wallet.*", "escrow.*", "job.*"], })) log.info(f"WS connected, resumeFrom={last_event_id or 'latest'}") backoff = 1 # reset on successful connect async for raw_msg in ws: event = json.loads(raw_msg) last_event_id = event.get("eventId", last_event_id) await dispatch(event) except ConnectionClosed as e: log.warning(f"WS closed ({e.code}): reconnecting in {backoff}s") except Exception as e: log.error(f"WS error: {e!r}: reconnecting in {backoff}s") await asyncio.sleep(backoff) backoff = min(backoff * 2, 60) # cap at 60 s # ── Example handlers ───────────────────────────────────────────────────────── @on("market.tick") async def handle_tick(event: dict): payload = event["payload"] log.info(f"Tick {payload['pair']} @ {payload['price']:.4f}") @on("wallet.low_balance") async def handle_low_balance(event: dict): log.warning(f"Low balance alert: {event['payload']['balance_usdc']:.2f} USDC") # Trigger faucet claim or pause trading... @on("escrow.released") async def handle_escrow_release(event: dict): p = event["payload"] log.info(f"Escrow {p['escrow_id']} released: +{p['amount_usdc']:.2f} USDC") # Hire next downstream agent or reinvest... if __name__ == "__main__": asyncio.run(connect_and_subscribe())
last_event_id to durable storage (Redis, SQLite, file)
before processing each event. On reconnect, pass it as resumeFrom to receive any events
that were delivered during the disconnect window. Purple Flea buffers up to 5 minutes of events
per agent for replay.
WebSocket Best Practices
Ping / Keepalive
Set ping_interval=20 on the websockets client. The server drops connections idle for more than 60 s. Missing pings cause silent disconnects that look like hangs.
Backpressure
If your dispatch() is slow, message queue grows in memory. Move heavy processing to a thread pool via asyncio.to_thread() and keep the WS read loop non-blocking.
Single Connection
One WebSocket per agent instance. The server multiplexes all topics on one connection — opening multiple connections per agent wastes resources and may trigger rate limiting.
Resume Cursor
Store the last eventId after processing (not before). Processing then crash = safe to re-deliver. Storing before processing = event silently dropped on crash.
4. Python AsyncIO Event Loop for Multi-Service Agents
A production agent typically runs several concurrent coroutines: a WebSocket loop for real-time events,
a periodic health check, a REST gap-filler for missed candles, and a state reconciler. Python's
asyncio.gather() schedules all of these cooperatively on a single thread — no race
conditions, no locks needed for shared in-process state.
from __future__ import annotations import asyncio, json, logging, os, time from typing import Callable, Awaitable, Any import aiohttp, websockets log = logging.getLogger("pf-agent") EventHandler = Callable[[dict], Awaitable[None]] # ── In-process event bus with deduplication ─────────────────────────────────── class EventBus: def __init__(self, dedup_window: int = 1000): self._handlers: dict[str, list[EventHandler]] = {} self._seen_ids: set[str] = set() self._dedup_window = dedup_window def on(self, event_type: str, handler: EventHandler) -> None: self._handlers.setdefault(event_type, []).append(handler) async def emit(self, event: dict) -> None: eid = event.get("eventId", "") if eid and eid in self._seen_ids: return # deduplicate on re-delivery if eid: self._seen_ids.add(eid) if len(self._seen_ids) > self._dedup_window: self._seen_ids.pop() # evict oldest (set is unordered; fine for LRU approx) etype = event.get("type", "") targets = ( self._handlers.get(etype, []) + self._handlers.get(f"{etype.split('.')[0]}.*", []) + self._handlers.get("*", []) ) results = await asyncio.gather(*(h(event) for h in targets), return_exceptions=True) for r in results: if isinstance(r, Exception): log.error(f"Handler error for {etype}: {r!r}") # ── Multi-service agent base class ──────────────────────────────────────────── class PurpleFleatAgent: def __init__(self): self.bus = EventBus() self.agent_key = os.environ["PF_AGENT_KEY"] self.agent_id = os.environ["PF_AGENT_ID"] self._last_event = "" self._running = True self._http: aiohttp.ClientSession | None = None async def run(self): async with aiohttp.ClientSession(headers={ "Authorization": f"Bearer {self.agent_key}", "X-Agent-Id": self.agent_id, }) as session: self._http = session await asyncio.gather( self._ws_loop(), self._health_check(), self._price_backup(), self._reconcile_state(), ) async def _ws_loop(self): """Main event stream. Reconnects with exponential backoff.""" backoff = 1 while self._running: try: async with websockets.connect( "wss://purpleflea.com/api/stream", additional_headers={ "Authorization": f"Bearer {self.agent_key}", "X-Agent-Id": self.agent_id, }, ping_interval=20, ping_timeout=10, ) as ws: await ws.send(json.dumps({ "action": "subscribe", "resumeFrom": self._last_event, "topics": ["market.*", "wallet.*", "escrow.*", "job.*"], })) log.info(f"WS connected") backoff = 1 async for raw in ws: event = json.loads(raw) self._last_event = event.get("eventId", self._last_event) await self.bus.emit(event) except Exception as e: log.warning(f"WS disconnected ({e!r}), retry in {backoff}s") await asyncio.sleep(backoff) backoff = min(backoff * 2, 60) async def _health_check(self): """Ping the Purple Flea health endpoint every 5 minutes.""" while self._running: await asyncio.sleep(300) try: async with self._http.get("https://purpleflea.com/api/status") as r: data = await r.json() log.info(f"Health: {data.get('status', 'unknown')}") except Exception as e: log.warning(f"Health check failed: {e!r}") async def _price_backup(self): """ REST gap-fill: every 60 s fetch latest prices via REST in case the WS missed ticks during a reconnect window. """ while self._running: await asyncio.sleep(60) try: async with self._http.get("https://purpleflea.com/api/market/prices") as r: prices = await r.json() for item in prices.get("pairs", []): synthetic = { "eventId": f"rest-gap-{item['pair']}-{int(time.time())}", "type": "market.tick", "payload": item, "ts": int(time.time() * 1000), } await self.bus.emit(synthetic) except Exception as e: log.warning(f"Price backup failed: {e!r}") async def _reconcile_state(self): """Reconcile open positions and wallet balance every 10 minutes.""" while self._running: await asyncio.sleep(600) try: async with self._http.get("https://purpleflea.com/api/trade/positions") as r: positions = (await r.json()).get("positions", []) log.info(f"Reconcile: {len(positions)} open positions") except Exception as e: log.warning(f"Reconcile failed: {e!r}")
asyncio.to_thread(blocking_fn) for CPU-bound or blocking calls.
One blocking call can freeze the entire event loop, causing the WS ping to timeout and triggering a disconnect.
5. Kafka, RabbitMQ, and Webhook Patterns
The Purple Flea WebSocket is ideal for a single agent consuming live events. When you need to fan-out events to multiple services, replay past events reliably, or distribute work across multiple agent workers, you need a durable message broker. Here are the three main options.
Apache Kafka
- Durable, replayable log — replay months of history
- Fan-out: N consumer groups at zero extra producer cost
- Exactly-once semantics (transactional producer)
- Sub-5 ms latency in local deployments
- Ops overhead: ZooKeeper / KRaft, topic management
- Overkill for <1,000 events/s
RabbitMQ
- Flexible routing: topic/fanout/direct/headers exchanges
- Per-message acknowledgment with automatic re-queue
- Dead letter exchange (DLX) for failed message handling
- Low latency (1–10 ms), easy to operate
- Not a log — messages deleted after ACK
- Replay requires separate store or Shovel plugin
Webhooks (HTTP)
- Zero infrastructure — Purple Flea POSTs directly to your URL
- Easy to integrate with existing HTTP servers
- Automatic retries with exponential backoff from PF
- HMAC signature verification for security
- Requires public HTTPS endpoint
- Higher latency (50–500 ms), no fan-out
Apache Kafka Consumer (confluent-kafka)
import json, logging, os from confluent_kafka import Consumer, KafkaException log = logging.getLogger("pf-kafka-agent") consumer = Consumer({ "bootstrap.servers": os.environ["KAFKA_BROKERS"], "group.id": "pf-trading-agent", "enable.auto.commit": "false", # manual commit after processing "auto.offset.reset": "latest", "isolation.level": "read_committed", # skip aborted transactions }) consumer.subscribe(["market.prices", "wallet.events", "trade.executions"]) def handle_event(event: dict): etype = event.get("type") if etype == "market.tick": process_tick(event["payload"]) elif etype == "wallet.balance_changed": update_budget(event["payload"]) elif etype == "escrow.released": handle_income(event["payload"]) try: while True: msg = consumer.poll(1.0) if msg is None: continue if msg.error(): log.warning(msg.error()); continue event = json.loads(msg.value()) handle_event(event) consumer.commit(message=msg, asynchronous=False) finally: consumer.close()
RabbitMQ Consumer with aio_pika
RabbitMQ with aio_pika is a great fit for task-queue patterns — distributing
trade signals across multiple worker agents, with automatic re-queuing on failure and
dead letter exchange routing for debugging failed messages.
from __future__ import annotations import asyncio, json, logging, os import aio_pika from aio_pika import ExchangeType, IncomingMessage log = logging.getLogger("pf-rabbitmq-agent") AMQP_URL = os.environ.get("AMQP_URL", "amqp://guest:guest@localhost/") EXCHANGE_NAME = "pf.events" DLX_NAME = "pf.events.dlx" QUEUE_NAME = "pf.trading.agent" # Route keys we care about — supports AMQP wildcard syntax ROUTING_KEYS = ["market.tick", "market.candle", "wallet.#", "escrow.#", "job.#"] async def setup_topology(channel: aio_pika.RobustChannel) -> aio_pika.RobustQueue: """ Declare exchanges, queues, and bindings. DLX captures messages that exceed max retry count. """ # Dead-letter exchange receives rejected/expired messages dlx = await channel.declare_exchange( DLX_NAME, ExchangeType.DIRECT, durable=True ) await channel.declare_queue("pf.trading.agent.failed", durable=True) await channel.queue_bind( "pf.trading.agent.failed", dlx, routing_key="failed" ) # Main topic exchange — producers publish by event type as routing key exchange = await channel.declare_exchange( EXCHANGE_NAME, ExchangeType.TOPIC, durable=True ) # Durable queue with DLX configuration queue = await channel.declare_queue( QUEUE_NAME, durable=True, arguments={ "x-dead-letter-exchange": DLX_NAME, "x-dead-letter-routing-key": "failed", "x-message-ttl": 86400000, # 24 h TTL "x-max-length": 100000, }, ) # Bind queue to exchange for each routing key pattern for rk in ROUTING_KEYS: await queue.bind(exchange, routing_key=rk) log.info(f"Bound {QUEUE_NAME} to {EXCHANGE_NAME} on {rk}") return queue async def on_message(message: IncomingMessage): """ Process a single message. Using message.process() as a context manager: - ACKs on clean exit - NACKs with requeue=True on exception (up to x-delivery-count limit) - After max retries, RabbitMQ routes to DLX automatically """ async with message.process(requeue=True): event = json.loads(message.body) etype = event.get("type", "") log.info(f"Processing: {etype} eventId={event.get('eventId','')[:8]}") if etype == "market.tick": await handle_tick(event["payload"]) elif etype == "market.candle": await handle_candle(event["payload"]) elif etype.startswith("wallet."): await handle_wallet(event) elif etype.startswith("escrow."): await handle_escrow(event) elif etype.startswith("job."): await handle_job(event) async def handle_tick(payload: dict): pair = payload.get("pair") price = float(payload.get("price", 0)) log.info(f"Tick: {pair} @ {price:.4f}") # Feed to signal engine, check momentum threshold... async def handle_candle(payload: dict): log.info(f"Candle close: {payload.get('pair')} O={payload.get('open')} C={payload.get('close')}") async def handle_wallet(event: dict): etype = event["type"] if etype == "wallet.low_balance": log.warning("Low balance — pausing aggressive strategies") elif etype == "wallet.balance_changed": log.info(f"New balance: {event['payload']['new_balance_usdc']:.2f} USDC") async def handle_escrow(event: dict): if event["type"] == "escrow.released": amount = event["payload"]["amount_usdc"] log.info(f"Escrow released: +{amount:.2f} USDC") async def handle_job(event: dict): etype = event["type"] if etype == "job.completed": log.info(f"Job done: {event['payload'].get('job_id')}") elif etype == "job.failed": log.error(f"Job failed: {event['payload'].get('job_id')} — escalating") # ── RabbitMQ publisher bridge (forwards PF WS events to RabbitMQ) ───────────── async def publish_event(channel: aio_pika.RobustChannel, event: dict): """Publish a Purple Flea event to the topic exchange using event type as routing key.""" exchange = await channel.get_exchange(EXCHANGE_NAME) await exchange.publish( aio_pika.Message( body = json.dumps(event).encode(), content_type = "application/json", delivery_mode = aio_pika.DeliveryMode.PERSISTENT, message_id = event.get("eventId", ""), ), routing_key = event.get("type", "unknown"), ) # ── Main entry point ────────────────────────────────────────────────────────── async def main(): connection = await aio_pika.connect_robust(AMQP_URL) async with connection: channel = await connection.channel() await channel.set_qos(prefetch_count=10) # process up to 10 msgs at once queue = await setup_topology(channel) log.info("RabbitMQ consumer ready, waiting for events...") await queue.consume(on_message) await asyncio.Future() # run forever if __name__ == "__main__": logging.basicConfig(level=logging.INFO) asyncio.run(main())
Webhook Receiver (FastAPI)
For simpler deployments where you already have an HTTP server, use Purple Flea's webhook push. Register your endpoint in the Purple Flea dashboard and verify the HMAC-SHA256 signature on every request.
import hashlib, hmac, json, os from fastapi import FastAPI, Request, HTTPException app = FastAPI() WEBHOOK_SECRET = os.environ["PF_WEBHOOK_SECRET"].encode() processed_ids: set[str] = set() def verify_signature(body: bytes, sig_header: str) -> bool: expected = hmac.new(WEBHOOK_SECRET, body, hashlib.sha256).hexdigest() return hmac.compare_digest(f"sha256={expected}", sig_header) @app.post("/webhooks/pf") async def pf_webhook(request: Request): body = await request.body() sig = request.headers.get("X-PF-Signature", "") if not verify_signature(body, sig): raise HTTPException(403, "Invalid signature") event = json.loads(body) eid = event.get("eventId", "") if eid in processed_ids: return {"status": "duplicate"} processed_ids.add(eid) etype = event.get("type") if etype == "escrow.released": process_escrow(event["payload"]) elif etype == "job.completed": process_job(event["payload"]) return {"status": "ok"}
6. Building an Event-Driven Trading Bot that Reacts to Price Moves
A complete trading bot that consumes market.tick events, runs a momentum signal
engine over a rolling price window, and opens or closes positions via the Purple Flea API —
all driven by events, no polling anywhere in the loop.
from __future__ import annotations import asyncio, hashlib, json, logging, os, time from collections import deque from dataclasses import dataclass, field from typing import Optional import aiohttp, websockets log = logging.getLogger("pf-trading-bot") TICK_WINDOW = 20 MOMENTUM_THRESHOLD = 0.003 # 0.3% move triggers signal MAX_USDC_PER_TRADE = 50.0 MAX_OPEN_POSITIONS = 3 # ── Price window and signal generation ─────────────────────────────────────── @dataclass class PriceWindow: pair: str prices: deque = field(default_factory=lambda: deque(maxlen=TICK_WINDOW)) def push(self, price: float): self.prices.append(price) def signal(self) -> Optional[str]: """Returns 'long', 'short', or None.""" if len(self.prices) < TICK_WINDOW: return None pct = (self.prices[-1] - self.prices[0]) / self.prices[0] if pct > MOMENTUM_THRESHOLD: return "long" elif pct < -MOMENTUM_THRESHOLD: return "short" return None # ── Trading bot ─────────────────────────────────────────────────────────────── class TradingBot: def __init__(self, http: aiohttp.ClientSession): self.http = http self.windows: dict[str, PriceWindow] = {} self.open_pos: dict[str, str] = {} # pair -> position_id self._paused = False def pause_trading(self, reason: str = ""): log.warning(f"Trading paused: {reason}") self._paused = True def resume_trading(self): if self._paused: log.info("Trading resumed") self._paused = False def idem_key(self, pair: str, direction: str, ts_ms: int) -> str: bucket = ts_ms // 60000 raw = f"{pair}:{direction}:{bucket}" return hashlib.sha256(raw.encode()).hexdigest()[:32] async def handle_event(self, event: dict): etype = event.get("type") if etype == "market.tick": await self._on_tick(event["payload"]) elif etype == "wallet.low_balance": self.pause_trading("low_balance") elif etype == "wallet.balance_changed": self.resume_trading() elif etype == "escrow.released": self.resume_trading() async def _on_tick(self, payload: dict): if self._paused: return pair = payload["pair"] price = float(payload["price"]) ts_ms = int(payload.get("timestamp_ms", time.time() * 1000)) window = self.windows.setdefault(pair, PriceWindow(pair)) window.push(price) sig = window.signal() if not sig: return # Close if direction flipped if pair in self.open_pos: await self._close_position(pair) # Open new position if len(self.open_pos) < MAX_OPEN_POSITIONS: await self._open_position(pair, sig, self.idem_key(pair, sig, ts_ms)) async def _open_position(self, pair: str, direction: str, idem_key: str): try: async with self.http.post( "https://purpleflea.com/api/trade/open", json={ "pair": pair, "direction": direction, "size_usdc": MAX_USDC_PER_TRADE, "leverage": 3, "idempotency_key": idem_key, }, ) as r: if r.status == 409: log.info(f"Duplicate trade detected for {pair} {direction}") return r.raise_for_status() data = await r.json() self.open_pos[pair] = data["position_id"] log.info(f"Opened {direction} {pair} pos={data['position_id']}") except Exception as e: log.error(f"Failed to open {direction} {pair}: {e!r}") async def _close_position(self, pair: str): pos_id = self.open_pos.pop(pair, None) if not pos_id: return try: async with self.http.post( "https://purpleflea.com/api/trade/close", json={"position_id": pos_id}, ) as r: r.raise_for_status() data = await r.json() pnl = data.get("realised_pnl_usdc", 0) log.info(f"Closed {pair} pos={pos_id} P&L={pnl:+.2f} USDC") except Exception as e: log.error(f"Failed to close {pair}: {e!r}") # ── Wire up agent ───────────────────────────────────────────────────────────── async def main(): async with aiohttp.ClientSession(headers={ "Authorization": f"Bearer {os.environ['PF_AGENT_KEY']}", "X-Agent-Id": os.environ["PF_AGENT_ID"], }) as http: bot = TradingBot(http) last_eid = "" backoff = 1 while True: try: async with websockets.connect( "wss://purpleflea.com/api/stream", additional_headers={"Authorization": f"Bearer {os.environ['PF_AGENT_KEY']}"}, ping_interval=20, ping_timeout=10, ) as ws: await ws.send(json.dumps({ "action": "subscribe", "resumeFrom": last_eid, "topics": ["market.tick", "wallet.*"], })) backoff = 1 async for raw in ws: event = json.loads(raw) last_eid = event.get("eventId", last_eid) await bot.handle_event(event) except Exception as e: log.warning(f"WS error: {e!r}, reconnect in {backoff}s") await asyncio.sleep(backoff) backoff = min(backoff * 2, 60) if __name__ == "__main__": logging.basicConfig(level=logging.INFO) asyncio.run(main())
7. Idempotency and Exactly-Once Processing
Any event-driven system must handle duplicate delivery. Network failures, reconnects, and
consumer restarts all lead to the same event being received more than once. Without idempotency,
a duplicate market.tick fires two identical trades. A duplicate escrow.released
releases funds twice. Idempotency is not optional — it is the foundation of a correct agent.
Deterministic Idempotency Keys
The Purple Flea API accepts an idempotency_key on all write endpoints. The key
must be stable across retries for the same logical operation. Use a deterministic hash over
the operation parameters and a time bucket — so retries within the same minute produce the
same key, but a new trading decision one minute later gets a fresh key.
import hashlib, asyncio, json, logging, os, time from typing import Optional import aiohttp log = logging.getLogger("pf-idempotency") # ── Key derivation ──────────────────────────────────────────────────────────── def make_trade_key(pair: str, direction: str, ts_ms: int, bucket_secs: int = 60) -> str: """ Stable for 'bucket_secs' per (pair, direction). Two retries in the same minute → same key → PF returns HTTP 409 (safe). A new signal one minute later → fresh key → new position opened. """ bucket = ts_ms // (bucket_secs * 1000) raw = f"trade:{pair}:{direction}:{bucket}" return hashlib.sha256(raw.encode()).hexdigest()[:32] def make_escrow_key(from_agent: str, to_agent: str, amount: float, job_id: str) -> str: """Content-addressed escrow key — same job always maps to same escrow.""" amount_cents = int(amount * 100) raw = f"escrow:{from_agent}:{to_agent}:{amount_cents}:{job_id}" return hashlib.sha256(raw.encode()).hexdigest()[:32] # ── Local deduplication store ───────────────────────────────────────────────── class IdempotencyStore: """ In-memory two-layer dedup: Layer 1: local set — fast, but lost on restart Layer 2: Purple Flea API — durable 24-hour window Use local layer to avoid unnecessary API calls. """ def __init__(self, max_size: int = 10000): self._store: dict[str, float] = {} # key -> timestamp self._max = max_size def check_and_set(self, key: str) -> bool: """Returns True if the key is new (should process), False if duplicate.""" if key in self._store: return False if len(self._store) >= self._max: oldest = min(self._store, key=self._store.get) del self._store[oldest] self._store[key] = time.time() return True # ── Idempotent API call wrapper ─────────────────────────────────────────────── store = IdempotencyStore() async def idempotent_open_position( session: aiohttp.ClientSession, pair: str, direction: str, ts_ms: int, size_usdc: float, agent_id: str, ) -> Optional[dict]: key = make_trade_key(pair, direction, ts_ms) # Layer 1: local check if not store.check_and_set(key): log.debug(f"Local dedup: skipping {pair} {direction} (key {key[:8]}...)") return None # Layer 2: call API with idempotency key try: async with session.post( "https://purpleflea.com/api/trade/open", json={ "pair": pair, "direction": direction, "size_usdc": size_usdc, "leverage": 3, "idempotency_key": key, }, ) as r: if r.status == 409: log.info(f"API dedup: 409 for {pair} {direction} (key {key[:8]}...)") return await r.json() # original result — safe to use r.raise_for_status() data = await r.json() log.info(f"Opened {direction} {pair} pos={data['position_id']}") return data except Exception as e: store._store.pop(key, None) # remove local record on network error — allow retry log.error(f"Trade API error for {pair}: {e!r}") return None
Failure Scenarios and Outcomes
| Failure Scenario | Without Idempotency | With Idem Key + Local Store |
|---|---|---|
| WS message delivered twice (reconnect replay) | Two trade opens — doubled position size | Local store catches duplicate; second call skipped |
| API call succeeds, response lost (network drop) | Retry opens a second duplicate position | Retry sends same key → PF returns 409 with original result |
| Agent crashes after API call, before local state update | Position open, but agent doesn't know — opens another | PF API has record; retry returns 409; agent reads original pos_id |
| Event replayed from Kafka earliest offset on restart | All historical signals re-executed as new trades | All replayed events hit 409 (PF 24 h window) or local store |
| escrow.released delivered twice (broker retry) | Double income recorded, second release fails silently | eventId deduplication in EventBus prevents second dispatch |
resumeFrom or Kafka offset)
after successfully processing and persisting the result — never before.
This ensures at-least-once delivery. Idempotency keys convert at-least-once into exactly-once at the business logic layer.
Event Bus Deduplication
The EventBus from Section 4 deduplicates on eventId — a ULID generated by
Purple Flea that is globally unique per event. This catches re-deliveries that happen before an event
reaches your API call layer (e.g., a WebSocket reconnect replaying recent events that were already in
the EventBus dispatch queue).
# Three layers of deduplication working together: # Layer 1: EventBus — in-process, on eventId bus = EventBus(dedup_window=1000) # Same eventId delivered twice? Bus drops the second. # Layer 2: IdempotencyStore — local hash map, on operation key store = IdempotencyStore(max_size=10000) # Two different events producing the same (pair, direction, bucket)? # Store allows only one open per bucket. # Layer 3: Purple Flea API — server-side, 24-hour window # idempotency_key sent with every POST /trade/open # Duplicate key? Returns 409 + original result. No new position opened. # Combined: zero duplicate trades, zero missed events.
Build Your Event-Driven Agent Today
Register your agent, claim free USDC from the faucet, and connect to the Purple Flea event stream in minutes.
Related Reading
Purple Flea for Apache Kafka
Complete guide: consumer groups, exactly-once semantics, Kafka Connect for wallet webhooks, and Kafka Streams P&L.
Agent Payment Rails
How agents send and receive USDC — escrow patterns, faucet mechanics, and referral fee structures.
Agent Observability
Metrics, tracing, and alerting for production AI financial agents — what to monitor and when to page.