Event-Driven Architecture for AI Financial Agents

Stop polling. Build agents that react instantly to market ticks, balance changes, escrow releases, and job completions using WebSocket streams, Apache Kafka, RabbitMQ, and webhooks — with exactly-once idempotency so no event is processed twice.

1. Why Event-Driven: Reactive vs Polling

Most developers reach for REST polling first. It is simple: ask the server every N seconds, compare the response with the previous one, act on deltas. The problem is that financial markets do not care about your polling interval.

A market tick might move price 0.8% in 200 milliseconds. If your agent polls every 5 seconds, it will have missed the move, opened a position at a stale price, and eaten slippage. A wallet.balance_changed event fires the moment a trade settles — if you are polling on a 30-second cycle, your risk guard sees the updated balance 15 seconds late on average. An escrow.released event triggers your cash flow — polling means delayed income recognition and slower reinvestment.

❌ Polling (REST)

  • Adds latency equal to half the poll interval on average
  • Wastes compute and API quota on empty responses
  • Misses short-lived events between polls
  • Rate-limited by burst allowances
  • Scales poorly: N agents × M endpoints = O(N·M) request load
  • Hard to replay past events for backtest

✅ Event-Driven (Push)

  • Millisecond-scale delivery over WebSocket and Kafka
  • No wasted requests — server pushes only on change
  • Every state transition captured as a durable event
  • Consumers set their own throughput via backpressure
  • Fan-out: one topic feeds N consumer groups at zero extra cost
  • Replay from any offset for backtest or audit

The Latency Math

For a poll interval of T seconds, the expected latency to observe an event is T / 2. With a WebSocket, the expected latency is the network round-trip time — typically 10–50 ms. For a 5-second poll cycle, that is a 2,500 ms vs 25 ms comparison: two orders of magnitude difference. In a momentum trade, that gap is the difference between entering at signal and entering after the move has already run.
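The arithmetic above can be sanity-checked directly (the 25 ms WebSocket figure is an assumed round-trip time, not a guarantee):

```python
def expected_poll_latency_ms(poll_interval_s: float) -> float:
    """Events arrive uniformly at random within a poll interval T,
    so the expected delay before a poller observes one is T / 2."""
    return poll_interval_s * 1000.0 / 2.0

poll_ms = expected_poll_latency_ms(5.0)   # 5 s cycle: 2500 ms expected delay
ws_ms   = 25.0                            # assumed WebSocket round-trip time
print(f"poll {poll_ms:.0f} ms vs ws {ws_ms:.0f} ms ({poll_ms / ws_ms:.0f}x)")
```

The same formula explains the 30-second balance-poll example earlier: 30 s / 2 = 15 s of average staleness.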

Method            Avg Latency     Missed Events          Server Load              Best For
REST poll (5 s)   2,500 ms        Common                 High (N×M req/s)         Config fetch, one-off queries
REST poll (1 s)   500 ms          Occasional             Very high                Non-critical dashboards
WebSocket         10–50 ms        None (if connected)    Low (persistent conn)    Live prices, real-time alerts
Kafka consumer    <5 ms (local)   None (durable log)     Very low                 High-throughput, replayable
RabbitMQ push     1–10 ms         None (durable queue)   Low                      Task queues, work distribution
Webhook (HTTP)    50–500 ms       None (with retry)      Minimal                  Third-party integrations
💡
Hybrid rule of thumb: Use events for anything that changes your agent's behaviour. Use REST polling only for configuration (agent registration, strategy parameters, historical data). Purple Flea provides both — the WebSocket stream at wss://purpleflea.com/api/stream for live events, and REST endpoints for queries.

2. Event Types: Market, Wallet, Escrow, and Job

Purple Flea emits events across four domains. Understanding the full catalog lets you build agents that react to the right signal at the right time — not just price moves.

market.tick

Price Tick

Real-time OHLCV update per trading pair. Fires on every exchange update, typically 100–500 ms cadence.

market.candle

Candle Close

1-minute candle close event. Use for slower strategies that need OHLCV rather than raw ticks.

wallet.balance_changed

Balance Change

Fires after any USDC balance movement: trade open/close, deposit, withdrawal, faucet claim, escrow release.

wallet.low_balance

Low Balance Alert

Fires when USDC balance drops below the agent's configured threshold. Trigger pause-trading or faucet-refill logic.

escrow.released

Escrow Released

Counterparty confirmed job completion; funds transferred. Use to trigger next-task hiring or income recognition.

escrow.disputed

Escrow Disputed

Buyer or seller opened a dispute. Agent should halt dependent workflows and gather evidence for resolution.

job.completed

Job Completed

A downstream agent you hired finished a task. Payload includes result hash and delivery URI.

job.failed

Job Failed

Downstream agent failed or timed out. Trigger retry-with-fallback or escalation logic automatically.

Event Envelope Schema

Every Purple Flea event shares a common envelope. The eventId is a ULID (lexicographically sortable, monotonically increasing) — ideal for deduplication and ordering without a central coordinator.

event_envelope.json JSON
{
  "eventId":  "01HZ9K3M2Q7WB5VN8PXCT4YRDJ",  // ULID — globally unique, time-ordered
  "type":     "market.tick",               // dot-namespaced: domain.name
  "agentId": "pf_agent_abc123",           // recipient agent
  "payload": { ... },                    // domain-specific body
  "ts":       1741344000000             // Unix epoch milliseconds
}

Event Type              Frequency             Typical Latency   Durable           Trigger Action
market.tick             100–500 ms per pair   <50 ms (WS)       No (live only)    Signal generation, momentum check
market.candle           Every 1 min           <100 ms (WS)      Yes (Kafka log)   Strategy recalculation, OHLCV-based signals
wallet.balance_changed  On event              <200 ms           Yes               Budget guard, position sizing update
wallet.low_balance      On threshold breach   <200 ms           Yes               Pause trading, claim faucet
escrow.released         On event              <500 ms           Yes               Income recognition, hire next agent
job.completed           On event              <1 s              Yes               Use result, release payment
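The time-ordering property of ULIDs is easy to verify: the first 10 characters encode a 48-bit Unix millisecond timestamp in Crockford base32, so sorting eventIds as plain strings sorts them by time. A minimal decoder sketch (illustrative, not a full ULID library):

```python
# Crockford base32 alphabet used by ULID (I, L, O, U are excluded)
CROCKFORD = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"

def ulid_timestamp_ms(ulid: str) -> int:
    """Decode the millisecond timestamp from the 10-char time prefix of a ULID."""
    ts = 0
    for ch in ulid[:10]:
        ts = ts * 32 + CROCKFORD.index(ch)
    return ts

def ulid_time_prefix(ts_ms: int) -> str:
    """Inverse: encode a millisecond timestamp as the 10-char ULID prefix."""
    chars = []
    for _ in range(10):
        chars.append(CROCKFORD[ts_ms % 32])
        ts_ms //= 32
    return "".join(reversed(chars))
```

Because later timestamps encode to lexicographically larger prefixes, the maximum of a set of eventIds is always the newest event, which is handy when picking a resume cursor.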

3. WebSocket-Based Event Consumption from Purple Flea APIs

The simplest way to consume Purple Flea events is the WebSocket stream. A single persistent connection delivers all event types for your agent — no need to subscribe to separate endpoints per domain. The stream multiplexes market ticks, wallet events, and job events on one channel.

Connection and Subscription Protocol

ws_consumer.py Python
from __future__ import annotations
import asyncio, json, logging, os, time
from typing import Callable, Awaitable

import websockets
from websockets.exceptions import ConnectionClosed

log = logging.getLogger("pf-ws")

WS_URL    = "wss://purpleflea.com/api/stream"
AGENT_KEY = os.environ["PF_AGENT_KEY"]   # pf_live_... never sk_live_
AGENT_ID  = os.environ["PF_AGENT_ID"]

EventHandler = Callable[[dict], Awaitable[None]]
handlers: dict[str, list[EventHandler]] = {}

def on(event_type: str):
    """Decorator to register an async handler for an event type."""
    def _deco(fn: EventHandler):
        handlers.setdefault(event_type, []).append(fn)
        return fn
    return _deco

async def dispatch(event: dict):
    etype = event.get("type", "")
    # Try exact match first, then wildcard (e.g. "market.*")
    targets = handlers.get(etype, []) + handlers.get(f"{etype.split('.')[0]}.*", [])
    await asyncio.gather(*(h(event) for h in targets), return_exceptions=True)

async def connect_and_subscribe(last_event_id: str = ""):
    """
    Persistent WebSocket loop with exponential backoff reconnect.
    Passes resumeFrom so the server replays missed events after a disconnect.
    """
    backoff = 1
    while True:
        try:
            headers = {
                "Authorization": f"Bearer {AGENT_KEY}",
                "X-Agent-Id": AGENT_ID,
            }
            async with websockets.connect(
                WS_URL,
                additional_headers=headers,
                ping_interval=20,
                ping_timeout=10,
            ) as ws:
                # Subscribe with resume cursor for gap recovery
                await ws.send(json.dumps({
                    "action":     "subscribe",
                    "resumeFrom": last_event_id,   # replay missed events
                    "topics":     ["market.*", "wallet.*", "escrow.*", "job.*"],
                }))
                log.info(f"WS connected, resumeFrom={last_event_id or 'latest'}")
                backoff = 1   # reset on successful connect

                async for raw_msg in ws:
                    event = json.loads(raw_msg)
                    last_event_id = event.get("eventId", last_event_id)
                    await dispatch(event)

        except ConnectionClosed as e:
            log.warning(f"WS closed ({e.code}): reconnecting in {backoff}s")
        except Exception as e:
            log.error(f"WS error: {e!r}: reconnecting in {backoff}s")

        await asyncio.sleep(backoff)
        backoff = min(backoff * 2, 60)   # cap at 60 s

# ── Example handlers ─────────────────────────────────────────────────────────
@on("market.tick")
async def handle_tick(event: dict):
    payload = event["payload"]
    log.info(f"Tick {payload['pair']} @ {payload['price']:.4f}")

@on("wallet.low_balance")
async def handle_low_balance(event: dict):
    log.warning(f"Low balance alert: {event['payload']['balance_usdc']:.2f} USDC")
    # Trigger faucet claim or pause trading...

@on("escrow.released")
async def handle_escrow_release(event: dict):
    p = event["payload"]
    log.info(f"Escrow {p['escrow_id']} released: +{p['amount_usdc']:.2f} USDC")
    # Hire next downstream agent or reinvest...

if __name__ == "__main__":
    asyncio.run(connect_and_subscribe())
Gap recovery: Always persist last_event_id to durable storage (Redis, SQLite, file) after each event is processed, not before: a crash mid-processing then causes a safe re-delivery rather than a silent drop. On reconnect, pass it as resumeFrom to receive any events that were delivered during the disconnect window. Purple Flea buffers up to 5 minutes of events per agent for replay.

WebSocket Best Practices

Ping / Keepalive

Set ping_interval=20 on the websockets client. The server drops connections idle for more than 60 s. Missing pings cause silent disconnects that look like hangs.

Backpressure

If your dispatch() is slow, the in-memory message queue grows unbounded. Move heavy processing off the event loop via asyncio.to_thread() and keep the WS read loop non-blocking.

Single Connection

One WebSocket per agent instance. The server multiplexes all topics on one connection — opening multiple connections per agent wastes resources and may trigger rate limiting.

Resume Cursor

Store the last eventId after processing (not before). Processing then crash = safe to re-deliver. Storing before processing = event silently dropped on crash.
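A minimal file-backed cursor store along these lines works for a single-instance agent (illustrative sketch; CURSOR_PATH and the JSON layout are assumptions, and Redis or SQLite follow the same pattern):

```python
import json, os, tempfile

CURSOR_PATH = "pf_cursor.json"   # hypothetical local path

def load_cursor() -> str:
    """Return the last processed eventId, or '' on first run."""
    try:
        with open(CURSOR_PATH) as f:
            return json.load(f).get("last_event_id", "")
    except FileNotFoundError:
        return ""

def save_cursor(event_id: str) -> None:
    """Call AFTER the event's handlers finish, per the resume-cursor rule.
    Write-then-rename so a crash mid-write never corrupts the cursor file."""
    fd, tmp = tempfile.mkstemp(dir=os.path.dirname(CURSOR_PATH) or ".")
    with os.fdopen(fd, "w") as f:
        json.dump({"last_event_id": event_id}, f)
    os.replace(tmp, CURSOR_PATH)   # atomic on POSIX filesystems
```

On startup, seed resumeFrom with load_cursor(); in the read loop, call save_cursor(event["eventId"]) only after dispatch() returns.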

4. Python AsyncIO Event Loop for Multi-Service Agents

A production agent typically runs several concurrent coroutines: a WebSocket loop for real-time events, a periodic health check, a REST gap-filler for missed candles, and a state reconciler. asyncio runs all of them cooperatively on a single thread — asyncio.gather() awaits them concurrently, and since only one coroutine executes at a time, shared in-process state needs no locks.

agent_base.py Python
from __future__ import annotations
import asyncio, json, logging, os, time
from typing import Callable, Awaitable, Any

import aiohttp, websockets

log = logging.getLogger("pf-agent")
EventHandler = Callable[[dict], Awaitable[None]]

# ── In-process event bus with deduplication ───────────────────────────────────
class EventBus:
    def __init__(self, dedup_window: int = 1000):
        self._handlers: dict[str, list[EventHandler]] = {}
        self._seen_ids: dict[str, None] = {}   # insertion-ordered; used as a FIFO set
        self._dedup_window = dedup_window

    def on(self, event_type: str, handler: EventHandler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    async def emit(self, event: dict) -> None:
        eid = event.get("eventId", "")
        if eid and eid in self._seen_ids:
            return   # deduplicate on re-delivery
        if eid:
            self._seen_ids[eid] = None
            if len(self._seen_ids) > self._dedup_window:
                # dicts preserve insertion order, so this evicts the oldest id
                self._seen_ids.pop(next(iter(self._seen_ids)))

        etype = event.get("type", "")
        targets = (
            self._handlers.get(etype, []) +
            self._handlers.get(f"{etype.split('.')[0]}.*", []) +
            self._handlers.get("*", [])
        )
        results = await asyncio.gather(*(h(event) for h in targets), return_exceptions=True)
        for r in results:
            if isinstance(r, Exception):
                log.error(f"Handler error for {etype}: {r!r}")

# ── Multi-service agent base class ────────────────────────────────────────────
class PurpleFleaAgent:
    def __init__(self):
        self.bus           = EventBus()
        self.agent_key     = os.environ["PF_AGENT_KEY"]
        self.agent_id      = os.environ["PF_AGENT_ID"]
        self._last_event   = ""
        self._running      = True
        self._http: aiohttp.ClientSession | None = None

    async def run(self):
        async with aiohttp.ClientSession(headers={
            "Authorization": f"Bearer {self.agent_key}",
            "X-Agent-Id": self.agent_id,
        }) as session:
            self._http = session
            await asyncio.gather(
                self._ws_loop(),
                self._health_check(),
                self._price_backup(),
                self._reconcile_state(),
            )

    async def _ws_loop(self):
        """Main event stream. Reconnects with exponential backoff."""
        backoff = 1
        while self._running:
            try:
                async with websockets.connect(
                    "wss://purpleflea.com/api/stream",
                    additional_headers={
                        "Authorization": f"Bearer {self.agent_key}",
                        "X-Agent-Id": self.agent_id,
                    },
                    ping_interval=20, ping_timeout=10,
                ) as ws:
                    await ws.send(json.dumps({
                        "action":     "subscribe",
                        "resumeFrom": self._last_event,
                        "topics":     ["market.*", "wallet.*", "escrow.*", "job.*"],
                    }))
                    log.info(f"WS connected")
                    backoff = 1
                    async for raw in ws:
                        event = json.loads(raw)
                        self._last_event = event.get("eventId", self._last_event)
                        await self.bus.emit(event)

            except Exception as e:
                log.warning(f"WS disconnected ({e!r}), retry in {backoff}s")
                await asyncio.sleep(backoff)
                backoff = min(backoff * 2, 60)

    async def _health_check(self):
        """Ping the Purple Flea health endpoint every 5 minutes."""
        while self._running:
            await asyncio.sleep(300)
            try:
                async with self._http.get("https://purpleflea.com/api/status") as r:
                    data = await r.json()
                    log.info(f"Health: {data.get('status', 'unknown')}")
            except Exception as e:
                log.warning(f"Health check failed: {e!r}")

    async def _price_backup(self):
        """
        REST gap-fill: every 60 s fetch latest prices via REST in case
        the WS missed ticks during a reconnect window.
        """
        while self._running:
            await asyncio.sleep(60)
            try:
                async with self._http.get("https://purpleflea.com/api/market/prices") as r:
                    prices = await r.json()
                    for item in prices.get("pairs", []):
                        synthetic = {
                            "eventId": f"rest-gap-{item['pair']}-{int(time.time())}",
                            "type":    "market.tick",
                            "payload": item,
                            "ts":      int(time.time() * 1000),
                        }
                        await self.bus.emit(synthetic)
            except Exception as e:
                log.warning(f"Price backup failed: {e!r}")

    async def _reconcile_state(self):
        """Reconcile open positions and wallet balance every 10 minutes."""
        while self._running:
            await asyncio.sleep(600)
            try:
                async with self._http.get("https://purpleflea.com/api/trade/positions") as r:
                    positions = (await r.json()).get("positions", [])
                    log.info(f"Reconcile: {len(positions)} open positions")
            except Exception as e:
                log.warning(f"Reconcile failed: {e!r}")
asyncio + threads: Never call blocking code (requests, time.sleep, file I/O) directly inside an async coroutine. Use asyncio.to_thread(blocking_fn) for CPU-bound or blocking calls. One blocking call can freeze the entire event loop, causing the WS ping to time out and triggering a disconnect.
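A minimal sketch of that off-loading pattern; blocking_indicator here is a stand-in for any blocking computation (a TA library call, a synchronous DB query):

```python
import asyncio, time

def blocking_indicator(prices: list[float]) -> float:
    """Stand-in for blocking work. Calling this directly in a coroutine
    would stall the event loop for the full sleep duration."""
    time.sleep(0.05)                      # simulated blocking work
    return sum(prices) / len(prices)

async def on_tick(prices: list[float]) -> float:
    # Runs in a worker thread; the loop keeps servicing WS pings meanwhile
    return await asyncio.to_thread(blocking_indicator, prices)

avg = asyncio.run(on_tick([1.0, 2.0, 3.0]))   # → 2.0
```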

5. Kafka, RabbitMQ, and Webhook Patterns

The Purple Flea WebSocket is ideal for a single agent consuming live events. When you need to fan-out events to multiple services, replay past events reliably, or distribute work across multiple agent workers, you need a durable message broker. Here are the three main options.

Apache Kafka

  • Durable, replayable log — replay months of history
  • Fan-out: N consumer groups at zero extra producer cost
  • Exactly-once semantics (transactional producer)
  • Sub-5 ms latency in local deployments
  • Ops overhead: ZooKeeper / KRaft, topic management
  • Overkill for <1,000 events/s

RabbitMQ

  • Flexible routing: topic/fanout/direct/headers exchanges
  • Per-message acknowledgment with automatic re-queue
  • Dead letter exchange (DLX) for failed message handling
  • Low latency (1–10 ms), easy to operate
  • Not a log — messages deleted after ACK
  • Replay requires separate store or Shovel plugin

Webhooks (HTTP)

  • Zero infrastructure — Purple Flea POSTs directly to your URL
  • Easy to integrate with existing HTTP servers
  • Automatic retries with exponential backoff from PF
  • HMAC signature verification for security
  • Requires public HTTPS endpoint
  • Higher latency (50–500 ms), no fan-out

Apache Kafka Consumer (confluent-kafka)

kafka_agent_consumer.py Python
import json, logging, os
from confluent_kafka import Consumer

log = logging.getLogger("pf-kafka-agent")

consumer = Consumer({
    "bootstrap.servers":  os.environ["KAFKA_BROKERS"],
    "group.id":           "pf-trading-agent",
    "enable.auto.commit": "false",    # manual commit after processing
    "auto.offset.reset":  "latest",
    "isolation.level":    "read_committed",  # skip aborted transactions
})
consumer.subscribe(["market.prices", "wallet.events", "trade.executions"])

# process_tick / update_budget / handle_income are the agent's own strategy callbacks
def handle_event(event: dict):
    etype = event.get("type")
    if etype == "market.tick":
        process_tick(event["payload"])
    elif etype == "wallet.balance_changed":
        update_budget(event["payload"])
    elif etype == "escrow.released":
        handle_income(event["payload"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            log.warning(msg.error())
            continue
        event = json.loads(msg.value())
        handle_event(event)
        # Commit only after successful processing -> at-least-once delivery
        consumer.commit(message=msg, asynchronous=False)
finally:
    consumer.close()

RabbitMQ Consumer with aio_pika

RabbitMQ with aio_pika is a great fit for task-queue patterns — distributing trade signals across multiple worker agents, with automatic re-queuing on failure and dead letter exchange routing for debugging failed messages.

rabbitmq_agent_consumer.py Python
from __future__ import annotations
import asyncio, json, logging, os
import aio_pika
from aio_pika import ExchangeType, IncomingMessage

log = logging.getLogger("pf-rabbitmq-agent")

AMQP_URL     = os.environ.get("AMQP_URL", "amqp://guest:guest@localhost/")
EXCHANGE_NAME = "pf.events"
DLX_NAME      = "pf.events.dlx"
QUEUE_NAME    = "pf.trading.agent"

# Route keys we care about — supports AMQP wildcard syntax
ROUTING_KEYS  = ["market.tick", "market.candle", "wallet.#", "escrow.#", "job.#"]

async def setup_topology(channel: aio_pika.RobustChannel) -> aio_pika.RobustQueue:
    """
    Declare exchanges, queues, and bindings.
    DLX captures messages that exceed max retry count.
    """
    # Dead-letter exchange receives rejected/expired messages
    dlx = await channel.declare_exchange(
        DLX_NAME, ExchangeType.DIRECT, durable=True
    )
    failed_queue = await channel.declare_queue(
        "pf.trading.agent.failed", durable=True
    )
    await failed_queue.bind(dlx, routing_key="failed")

    # Main topic exchange — producers publish by event type as routing key
    exchange = await channel.declare_exchange(
        EXCHANGE_NAME, ExchangeType.TOPIC, durable=True
    )

    # Durable queue with DLX configuration
    queue = await channel.declare_queue(
        QUEUE_NAME,
        durable=True,
        arguments={
            "x-dead-letter-exchange":    DLX_NAME,
            "x-dead-letter-routing-key": "failed",
            "x-message-ttl":             86400000,  # 24 h TTL
            "x-max-length":              100000,
        },
    )

    # Bind queue to exchange for each routing key pattern
    for rk in ROUTING_KEYS:
        await queue.bind(exchange, routing_key=rk)
        log.info(f"Bound {QUEUE_NAME} to {EXCHANGE_NAME} on {rk}")

    return queue

async def on_message(message: IncomingMessage):
    """
    Process a single message. Using message.process() as a context manager:
    - ACKs on clean exit
    - NACKs with requeue=True on exception, so the message is retried
    - For bounded retries, declare a quorum queue with x-delivery-limit;
      messages that exceed it are dead-lettered to the DLX automatically
    """
    async with message.process(requeue=True):
        event = json.loads(message.body)
        etype = event.get("type", "")
        log.info(f"Processing: {etype} eventId={event.get('eventId','')[:8]}")

        if   etype == "market.tick":          await handle_tick(event["payload"])
        elif etype == "market.candle":        await handle_candle(event["payload"])
        elif etype.startswith("wallet."):     await handle_wallet(event)
        elif etype.startswith("escrow."):     await handle_escrow(event)
        elif etype.startswith("job."):        await handle_job(event)

async def handle_tick(payload: dict):
    pair  = payload.get("pair")
    price = float(payload.get("price", 0))
    log.info(f"Tick: {pair} @ {price:.4f}")
    # Feed to signal engine, check momentum threshold...

async def handle_candle(payload: dict):
    log.info(f"Candle close: {payload.get('pair')} O={payload.get('open')} C={payload.get('close')}")

async def handle_wallet(event: dict):
    etype = event["type"]
    if etype == "wallet.low_balance":
        log.warning("Low balance — pausing aggressive strategies")
    elif etype == "wallet.balance_changed":
        log.info(f"New balance: {event['payload']['new_balance_usdc']:.2f} USDC")

async def handle_escrow(event: dict):
    if event["type"] == "escrow.released":
        amount = event["payload"]["amount_usdc"]
        log.info(f"Escrow released: +{amount:.2f} USDC")

async def handle_job(event: dict):
    etype = event["type"]
    if etype == "job.completed":
        log.info(f"Job done: {event['payload'].get('job_id')}")
    elif etype == "job.failed":
        log.error(f"Job failed: {event['payload'].get('job_id')} — escalating")

# ── RabbitMQ publisher bridge (forwards PF WS events to RabbitMQ) ─────────────
async def publish_event(channel: aio_pika.RobustChannel, event: dict):
    """Publish a Purple Flea event to the topic exchange using event type as routing key."""
    exchange = await channel.get_exchange(EXCHANGE_NAME)
    await exchange.publish(
        aio_pika.Message(
            body         = json.dumps(event).encode(),
            content_type = "application/json",
            delivery_mode = aio_pika.DeliveryMode.PERSISTENT,
            message_id   = event.get("eventId", ""),
        ),
        routing_key = event.get("type", "unknown"),
    )

# ── Main entry point ──────────────────────────────────────────────────────────
async def main():
    connection = await aio_pika.connect_robust(AMQP_URL)
    async with connection:
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)  # process up to 10 msgs at once
        queue   = await setup_topology(channel)
        log.info("RabbitMQ consumer ready, waiting for events...")
        await queue.consume(on_message)
        await asyncio.Future()  # run forever

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())

Webhook Receiver (FastAPI)

For simpler deployments where you already have an HTTP server, use Purple Flea's webhook push. Register your endpoint in the Purple Flea dashboard and verify the HMAC-SHA256 signature on every request.

webhook_receiver.py Python
import hashlib, hmac, json, os
from fastapi import FastAPI, Request, HTTPException

app = FastAPI()
WEBHOOK_SECRET = os.environ["PF_WEBHOOK_SECRET"].encode()
processed_ids: set[str] = set()

def verify_signature(body: bytes, sig_header: str) -> bool:
    expected = hmac.new(WEBHOOK_SECRET, body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(f"sha256={expected}", sig_header)

@app.post("/webhooks/pf")
async def pf_webhook(request: Request):
    body = await request.body()
    sig  = request.headers.get("X-PF-Signature", "")
    if not verify_signature(body, sig):
        raise HTTPException(403, "Invalid signature")

    event  = json.loads(body)
    eid    = event.get("eventId", "")
    if eid in processed_ids:
        return {"status": "duplicate"}
    processed_ids.add(eid)

    etype = event.get("type")
    if   etype == "escrow.released": process_escrow(event["payload"])
    elif etype == "job.completed":   process_job(event["payload"])
    return {"status": "ok"}
💡
Choosing a broker: Start with WebSocket for simplicity (single agent, live events). Add RabbitMQ when you need work distribution across multiple agent replicas. Use Kafka when you need event replay, audit trails, or feeding multiple independent consumer groups (e.g., trading agent + P&L processor + risk monitor all consuming the same stream).

6. Building an Event-Driven Trading Bot that Reacts to Price Moves

A complete trading bot that consumes market.tick events, runs a momentum signal engine over a rolling price window, and opens or closes positions via the Purple Flea API — all driven by events, no polling anywhere in the loop.

trading_bot.py Python
from __future__ import annotations
import asyncio, hashlib, json, logging, os, time
from collections import deque
from dataclasses import dataclass, field
from typing import Optional

import aiohttp, websockets

log = logging.getLogger("pf-trading-bot")

TICK_WINDOW        = 20
MOMENTUM_THRESHOLD = 0.003   # 0.3% move triggers signal
MAX_USDC_PER_TRADE = 50.0
MAX_OPEN_POSITIONS = 3

# ── Price window and signal generation ───────────────────────────────────────
@dataclass
class PriceWindow:
    pair:   str
    prices: deque = field(default_factory=lambda: deque(maxlen=TICK_WINDOW))

    def push(self, price: float):
        self.prices.append(price)

    def signal(self) -> Optional[str]:
        """Returns 'long', 'short', or None."""
        if len(self.prices) < TICK_WINDOW:
            return None
        pct = (self.prices[-1] - self.prices[0]) / self.prices[0]
        if   pct >  MOMENTUM_THRESHOLD: return "long"
        elif pct < -MOMENTUM_THRESHOLD: return "short"
        return None

# ── Trading bot ───────────────────────────────────────────────────────────────
class TradingBot:
    def __init__(self, http: aiohttp.ClientSession):
        self.http        = http
        self.windows:  dict[str, PriceWindow]      = {}
        self.open_pos: dict[str, tuple[str, str]]  = {}   # pair -> (position_id, direction)
        self._paused     = False

    def pause_trading(self, reason: str = ""):
        log.warning(f"Trading paused: {reason}")
        self._paused = True

    def resume_trading(self):
        if self._paused:
            log.info("Trading resumed")
            self._paused = False

    def idem_key(self, pair: str, direction: str, ts_ms: int) -> str:
        bucket = ts_ms // 60000
        raw    = f"{pair}:{direction}:{bucket}"
        return hashlib.sha256(raw.encode()).hexdigest()[:32]

    async def handle_event(self, event: dict):
        etype = event.get("type")
        if   etype == "market.tick":            await self._on_tick(event["payload"])
        elif etype == "wallet.low_balance":     self.pause_trading("low_balance")
        elif etype == "wallet.balance_changed": self.resume_trading()
        elif etype == "escrow.released":        self.resume_trading()

    async def _on_tick(self, payload: dict):
        if self._paused:
            return
        pair  = payload["pair"]
        price = float(payload["price"])
        ts_ms = int(payload.get("timestamp_ms", time.time() * 1000))

        window = self.windows.setdefault(pair, PriceWindow(pair))
        window.push(price)
        sig = window.signal()

        if not sig:
            return

        # Already positioned in the signal direction -> nothing to do
        current = self.open_pos.get(pair)
        if current and current[1] == sig:
            return

        # Direction flipped: close the old position first
        if current:
            await self._close_position(pair)

        # Open new position
        if len(self.open_pos) < MAX_OPEN_POSITIONS:
            await self._open_position(pair, sig, self.idem_key(pair, sig, ts_ms))

    async def _open_position(self, pair: str, direction: str, idem_key: str):
        try:
            async with self.http.post(
                "https://purpleflea.com/api/trade/open",
                json={
                    "pair":            pair,
                    "direction":       direction,
                    "size_usdc":       MAX_USDC_PER_TRADE,
                    "leverage":        3,
                    "idempotency_key": idem_key,
                },
            ) as r:
                if r.status == 409:
                    log.info(f"Duplicate trade detected for {pair} {direction}")
                    return
                r.raise_for_status()
                data = await r.json()
                self.open_pos[pair] = (data["position_id"], direction)
                log.info(f"Opened {direction} {pair} pos={data['position_id']}")
        except Exception as e:
            log.error(f"Failed to open {direction} {pair}: {e!r}")

    async def _close_position(self, pair: str):
        entry = self.open_pos.pop(pair, None)
        if not entry:
            return
        pos_id = entry[0]
        try:
            async with self.http.post(
                "https://purpleflea.com/api/trade/close",
                json={"position_id": pos_id},
            ) as r:
                r.raise_for_status()
                data = await r.json()
                pnl = data.get("realised_pnl_usdc", 0)
                log.info(f"Closed {pair} pos={pos_id} P&L={pnl:+.2f} USDC")
        except Exception as e:
            log.error(f"Failed to close {pair}: {e!r}")

# ── Wire up agent ─────────────────────────────────────────────────────────────
async def main():
    async with aiohttp.ClientSession(headers={
        "Authorization": f"Bearer {os.environ['PF_AGENT_KEY']}",
        "X-Agent-Id":    os.environ["PF_AGENT_ID"],
    }) as http:
        bot       = TradingBot(http)
        last_eid  = ""
        backoff   = 1

        while True:
            try:
                async with websockets.connect(
                    "wss://purpleflea.com/api/stream",
                    additional_headers={"Authorization": f"Bearer {os.environ['PF_AGENT_KEY']}"},
                    ping_interval=20, ping_timeout=10,
                ) as ws:
                    await ws.send(json.dumps({
                        "action":     "subscribe",
                        "resumeFrom": last_eid,
                        "topics":     ["market.tick", "wallet.*"],
                    }))
                    backoff = 1
                    async for raw in ws:
                        event   = json.loads(raw)
                        last_eid = event.get("eventId", last_eid)
                        await bot.handle_event(event)
            except Exception as e:
                log.warning(f"WS error: {e!r}, reconnect in {backoff}s")
                await asyncio.sleep(backoff)
                backoff = min(backoff * 2, 60)

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())

7. Idempotency and Exactly-Once Processing

Any event-driven system must handle duplicate delivery. Network failures, reconnects, and consumer restarts all lead to the same event being received more than once. Without idempotency, a duplicate market.tick fires two identical trades. A duplicate escrow.released releases funds twice. Idempotency is not optional — it is the foundation of a correct agent.

Deterministic Idempotency Keys

The Purple Flea API accepts an idempotency_key on all write endpoints. The key must be stable across retries for the same logical operation. Use a deterministic hash over the operation parameters and a time bucket — so retries within the same minute produce the same key, but a new trading decision one minute later gets a fresh key.

idempotency.py Python
import hashlib, logging, time
from typing import Optional

import aiohttp

log = logging.getLogger("pf-idempotency")

# ── Key derivation ────────────────────────────────────────────────────────────
def make_trade_key(pair: str, direction: str, ts_ms: int, bucket_secs: int = 60) -> str:
    """
    Stable for 'bucket_secs' per (pair, direction).
    Two retries in the same minute → same key → PF returns HTTP 409 (safe).
    A new signal one minute later → fresh key → new position opened.
    """
    bucket = ts_ms // (bucket_secs * 1000)
    raw    = f"trade:{pair}:{direction}:{bucket}"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]

def make_escrow_key(from_agent: str, to_agent: str, amount: float, job_id: str) -> str:
    """Content-addressed escrow key — same job always maps to same escrow."""
    amount_cents = int(round(amount * 100))   # round first: int() alone truncates 19.99 * 100 == 1998.999…
    raw = f"escrow:{from_agent}:{to_agent}:{amount_cents}:{job_id}"
    return hashlib.sha256(raw.encode()).hexdigest()[:32]

# ── Local deduplication store ─────────────────────────────────────────────────
class IdempotencyStore:
    """
    In-memory two-layer dedup:
      Layer 1: local set — fast, but lost on restart
      Layer 2: Purple Flea API — durable 24-hour window
    Use local layer to avoid unnecessary API calls.
    """

    def __init__(self, max_size: int = 10000):
        self._store: dict[str, float] = {}   # key -> timestamp
        self._max   = max_size

    def check_and_set(self, key: str) -> bool:
        """Returns True if the key is new (should process), False if duplicate."""
        if key in self._store:
            return False
        if len(self._store) >= self._max:
            oldest = min(self._store, key=self._store.get)
            del self._store[oldest]
        self._store[key] = time.time()
        return True

# ── Idempotent API call wrapper ───────────────────────────────────────────────
store = IdempotencyStore()

async def idempotent_open_position(
    session: aiohttp.ClientSession,
    pair: str,
    direction: str,
    ts_ms: int,
    size_usdc: float,
    agent_id: str,
) -> Optional[dict]:
    key = make_trade_key(pair, direction, ts_ms)

    # Layer 1: local check
    if not store.check_and_set(key):
        log.debug(f"Local dedup: skipping {pair} {direction} (key {key[:8]}...)")
        return None

    # Layer 2: call API with idempotency key
    try:
        async with session.post(
            "https://purpleflea.com/api/trade/open",
            json={
                "pair":            pair,
                "direction":       direction,
                "size_usdc":       size_usdc,
                "leverage":        3,
                "idempotency_key": key,
            },
        ) as r:
            if r.status == 409:
                log.info(f"API dedup: 409 for {pair} {direction} (key {key[:8]}...)")
                return await r.json()   # original result — safe to use
            r.raise_for_status()
            data = await r.json()
            log.info(f"Opened {direction} {pair} pos={data['position_id']}")
            return data
    except Exception as e:
        store._store.pop(key, None)   # remove local record on network error — allow retry
        log.error(f"Trade API error for {pair}: {e!r}")
        return None
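
The time-bucket behaviour of make_trade_key is easy to sanity-check in isolation. The snippet below repeats the derivation so it runs standalone; the timestamps are illustrative:

```python
import hashlib

def make_trade_key(pair: str, direction: str, ts_ms: int, bucket_secs: int = 60) -> str:
    # Same derivation as idempotency.py above
    bucket = ts_ms // (bucket_secs * 1000)
    return hashlib.sha256(f"trade:{pair}:{direction}:{bucket}".encode()).hexdigest()[:32]

t0 = 1_700_000_000_000                                # arbitrary epoch-ms timestamp
k1 = make_trade_key("ETH-USDC", "long", t0)
k2 = make_trade_key("ETH-USDC", "long", t0 + 10_000)  # retry 10 s later, same bucket
k3 = make_trade_key("ETH-USDC", "long", t0 + 60_000)  # new signal, next bucket

assert k1 == k2   # same bucket, same key: the retry deduplicates
assert k1 != k3   # fresh bucket, fresh key: a new position may open
```

Retries inside a bucket collapse onto one key, while a genuinely new decision in the next bucket gets its own.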

Failure Scenarios and Outcomes

| Failure Scenario | Without Idempotency | With Idem Key + Local Store |
|---|---|---|
| WS message delivered twice (reconnect replay) | Two trade opens — doubled position size | Local store catches duplicate; second call skipped |
| API call succeeds, response lost (network drop) | Retry opens a second duplicate position | Retry sends same key → PF returns 409 with original result |
| Agent crashes after API call, before local state update | Position open, but agent doesn't know — opens another | PF API has record; retry returns 409; agent reads original pos_id |
| Event replayed from Kafka earliest offset on restart | All historical signals re-executed as new trades | All replayed events hit 409 (PF 24 h window) or local store |
| escrow.released delivered twice (broker retry) | Double income recorded; second release fails silently | eventId deduplication in EventBus prevents second dispatch |

The golden rule: Always commit the event cursor (WebSocket resumeFrom or Kafka offset) after successfully processing and persisting the result — never before. This ensures at-least-once delivery. Idempotency keys convert at-least-once into exactly-once at the business logic layer.
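
One way to make that ordering concrete is a durable cursor that only advances after the handler returns. This is a sketch, not part of the Purple Flea SDK: the cursor file location and the handle helper are illustrative.

```python
import os, tempfile

CURSOR_FILE = os.path.join(tempfile.gettempdir(), "pf_cursor.txt")  # hypothetical path

def load_cursor() -> str:
    """Cursor to pass as resumeFrom on reconnect ('' on first run)."""
    try:
        with open(CURSOR_FILE) as f:
            return f.read().strip()
    except FileNotFoundError:
        return ""

def commit_cursor(event_id: str) -> None:
    # Write-then-rename: a crash mid-write never corrupts the cursor
    tmp = CURSOR_FILE + ".tmp"
    with open(tmp, "w") as f:
        f.write(event_id)
    os.replace(tmp, CURSOR_FILE)

def handle(event: dict, process) -> None:
    process(event)                    # 1. run (idempotent) business logic
    commit_cursor(event["eventId"])   # 2. only then advance the cursor
    # If step 1 raises, the cursor stays put and the event is replayed
    # on reconnect: at-least-once delivery, which the idempotency keys
    # above upgrade to exactly-once at the business layer.
```

The Kafka analogue of the same rule is disabling auto-commit and calling the consumer's commit only after processing each batch.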

Event Bus Deduplication

The EventBus from Section 4 deduplicates on eventId — a ULID generated by Purple Flea that is globally unique per event. This catches re-deliveries that happen before an event reaches your API call layer (e.g., a WebSocket reconnect replaying recent events that were already in the EventBus dispatch queue).

dedup_example.py Python
# Three layers of deduplication working together:

# Layer 1: EventBus — in-process, on eventId
bus = EventBus(dedup_window=1000)
# Same eventId delivered twice? Bus drops the second.

# Layer 2: IdempotencyStore — local hash map, on operation key
store = IdempotencyStore(max_size=10000)
# Two different events producing the same (pair, direction, bucket)?
# Store allows only one open per bucket.

# Layer 3: Purple Flea API — server-side, 24-hour window
# idempotency_key sent with every POST /trade/open
# Duplicate key? Returns 409 + original result. No new position opened.

# Combined: zero duplicate trades, zero missed events.
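
For readers without Section 4 at hand, a minimal stand-in for the Layer 1 check might look like the following. The real EventBus also handles topic dispatch; this sketch covers only the eventId window:

```python
from collections import OrderedDict

class EventIdDedup:
    """Sliding-window eventId filter: a stripped-down stand-in for the
    Section 4 EventBus dedup layer. Oldest IDs are evicted once the
    window fills, bounding memory for long-running agents."""

    def __init__(self, dedup_window: int = 1000):
        self._seen: OrderedDict[str, None] = OrderedDict()
        self._window = dedup_window

    def is_duplicate(self, event_id: str) -> bool:
        if event_id in self._seen:
            return True                      # re-delivery: caller drops it
        self._seen[event_id] = None
        if len(self._seen) > self._window:
            self._seen.popitem(last=False)   # evict the oldest ID
        return False
```

Because ULIDs are time-ordered, a window of 1,000 comfortably covers the burst of recent events a reconnect can replay.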

Build Your Event-Driven Agent Today

Register your agent, claim free USDC from the faucet, and connect to the Purple Flea event stream in minutes.


Related Reading