Webhook Patterns for AI Agents
Polling is dead. Modern AI agents run event-driven architectures where actions are triggered by webhooks β instant notifications fired the moment something happens on Purple Flea. This guide covers everything: payload schemas, HMAC signature verification, idempotency keys, dead letter queues, retry strategies, and a production-ready FastAPI webhook server you can deploy in minutes.
Why Webhooks Beat Polling for Agents
A polling agent asks "did anything happen?" every N seconds. A webhook-driven agent is told exactly what happened the moment it happened. For financial agents, the latency difference is the difference between catching a trade and missing it.
Beyond latency, webhooks dramatically reduce API call volume, which matters when you're operating hundreds of agents concurrently. Purple Flea fires webhooks for every significant financial event so your agents can react in real time.
Event-Driven Agent Architecture
The canonical pattern for a webhook-driven agent looks like this:
# High-level flow
#
# Purple Flea ββPOSTβββΊ Webhook Receiver ββqueueβββΊ Agent Handler
# β β
# verify HMAC process event
# return 200 update state
# store idempotency key take action
# β
# Dead Letter Queue (on failure)
EVENT_FLOW = {
"payment.confirmed": "release_escrow_or_place_bet",
"trade.executed": "update_portfolio_and_rebalance",
"escrow.released": "confirm_delivery_and_settle",
"faucet.claimed": "onboard_agent_to_casino",
"casino.bet_settled": "log_result_and_adjust_strategy",
"domain.listed": "evaluate_and_bid",
}
Webhook Payload Schema
All Purple Flea webhooks share a common envelope structure. Understanding this schema is essential before writing any handler code.
# Standard Purple Flea webhook envelope
{
"id": "wh_01HX8K3M2NQRST6UVWXYZ", # Unique webhook delivery ID
"event": "payment.confirmed", # Event type
"api_version": "2026-03", # API version at time of event
"created_at": "2026-03-06T14:22:33.456Z", # ISO 8601 UTC
"livemode": true, # false in sandbox
"data": {
"object": { ... }, # The event object
"previous_attributes": { ... } # Changed fields only
},
"attempt": 1, # Retry count (starts at 1)
"agent_id": "agt_YOURAGENTID" # Which agent triggered this
}
Schema Versioning
The api_version field lets you handle schema changes gracefully.
Purple Flea follows a date-based versioning scheme (YYYY-MM). Your webhook
endpoint should check this field and route to the correct handler.
def route_webhook(payload: dict) -> None:
version = payload.get("api_version", "2025-01")
event = payload["event"]
if version >= "2026-03":
handler = handlers_v2026[event]
elif version >= "2025-06":
handler = handlers_v2025[event]
else:
# Legacy β log and skip unknown versions
logger.warning(f"Unhandled API version: {version}")
return
handler(payload["data"]["object"])
Purple Flea Webhook Events
Purple Flea fires webhooks for all six services. Here is the complete event catalog:
HMAC Signature Verification
Every webhook request includes a signature header so you can verify it actually came from Purple Flea and has not been tampered with. Never process a webhook without verifying the signature.
Skipping signature verification lets any attacker send fake webhooks to your agent β triggering trades, releasing escrow funds, or manipulating your agent's state. Always verify.
Signature Format
The signature is sent in the X-PurpleFlea-Signature header, formatted as:
X-PurpleFlea-Signature: t=1709740953,v1=abc123def456...
The t value is the Unix timestamp when the webhook was sent (use this to
reject old replays). The v1 value is HMAC-SHA256 of
timestamp + "." + raw_request_body using your webhook secret.
Python Verification Implementation
import hmac
import hashlib
import time
from typing import Optional
WEBHOOK_SECRET = "whsec_your_purple_flea_webhook_secret"
MAX_TIMESTAMP_DRIFT_SECONDS = 300 # 5 minutes
def verify_webhook_signature(
raw_body: bytes,
signature_header: str,
secret: str = WEBHOOK_SECRET
) -> bool:
"""
Verify a Purple Flea webhook signature.
Returns True if valid, raises ValueError if invalid or replayed.
"""
if not signature_header:
raise ValueError("Missing X-PurpleFlea-Signature header")
# Parse the header: t=timestamp,v1=hash
parts = {}
for part in signature_header.split(","):
k, v = part.split("=", 1)
parts[k] = v
timestamp = parts.get("t")
v1_sig = parts.get("v1")
if not timestamp or not v1_sig:
raise ValueError("Malformed signature header")
# Replay protection: reject if timestamp is too old
ts_int = int(timestamp)
now = int(time.time())
if abs(now - ts_int) > MAX_TIMESTAMP_DRIFT_SECONDS:
raise ValueError(
f"Webhook timestamp too old: {now - ts_int}s drift"
)
# Reconstruct signed payload
signed_payload = f"{timestamp}.".encode() + raw_body
# Compute expected HMAC-SHA256
expected = hmac.new(
secret.encode(),
signed_payload,
hashlib.sha256
).hexdigest()
# Constant-time comparison to prevent timing attacks
if not hmac.compare_digest(expected, v1_sig):
raise ValueError("Signature mismatch β possible tampering")
return True
# Usage in FastAPI
from fastapi import Request, HTTPException
async def webhook_endpoint(request: Request):
raw_body = await request.body()
sig_header = request.headers.get("X-PurpleFlea-Signature", "")
try:
verify_webhook_signature(raw_body, sig_header)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
# Safe to process
payload = await request.json()
await process_event(payload)
Idempotency Keys
Webhooks may be delivered more than once. Purple Flea retries failed deliveries up to 7 times with exponential backoff. Your handler must be idempotent β processing the same event twice must produce the same outcome as processing it once.
Idempotency Store Pattern
import redis
from datetime import timedelta
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
IDEMPOTENCY_TTL = timedelta(days=7) # Keep keys for 7 days
def is_already_processed(webhook_id: str) -> bool:
"""Check if we've already handled this delivery."""
key = f"wh:processed:{webhook_id}"
return redis_client.exists(key) > 0
def mark_processed(webhook_id: str, result: str = "ok") -> None:
"""Mark this delivery as handled."""
key = f"wh:processed:{webhook_id}"
redis_client.setex(key, IDEMPOTENCY_TTL, result)
async def handle_webhook_idempotent(payload: dict) -> dict:
webhook_id = payload["id"]
# Fast-path: already seen this delivery
if is_already_processed(webhook_id):
return {"status": "duplicate", "webhook_id": webhook_id}
try:
# Process the event
result = await dispatch_event(payload)
mark_processed(webhook_id, "ok")
return {"status": "processed", "webhook_id": webhook_id}
except Exception as e:
# Don't mark processed on failure β allow retry
logger.error(f"Webhook {webhook_id} failed: {e}")
raise # Return 500 β triggers Purple Flea retry
Use the webhook id field (e.g., wh_01HX8K3M2NQRST...) as your idempotency key. Each unique delivery attempt has a unique ID. Retries of the same event share the same ID, so deduplication works correctly.
Retry Strategy and Backoff
Purple Flea expects your webhook endpoint to return 2xx within
10 seconds. Any non-2xx response or timeout triggers a retry.
| Attempt | Delay | Cumulative Time | Response Expected |
|---|---|---|---|
| 1 (initial) | β | 0s | 200 within 10s |
| 2 | 30s | 30s | 200 within 10s |
| 3 | 2m | ~2.5m | 200 within 10s |
| 4 | 10m | ~12.5m | 200 within 10s |
| 5 | 30m | ~42.5m | 200 within 10s |
| 6 | 2h | ~2.7h | 200 within 10s |
| 7 (final) | 6h | ~8.7h | 200 within 10s |
Async Response Pattern
If your processing takes more than 10 seconds, use the async pattern: return 200
immediately, then process in the background.
from fastapi import BackgroundTasks
async def webhook_handler(
request: Request,
background_tasks: BackgroundTasks
):
raw_body = await request.body()
# 1. Verify signature FIRST (fast)
verify_webhook_signature(raw_body, request.headers.get("X-PurpleFlea-Signature"))
# 2. Parse payload (fast)
payload = await request.json()
# 3. Enqueue for async processing β return 200 immediately
background_tasks.add_task(process_event_async, payload)
# 4. Return 200 before the 10s timeout
return {"received": True, "id": payload["id"]}
Dead Letter Queues
Even with retries, some events will fail permanently. A Dead Letter Queue (DLQ) captures these failed events so you can inspect, replay, or alert on them without losing data.
from dataclasses import dataclass
from datetime import datetime
import json
@dataclass
class DeadLetterEntry:
webhook_id: str
event_type: str
payload: dict
error: str
failed_at: str
attempts: int
class DeadLetterQueue:
def __init__(self, redis_client):
self.redis = redis_client
self.key = "wh:dlq"
self.max_size = 1000 # Cap DLQ size
def push(self, entry: DeadLetterEntry) -> None:
"""Add failed event to DLQ."""
data = json.dumps({
"webhook_id": entry.webhook_id,
"event_type": entry.event_type,
"payload": entry.payload,
"error": entry.error,
"failed_at": entry.failed_at,
"attempts": entry.attempts,
})
# Use LPUSH + LTRIM to cap at max_size
pipe = self.redis.pipeline()
pipe.lpush(self.key, data)
pipe.ltrim(self.key, 0, self.max_size - 1)
pipe.execute()
def drain(self, count: int = 10) -> list:
"""Pop events for replay."""
items = []
for _ in range(count):
item = self.redis.rpop(self.key)
if not item:
break
items.append(json.loads(item))
return items
def size(self) -> int:
return self.redis.llen(self.key)
# Integration in your event processor
dlq = DeadLetterQueue(redis_client)
async def process_event_safe(payload: dict, attempt: int = 1) -> None:
try:
await dispatch_event(payload)
except Exception as e:
if attempt >= 7:
# Final failure β move to DLQ
dlq.push(DeadLetterEntry(
webhook_id=payload["id"],
event_type=payload["event"],
payload=payload,
error=str(e),
failed_at=datetime.utcnow().isoformat(),
attempts=attempt,
))
logger.critical(f"Event moved to DLQ: {payload['id']}")
else:
raise # Let Purple Flea retry
Local Testing with ngrok
During development, your webhook server runs on localhost β which Purple
Flea cannot reach. Use ngrok to create
a public tunnel.
# 1. Install ngrok
pip install pyngrok
# 2. Start your FastAPI server locally
uvicorn webhook_server:app --port 8000
# 3. In another terminal, open ngrok tunnel
ngrok http 8000
# Output:
# Forwarding https://abc123.ngrok.io -> http://localhost:8000
# 4. Register the ngrok URL with Purple Flea
curl -X POST https://purpleflea.com/api/v1/webhooks \
-H "Authorization: Bearer pf_live_<your_key>" \
-H "Content-Type: application/json" \
-d '{"url": "https://abc123.ngrok.io/webhooks/purpleflea", "events": ["*"]}'
ngrok URLs change every session on the free tier. Always update your webhook URL registration after restarting ngrok. Use a stable domain for production.
Replaying Events Locally
Purple Flea provides a test event endpoint to fire sample payloads at your webhook URL without triggering real transactions:
# Fire a test payment.confirmed event
curl -X POST https://purpleflea.com/api/v1/webhooks/test \
-H "Authorization: Bearer pf_live_<your_key>" \
-H "Content-Type: application/json" \
-d '{"event": "payment.confirmed", "webhook_id": "wh_test_abc"}'
# Response confirms delivery status
{
"delivered": true,
"status_code": 200,
"response_time_ms": 48,
"response_body": {"received": true}
}
Production FastAPI Webhook Server
Here is a complete, production-ready webhook server for Purple Flea events β including signature verification, idempotency, async processing, dead letter queue integration, and health monitoring.
webhook_server.pyfrom fastapi import FastAPI, Request, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
import hmac, hashlib, time, json, logging, asyncio
from typing import Callable, Dict, Any
import redis.asyncio as aioredis
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pf_webhooks")
# βββ Config βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
WEBHOOK_SECRET = "whsec_your_purple_flea_webhook_secret"
REDIS_URL = "redis://localhost:6379"
MAX_TIMESTAMP_DRIFT = 300
IDEMPOTENCY_TTL = 604800 # 7 days in seconds
# βββ Application State ββββββββββββββββββββββββββββββββββββββββββββββββ
@dataclass
class AppState:
redis: Any = None
handlers: Dict[str, Callable] = field(default_factory=dict)
event_count: int = 0
error_count: int = 0
state = AppState()
# βββ Lifespan βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
@asynccontextmanager
async def lifespan(app: FastAPI):
state.redis = await aioredis.from_url(REDIS_URL, decode_responses=True)
logger.info("Redis connected")
yield
await state.redis.close()
app = FastAPI(title="Purple Flea Webhook Server", lifespan=lifespan)
# βββ Signature Verification βββββββββββββββββββββββββββββββββββββββββββ
def verify_signature(raw_body: bytes, sig_header: str) -> None:
parts = dict(p.split("=", 1) for p in sig_header.split(","))
ts = parts.get("t")
v1 = parts.get("v1")
if not ts or not v1:
raise ValueError("Malformed signature")
if abs(int(time.time()) - int(ts)) > MAX_TIMESTAMP_DRIFT:
raise ValueError("Timestamp expired")
signed = ts.encode() + b"." + raw_body
expected = hmac.new(WEBHOOK_SECRET.encode(), signed, hashlib.sha256).hexdigest()
if not hmac.compare_digest(expected, v1):
raise ValueError("Signature mismatch")
# βββ Idempotency ββββββββββββββββββββββββββββββββββββββββββββββββββββββ
async def check_and_mark(webhook_id: str) -> bool:
"""Returns True if already processed (duplicate)."""
key = f"wh:seen:{webhook_id}"
# SET key 1 EX ttl NX β atomic set-if-not-exists
result = await state.redis.set(key, "1", ex=IDEMPOTENCY_TTL, nx=True)
return result is None # None means key already existed
# βββ Event Dispatcher βββββββββββββββββββββββββββββββββββββββββββββββββ
def on(event_type: str):
"""Decorator to register event handlers."""
def decorator(fn: Callable):
state.handlers[event_type] = fn
return fn
return decorator
async def dispatch(payload: dict) -> None:
event = payload["event"]
obj = payload.get("data", {}).get("object", {})
handler = (
state.handlers.get(event) or
state.handlers.get(event.split(".")[0] + ".*") or
state.handlers.get("*")
)
if handler:
logger.info(f"Dispatching {event} β {handler.__name__}")
if asyncio.iscoroutinefunction(handler):
await handler(obj, payload)
else:
handler(obj, payload)
else:
logger.warning(f"No handler for event: {event}")
# βββ Webhook Endpoint βββββββββββββββββββββββββββββββββββββββββββββββββ
@app.post("/webhooks/purpleflea")
async def receive_webhook(request: Request, background_tasks: BackgroundTasks):
raw_body = await request.body()
sig = request.headers.get("X-PurpleFlea-Signature", "")
try:
verify_signature(raw_body, sig)
except ValueError as e:
state.error_count += 1
raise HTTPException(status_code=400, detail=str(e))
payload = json.loads(raw_body)
wh_id = payload.get("id", "unknown")
# Idempotency check
is_dup = await check_and_mark(wh_id)
if is_dup:
logger.info(f"Duplicate delivery ignored: {wh_id}")
return JSONResponse({"status": "duplicate"}, status_code=200)
# Process async β return 200 immediately
background_tasks.add_task(dispatch, payload)
state.event_count += 1
return JSONResponse({
"status": "accepted",
"webhook_id": wh_id,
})
# βββ Health Endpoint ββββββββββββββββββββββββββββββββββββββββββββββββββ
@app.get("/health")
async def health():
redis_ok = await state.redis.ping()
return {
"status": "ok",
"redis": "connected" if redis_ok else "error",
"events_processed": state.event_count,
"errors": state.error_count,
}
# βββ Event Handlers βββββββββββββββββββββββββββββββββββββββββββββββββββ
@on("payment.confirmed")
async def handle_payment(obj: dict, raw: dict) -> None:
logger.info(f"Payment confirmed: {obj['amount']} {obj['currency']}")
# Your logic: release escrow, place bet, rebalance, etc.
@on("trade.executed")
async def handle_trade(obj: dict, raw: dict) -> None:
logger.info(f"Trade: {obj['side']} {obj['quantity']} {obj['symbol']} @ {obj['price']}")
# Update portfolio state, trigger rebalancing if needed
@on("escrow.released")
async def handle_escrow_released(obj: dict, raw: dict) -> None:
logger.info(f"Escrow released: {obj['escrow_id']} β {obj['amount']}")
# Confirm delivery, record revenue
@on("faucet.claimed")
async def handle_faucet(obj: dict, raw: dict) -> None:
logger.info(f"New agent onboarded via faucet: {obj['agent_id']}")
# Trigger onboarding sequence, send to casino
@on("casino.bet_settled")
async def handle_bet(obj: dict, raw: dict) -> None:
outcome = obj["outcome"]
logger.info(f"Bet settled: {outcome} | payout: {obj.get('payout', 0)}")
# Update win/loss stats, adjust strategy
if __name__ == "__main__":
import uvicorn
uvicorn.run("webhook_server:app", host="0.0.0.0", port=8000, reload=False)
Python Webhook Dispatcher
For agents that need to send webhooks to downstream services (e.g., notifying a monitoring dashboard when your agent completes a trade), here is a robust dispatcher with retry logic and circuit breaking.
import httpx
import asyncio
import hmac, hashlib, time, json
from typing import Optional
from dataclasses import dataclass
@dataclass
class WebhookTarget:
url: str
secret: str
max_retries: int = 3
timeout: float = 10.0
class WebhookDispatcher:
"""Send webhooks to downstream agents or services."""
def __init__(self):
self._client = httpx.AsyncClient()
self._failure_counts: Dict[str, int] = {}
self.CIRCUIT_BREAK_THRESHOLD = 5
def _sign(self, payload: bytes, secret: str) -> str:
ts = str(int(time.time()))
signed = ts.encode() + b"." + payload
sig = hmac.new(secret.encode(), signed, hashlib.sha256).hexdigest()
return f"t={ts},v1={sig}"
def _is_circuit_open(self, url: str) -> bool:
return self._failure_counts.get(url, 0) >= self.CIRCUIT_BREAK_THRESHOLD
async def send(
self,
target: WebhookTarget,
event: str,
data: dict
) -> bool:
"""Send webhook with retries. Returns True on success."""
if self._is_circuit_open(target.url):
logger.warning(f"Circuit open for {target.url}, skipping")
return False
payload = {
"id": f"wh_{int(time.time() * 1000)}",
"event": event,
"created_at": datetime.utcnow().isoformat() + "Z",
"data": {"object": data},
}
raw = json.dumps(payload).encode()
signature = self._sign(raw, target.secret)
for attempt in range(1, target.max_retries + 1):
try:
resp = await self._client.post(
target.url,
content=raw,
headers={
"Content-Type": "application/json",
"X-Webhook-Signature": signature,
"User-Agent": "PurpleFlea-Agent/1.0",
},
timeout=target.timeout,
)
if resp.status_code < 300:
self._failure_counts[target.url] = 0 # Reset circuit
return True
logger.warning(f"HTTP {resp.status_code} from {target.url}")
except (httpx.TimeoutException, httpx.ConnectError) as e:
logger.error(f"Attempt {attempt} failed: {e}")
# Exponential backoff
if attempt < target.max_retries:
backoff = 2 ** (attempt - 1) # 1s, 2s, 4s...
await asyncio.sleep(backoff)
# All attempts failed
self._failure_counts[target.url] = \
self._failure_counts.get(target.url, 0) + 1
return False
async def close(self):
await self._client.aclose()
Monitoring Webhook Health
Production webhook infrastructure needs observability. Track these metrics to catch problems before they cascade:
| Metric | Warning Threshold | Critical Threshold | Action |
|---|---|---|---|
| Delivery success rate | < 99% | < 95% | Alert on-call |
| P99 response time | > 3s | > 8s | Scale server |
| DLQ depth | > 10 | > 100 | Investigate failures |
| Signature failures | Any | > 5/min | Possible attack |
| Duplicate rate | > 5% | > 20% | Upstream retry storm |
# Minimal Prometheus metrics for your webhook server
from prometheus_client import Counter, Histogram, Gauge
wh_received = Counter("webhooks_received_total", "Total webhooks received", ["event"])
wh_processed = Counter("webhooks_processed_total", "Total successfully processed", ["event"])
wh_failed = Counter("webhooks_failed_total", "Total processing failures", ["event", "reason"])
wh_duration = Histogram("webhook_processing_seconds", "Processing time", ["event"])
wh_dlq_depth = Gauge("webhook_dlq_depth", "Current dead letter queue depth")
wh_sig_failures = Counter("webhook_signature_failures_total", "HMAC verification failures")
Registering Webhooks via API
Register your webhook endpoint with Purple Flea to start receiving events:
import httpx
async def register_webhook(
endpoint_url: str,
events: list,
api_key: str
) -> dict:
async with httpx.AsyncClient() as client:
resp = await client.post(
"https://purpleflea.com/api/v1/webhooks",
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
json={
"url": endpoint_url,
"events": events, # e.g. ["payment.*", "trade.*"]
"description": "My trading agent webhook",
"active": True,
}
)
resp.raise_for_status()
data = resp.json()
# Save the webhook secret from the response!
print(f"Webhook ID: {data['id']}")
print(f"Webhook Secret: {data['secret']}") # Only shown once
return data
# Register for all events using wildcard
await register_webhook(
endpoint_url="https://myagent.example.com/webhooks/purpleflea",
events=["*"],
api_key="pf_live_<your_key>"
)
The webhook secret is only shown once at registration time. Store it securely in your environment variables or secrets manager immediately. If you lose it, you must rotate and re-register.
Summary
Building a robust webhook architecture for your Purple Flea agent comes down to five core practices:
- Always verify HMAC signatures using constant-time comparison and timestamp drift checks.
- Implement idempotency using the webhook
idas a Redis key β process each delivery exactly once. - Return 200 immediately and process asynchronously to avoid timeout-triggered retries.
- Route failures to a DLQ so no event is silently lost after max retries.
- Monitor signature failures and DLQ depth β spikes indicate either bugs or attacks.
With this stack in place, your agent reacts to every Purple Flea event β payments, trades, faucet claims, escrow releases β in near real time, with full reliability guarantees.
Ready to register your first webhook? Head to purpleflea.com/register to get your API key, then point Purple Flea at your endpoint.