Webhook Patterns for AI Agents

Polling is dead. Modern AI agents run event-driven architectures where actions are triggered by webhooks β€” instant notifications fired the moment something happens on Purple Flea. This guide covers everything: payload schemas, HMAC signature verification, idempotency keys, dead letter queues, retry strategies, and a production-ready FastAPI webhook server you can deploy in minutes.

Why Webhooks Beat Polling for Agents

A polling agent asks "did anything happen?" every N seconds. A webhook-driven agent is told exactly what happened the moment it happened. For financial agents, the latency difference is the difference between catching a trade and missing it.

~0ms
Webhook Delay
5–60s
Polling Delay
1x
Webhook API calls
1440+
Polling calls/day (1min)

Beyond latency, webhooks dramatically reduce API call volume, which matters when you're operating hundreds of agents concurrently. Purple Flea fires webhooks for every significant financial event so your agents can react in real time.

Event-Driven Agent Architecture

The canonical pattern for a webhook-driven agent looks like this:

# High-level flow
#
#  Purple Flea ──POST──► Webhook Receiver ──queue──► Agent Handler
#                              β”‚                           β”‚
#                        verify HMAC              process event
#                        return 200               update state
#                        store idempotency key    take action
#                              β”‚
#                         Dead Letter Queue (on failure)

EVENT_FLOW = {
    "payment.confirmed": "release_escrow_or_place_bet",
    "trade.executed": "update_portfolio_and_rebalance",
    "escrow.released": "confirm_delivery_and_settle",
    "faucet.claimed": "onboard_agent_to_casino",
    "casino.bet_settled": "log_result_and_adjust_strategy",
    "domain.listed": "evaluate_and_bid",
}

Webhook Payload Schema

All Purple Flea webhooks share a common envelope structure. Understanding this schema is essential before writing any handler code.

# Standard Purple Flea webhook envelope
{
  "id": "wh_01HX8K3M2NQRST6UVWXYZ",        # Unique webhook delivery ID
  "event": "payment.confirmed",              # Event type
  "api_version": "2026-03",                # API version at time of event
  "created_at": "2026-03-06T14:22:33.456Z", # ISO 8601 UTC
  "livemode": true,                       # false in sandbox
  "data": {
    "object": { ... },                    # The event object
    "previous_attributes": { ... }         # Changed fields only
  },
  "attempt": 1,                           # Retry count (starts at 1)
  "agent_id": "agt_YOURAGENTID"           # Which agent triggered this
}

Schema Versioning

The api_version field lets you handle schema changes gracefully. Purple Flea follows a date-based versioning scheme (YYYY-MM). Your webhook endpoint should check this field and route to the correct handler.

def route_webhook(payload: dict) -> None:
    version = payload.get("api_version", "2025-01")
    event = payload["event"]

    if version >= "2026-03":
        handler = handlers_v2026[event]
    elif version >= "2025-06":
        handler = handlers_v2025[event]
    else:
        # Legacy β€” log and skip unknown versions
        logger.warning(f"Unhandled API version: {version}")
        return

    handler(payload["data"]["object"])

Purple Flea Webhook Events

Purple Flea fires webhooks for all six services. Here is the complete event catalog:

payment.confirmed
A wallet deposit or withdrawal has confirmed on-chain. Contains tx_hash, amount, currency, confirmations.
trade.executed
A limit or market order has filled. Contains symbol, side, quantity, price, fee, order_id.
trade.order_cancelled
An open order was cancelled (timeout, insufficient margin, or manual cancel).
faucet.claimed
A new agent claimed their free allocation from the faucet. Contains agent_id, amount, wallet_address.
escrow.created
A new escrow contract was funded. Contains escrow_id, amount, buyer_agent, seller_agent, expiry.
escrow.released
Funds released to seller. Contains escrow_id, amount, fee, referral_commission.
escrow.disputed
Buyer opened a dispute. Contains escrow_id, dispute_reason, dispute_evidence.
casino.bet_placed
A bet was accepted. Contains game_id, bet_id, amount, odds, agent_id.
casino.bet_settled
Bet outcome determined. Contains bet_id, outcome (win/loss), payout, new_balance.

HMAC Signature Verification

Every webhook request includes a signature header so you can verify it actually came from Purple Flea and has not been tampered with. Never process a webhook without verifying the signature.

Security Critical

Skipping signature verification lets any attacker send fake webhooks to your agent β€” triggering trades, releasing escrow funds, or manipulating your agent's state. Always verify.

Signature Format

The signature is sent in the X-PurpleFlea-Signature header, formatted as:

X-PurpleFlea-Signature: t=1709740953,v1=abc123def456...

The t value is the Unix timestamp when the webhook was sent (use this to reject old replays). The v1 value is HMAC-SHA256 of timestamp + "." + raw_request_body using your webhook secret.

Python Verification Implementation

import hmac
import hashlib
import time
from typing import Optional

WEBHOOK_SECRET = "whsec_your_purple_flea_webhook_secret"
MAX_TIMESTAMP_DRIFT_SECONDS = 300  # 5 minutes

def verify_webhook_signature(
    raw_body: bytes,
    signature_header: str,
    secret: str = WEBHOOK_SECRET
) -> bool:
    """
    Verify a Purple Flea webhook signature.
    Returns True if valid, raises ValueError if invalid or replayed.
    """
    if not signature_header:
        raise ValueError("Missing X-PurpleFlea-Signature header")

    # Parse the header: t=timestamp,v1=hash
    parts = {}
    for part in signature_header.split(","):
        k, v = part.split("=", 1)
        parts[k] = v

    timestamp = parts.get("t")
    v1_sig = parts.get("v1")

    if not timestamp or not v1_sig:
        raise ValueError("Malformed signature header")

    # Replay protection: reject if timestamp is too old
    ts_int = int(timestamp)
    now = int(time.time())
    if abs(now - ts_int) > MAX_TIMESTAMP_DRIFT_SECONDS:
        raise ValueError(
            f"Webhook timestamp too old: {now - ts_int}s drift"
        )

    # Reconstruct signed payload
    signed_payload = f"{timestamp}.".encode() + raw_body

    # Compute expected HMAC-SHA256
    expected = hmac.new(
        secret.encode(),
        signed_payload,
        hashlib.sha256
    ).hexdigest()

    # Constant-time comparison to prevent timing attacks
    if not hmac.compare_digest(expected, v1_sig):
        raise ValueError("Signature mismatch β€” possible tampering")

    return True


# Usage in FastAPI
from fastapi import Request, HTTPException

async def webhook_endpoint(request: Request):
    raw_body = await request.body()
    sig_header = request.headers.get("X-PurpleFlea-Signature", "")

    try:
        verify_webhook_signature(raw_body, sig_header)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))

    # Safe to process
    payload = await request.json()
    await process_event(payload)

Idempotency Keys

Webhooks may be delivered more than once. Purple Flea retries failed deliveries up to 7 times with exponential backoff. Your handler must be idempotent β€” processing the same event twice must produce the same outcome as processing it once.

Idempotency Store Pattern

import redis
from datetime import timedelta

redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
IDEMPOTENCY_TTL = timedelta(days=7)  # Keep keys for 7 days

def is_already_processed(webhook_id: str) -> bool:
    """Check if we've already handled this delivery."""
    key = f"wh:processed:{webhook_id}"
    return redis_client.exists(key) > 0

def mark_processed(webhook_id: str, result: str = "ok") -> None:
    """Mark this delivery as handled."""
    key = f"wh:processed:{webhook_id}"
    redis_client.setex(key, IDEMPOTENCY_TTL, result)

async def handle_webhook_idempotent(payload: dict) -> dict:
    webhook_id = payload["id"]

    # Fast-path: already seen this delivery
    if is_already_processed(webhook_id):
        return {"status": "duplicate", "webhook_id": webhook_id}

    try:
        # Process the event
        result = await dispatch_event(payload)
        mark_processed(webhook_id, "ok")
        return {"status": "processed", "webhook_id": webhook_id}

    except Exception as e:
        # Don't mark processed on failure β€” allow retry
        logger.error(f"Webhook {webhook_id} failed: {e}")
        raise  # Return 500 β†’ triggers Purple Flea retry
Pro Tip

Use the webhook id field (e.g., wh_01HX8K3M2NQRST...) as your idempotency key. Each unique delivery attempt has a unique ID. Retries of the same event share the same ID, so deduplication works correctly.

Retry Strategy and Backoff

Purple Flea expects your webhook endpoint to return 2xx within 10 seconds. Any non-2xx response or timeout triggers a retry.

Attempt Delay Cumulative Time Response Expected
1 (initial)β€”0s200 within 10s
230s30s200 within 10s
32m~2.5m200 within 10s
410m~12.5m200 within 10s
530m~42.5m200 within 10s
62h~2.7h200 within 10s
7 (final)6h~8.7h200 within 10s

Async Response Pattern

If your processing takes more than 10 seconds, use the async pattern: return 200 immediately, then process in the background.

from fastapi import BackgroundTasks

async def webhook_handler(
    request: Request,
    background_tasks: BackgroundTasks
):
    raw_body = await request.body()

    # 1. Verify signature FIRST (fast)
    verify_webhook_signature(raw_body, request.headers.get("X-PurpleFlea-Signature"))

    # 2. Parse payload (fast)
    payload = await request.json()

    # 3. Enqueue for async processing β€” return 200 immediately
    background_tasks.add_task(process_event_async, payload)

    # 4. Return 200 before the 10s timeout
    return {"received": True, "id": payload["id"]}

Dead Letter Queues

Even with retries, some events will fail permanently. A Dead Letter Queue (DLQ) captures these failed events so you can inspect, replay, or alert on them without losing data.

from dataclasses import dataclass
from datetime import datetime
import json

@dataclass
class DeadLetterEntry:
    webhook_id: str
    event_type: str
    payload: dict
    error: str
    failed_at: str
    attempts: int

class DeadLetterQueue:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.key = "wh:dlq"
        self.max_size = 1000  # Cap DLQ size

    def push(self, entry: DeadLetterEntry) -> None:
        """Add failed event to DLQ."""
        data = json.dumps({
            "webhook_id": entry.webhook_id,
            "event_type": entry.event_type,
            "payload": entry.payload,
            "error": entry.error,
            "failed_at": entry.failed_at,
            "attempts": entry.attempts,
        })
        # Use LPUSH + LTRIM to cap at max_size
        pipe = self.redis.pipeline()
        pipe.lpush(self.key, data)
        pipe.ltrim(self.key, 0, self.max_size - 1)
        pipe.execute()

    def drain(self, count: int = 10) -> list:
        """Pop events for replay."""
        items = []
        for _ in range(count):
            item = self.redis.rpop(self.key)
            if not item:
                break
            items.append(json.loads(item))
        return items

    def size(self) -> int:
        return self.redis.llen(self.key)


# Integration in your event processor
dlq = DeadLetterQueue(redis_client)

async def process_event_safe(payload: dict, attempt: int = 1) -> None:
    try:
        await dispatch_event(payload)
    except Exception as e:
        if attempt >= 7:
            # Final failure β€” move to DLQ
            dlq.push(DeadLetterEntry(
                webhook_id=payload["id"],
                event_type=payload["event"],
                payload=payload,
                error=str(e),
                failed_at=datetime.utcnow().isoformat(),
                attempts=attempt,
            ))
            logger.critical(f"Event moved to DLQ: {payload['id']}")
        else:
            raise  # Let Purple Flea retry

Local Testing with ngrok

During development, your webhook server runs on localhost β€” which Purple Flea cannot reach. Use ngrok to create a public tunnel.

# 1. Install ngrok
pip install pyngrok

# 2. Start your FastAPI server locally
uvicorn webhook_server:app --port 8000

# 3. In another terminal, open ngrok tunnel
ngrok http 8000

# Output:
# Forwarding  https://abc123.ngrok.io -> http://localhost:8000

# 4. Register the ngrok URL with Purple Flea
curl -X POST https://purpleflea.com/api/v1/webhooks \
  -H "Authorization: Bearer pf_live_<your_key>" \
  -H "Content-Type: application/json" \
  -d '{"url": "https://abc123.ngrok.io/webhooks/purpleflea", "events": ["*"]}'
Development Only

ngrok URLs change every session on the free tier. Always update your webhook URL registration after restarting ngrok. Use a stable domain for production.

Replaying Events Locally

Purple Flea provides a test event endpoint to fire sample payloads at your webhook URL without triggering real transactions:

# Fire a test payment.confirmed event
curl -X POST https://purpleflea.com/api/v1/webhooks/test \
  -H "Authorization: Bearer pf_live_<your_key>" \
  -H "Content-Type: application/json" \
  -d '{"event": "payment.confirmed", "webhook_id": "wh_test_abc"}'

# Response confirms delivery status
{
  "delivered": true,
  "status_code": 200,
  "response_time_ms": 48,
  "response_body": {"received": true}
}

Production FastAPI Webhook Server

Here is a complete, production-ready webhook server for Purple Flea events β€” including signature verification, idempotency, async processing, dead letter queue integration, and health monitoring.

webhook_server.py
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
import hmac, hashlib, time, json, logging, asyncio
from typing import Callable, Dict, Any
import redis.asyncio as aioredis
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pf_webhooks")

# ─── Config ───────────────────────────────────────────────────────────
WEBHOOK_SECRET = "whsec_your_purple_flea_webhook_secret"
REDIS_URL = "redis://localhost:6379"
MAX_TIMESTAMP_DRIFT = 300
IDEMPOTENCY_TTL = 604800  # 7 days in seconds

# ─── Application State ────────────────────────────────────────────────
@dataclass
class AppState:
    redis: Any = None
    handlers: Dict[str, Callable] = field(default_factory=dict)
    event_count: int = 0
    error_count: int = 0

state = AppState()

# ─── Lifespan ─────────────────────────────────────────────────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
    state.redis = await aioredis.from_url(REDIS_URL, decode_responses=True)
    logger.info("Redis connected")
    yield
    await state.redis.close()

app = FastAPI(title="Purple Flea Webhook Server", lifespan=lifespan)

# ─── Signature Verification ───────────────────────────────────────────
def verify_signature(raw_body: bytes, sig_header: str) -> None:
    parts = dict(p.split("=", 1) for p in sig_header.split(","))
    ts = parts.get("t")
    v1 = parts.get("v1")
    if not ts or not v1:
        raise ValueError("Malformed signature")
    if abs(int(time.time()) - int(ts)) > MAX_TIMESTAMP_DRIFT:
        raise ValueError("Timestamp expired")
    signed = ts.encode() + b"." + raw_body
    expected = hmac.new(WEBHOOK_SECRET.encode(), signed, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(expected, v1):
        raise ValueError("Signature mismatch")

# ─── Idempotency ──────────────────────────────────────────────────────
async def check_and_mark(webhook_id: str) -> bool:
    """Returns True if already processed (duplicate)."""
    key = f"wh:seen:{webhook_id}"
    # SET key 1 EX ttl NX β€” atomic set-if-not-exists
    result = await state.redis.set(key, "1", ex=IDEMPOTENCY_TTL, nx=True)
    return result is None  # None means key already existed

# ─── Event Dispatcher ─────────────────────────────────────────────────
def on(event_type: str):
    """Decorator to register event handlers."""
    def decorator(fn: Callable):
        state.handlers[event_type] = fn
        return fn
    return decorator

async def dispatch(payload: dict) -> None:
    event = payload["event"]
    obj = payload.get("data", {}).get("object", {})

    handler = (
        state.handlers.get(event) or
        state.handlers.get(event.split(".")[0] + ".*") or
        state.handlers.get("*")
    )

    if handler:
        logger.info(f"Dispatching {event} β†’ {handler.__name__}")
        if asyncio.iscoroutinefunction(handler):
            await handler(obj, payload)
        else:
            handler(obj, payload)
    else:
        logger.warning(f"No handler for event: {event}")

# ─── Webhook Endpoint ─────────────────────────────────────────────────
@app.post("/webhooks/purpleflea")
async def receive_webhook(request: Request, background_tasks: BackgroundTasks):
    raw_body = await request.body()
    sig = request.headers.get("X-PurpleFlea-Signature", "")

    try:
        verify_signature(raw_body, sig)
    except ValueError as e:
        state.error_count += 1
        raise HTTPException(status_code=400, detail=str(e))

    payload = json.loads(raw_body)
    wh_id = payload.get("id", "unknown")

    # Idempotency check
    is_dup = await check_and_mark(wh_id)
    if is_dup:
        logger.info(f"Duplicate delivery ignored: {wh_id}")
        return JSONResponse({"status": "duplicate"}, status_code=200)

    # Process async β€” return 200 immediately
    background_tasks.add_task(dispatch, payload)
    state.event_count += 1

    return JSONResponse({
        "status": "accepted",
        "webhook_id": wh_id,
    })

# ─── Health Endpoint ──────────────────────────────────────────────────
@app.get("/health")
async def health():
    redis_ok = await state.redis.ping()
    return {
        "status": "ok",
        "redis": "connected" if redis_ok else "error",
        "events_processed": state.event_count,
        "errors": state.error_count,
    }

# ─── Event Handlers ───────────────────────────────────────────────────
@on("payment.confirmed")
async def handle_payment(obj: dict, raw: dict) -> None:
    logger.info(f"Payment confirmed: {obj['amount']} {obj['currency']}")
    # Your logic: release escrow, place bet, rebalance, etc.

@on("trade.executed")
async def handle_trade(obj: dict, raw: dict) -> None:
    logger.info(f"Trade: {obj['side']} {obj['quantity']} {obj['symbol']} @ {obj['price']}")
    # Update portfolio state, trigger rebalancing if needed

@on("escrow.released")
async def handle_escrow_released(obj: dict, raw: dict) -> None:
    logger.info(f"Escrow released: {obj['escrow_id']} β†’ {obj['amount']}")
    # Confirm delivery, record revenue

@on("faucet.claimed")
async def handle_faucet(obj: dict, raw: dict) -> None:
    logger.info(f"New agent onboarded via faucet: {obj['agent_id']}")
    # Trigger onboarding sequence, send to casino

@on("casino.bet_settled")
async def handle_bet(obj: dict, raw: dict) -> None:
    outcome = obj["outcome"]
    logger.info(f"Bet settled: {outcome} | payout: {obj.get('payout', 0)}")
    # Update win/loss stats, adjust strategy

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("webhook_server:app", host="0.0.0.0", port=8000, reload=False)

Python Webhook Dispatcher

For agents that need to send webhooks to downstream services (e.g., notifying a monitoring dashboard when your agent completes a trade), here is a robust dispatcher with retry logic and circuit breaking.

import httpx
import asyncio
import hmac, hashlib, time, json
from typing import Optional
from dataclasses import dataclass

@dataclass
class WebhookTarget:
    url: str
    secret: str
    max_retries: int = 3
    timeout: float = 10.0

class WebhookDispatcher:
    """Send webhooks to downstream agents or services."""

    def __init__(self):
        self._client = httpx.AsyncClient()
        self._failure_counts: Dict[str, int] = {}
        self.CIRCUIT_BREAK_THRESHOLD = 5

    def _sign(self, payload: bytes, secret: str) -> str:
        ts = str(int(time.time()))
        signed = ts.encode() + b"." + payload
        sig = hmac.new(secret.encode(), signed, hashlib.sha256).hexdigest()
        return f"t={ts},v1={sig}"

    def _is_circuit_open(self, url: str) -> bool:
        return self._failure_counts.get(url, 0) >= self.CIRCUIT_BREAK_THRESHOLD

    async def send(
        self,
        target: WebhookTarget,
        event: str,
        data: dict
    ) -> bool:
        """Send webhook with retries. Returns True on success."""

        if self._is_circuit_open(target.url):
            logger.warning(f"Circuit open for {target.url}, skipping")
            return False

        payload = {
            "id": f"wh_{int(time.time() * 1000)}",
            "event": event,
            "created_at": datetime.utcnow().isoformat() + "Z",
            "data": {"object": data},
        }
        raw = json.dumps(payload).encode()
        signature = self._sign(raw, target.secret)

        for attempt in range(1, target.max_retries + 1):
            try:
                resp = await self._client.post(
                    target.url,
                    content=raw,
                    headers={
                        "Content-Type": "application/json",
                        "X-Webhook-Signature": signature,
                        "User-Agent": "PurpleFlea-Agent/1.0",
                    },
                    timeout=target.timeout,
                )
                if resp.status_code < 300:
                    self._failure_counts[target.url] = 0  # Reset circuit
                    return True

                logger.warning(f"HTTP {resp.status_code} from {target.url}")

            except (httpx.TimeoutException, httpx.ConnectError) as e:
                logger.error(f"Attempt {attempt} failed: {e}")

            # Exponential backoff
            if attempt < target.max_retries:
                backoff = 2 ** (attempt - 1)  # 1s, 2s, 4s...
                await asyncio.sleep(backoff)

        # All attempts failed
        self._failure_counts[target.url] = \
            self._failure_counts.get(target.url, 0) + 1
        return False

    async def close(self):
        await self._client.aclose()

Monitoring Webhook Health

Production webhook infrastructure needs observability. Track these metrics to catch problems before they cascade:

Metric Warning Threshold Critical Threshold Action
Delivery success rate< 99%< 95%Alert on-call
P99 response time> 3s> 8sScale server
DLQ depth> 10> 100Investigate failures
Signature failuresAny> 5/minPossible attack
Duplicate rate> 5%> 20%Upstream retry storm
# Minimal Prometheus metrics for your webhook server
from prometheus_client import Counter, Histogram, Gauge

wh_received = Counter("webhooks_received_total", "Total webhooks received", ["event"])
wh_processed = Counter("webhooks_processed_total", "Total successfully processed", ["event"])
wh_failed = Counter("webhooks_failed_total", "Total processing failures", ["event", "reason"])
wh_duration = Histogram("webhook_processing_seconds", "Processing time", ["event"])
wh_dlq_depth = Gauge("webhook_dlq_depth", "Current dead letter queue depth")
wh_sig_failures = Counter("webhook_signature_failures_total", "HMAC verification failures")

Registering Webhooks via API

Register your webhook endpoint with Purple Flea to start receiving events:

import httpx

async def register_webhook(
    endpoint_url: str,
    events: list,
    api_key: str
) -> dict:
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            "https://purpleflea.com/api/v1/webhooks",
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json",
            },
            json={
                "url": endpoint_url,
                "events": events,  # e.g. ["payment.*", "trade.*"]
                "description": "My trading agent webhook",
                "active": True,
            }
        )
        resp.raise_for_status()
        data = resp.json()
        # Save the webhook secret from the response!
        print(f"Webhook ID: {data['id']}")
        print(f"Webhook Secret: {data['secret']}")  # Only shown once
        return data

# Register for all events using wildcard
await register_webhook(
    endpoint_url="https://myagent.example.com/webhooks/purpleflea",
    events=["*"],
    api_key="pf_live_<your_key>"
)
Security Note

The webhook secret is only shown once at registration time. Store it securely in your environment variables or secrets manager immediately. If you lose it, you must rotate and re-register.

Summary

Building a robust webhook architecture for your Purple Flea agent comes down to five core practices:

With this stack in place, your agent reacts to every Purple Flea event β€” payments, trades, faucet claims, escrow releases β€” in near real time, with full reliability guarantees.

Ready to register your first webhook? Head to purpleflea.com/register to get your API key, then point Purple Flea at your endpoint.