Error Taxonomy: What Can Go Wrong
Before writing error handling code, you need a clear mental model of which errors are transient, which are permanent, and which are ambiguous. Treating a permanent error as transient wastes resources on doomed retries; treating a transient error as permanent causes unnecessary failures; and acting on an ambiguous error without checking its outcome risks duplicate execution.
| HTTP Status | Category | Idempotent Retry? | Requires Status Check? |
|---|---|---|---|
| 408, 504 | Timeout | Depends on idempotency key | Yes — unknown outcome |
| 429 | Rate Limited | Yes — after delay | No |
| 500 | Server Error | Sometimes | Depends |
| 503 | Unavailable | Yes | No |
| 400, 422 | Bad Request | No — fix first | No |
| 409 | Conflict | Yes — already done | No |
When a request to POST /casino/bet times out after 30 seconds, you do not know if the bet was placed. The server may have processed it and the response simply never arrived. Retrying blindly without an idempotency key can result in two bets. Always use idempotency keys on all mutating operations.
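One concrete reading of the taxonomy table is a small classifier that drives the retry decision. This is an illustrative sketch, not part of any Purple Flea SDK; grouping 500 with the ambiguous codes is a conservative choice, and unknown codes default to "permanent" so the agent never retries blindly.

```python
# Hypothetical helper mirroring the taxonomy table above.
PERMANENT = {400, 422}          # fix the request, never retry
RETRY_AFTER_DELAY = {429, 503}  # transient, retry with backoff
AMBIGUOUS = {408, 500, 504}     # outcome unknown, check status first
ALREADY_DONE = {409}            # idempotency conflict, fetch existing result

def classify(status: int) -> str:
    """Map an HTTP status code to a retry decision."""
    if status in PERMANENT:
        return "permanent"
    if status in ALREADY_DONE:
        return "already_done"
    if status in AMBIGUOUS:
        return "check_status"
    if status in RETRY_AFTER_DELAY:
        return "retry"
    return "permanent"  # conservative default for unrecognized codes
```

Keeping the decision in one place makes it easy to audit and adjust as you learn how a particular endpoint actually behaves.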
Idempotency Keys: The Foundation
An idempotency key is a unique string you attach to a request that tells the server: "if you've already processed a request with this key, return the same result without executing the operation again." It converts a non-idempotent POST into a safe-to-retry operation.
Rule: Every state-mutating API call — bets, trades, escrow deposits, wallet transfers — must include an idempotency key. Read-only operations (GET requests) are inherently idempotent and do not need keys.
Purple Flea APIs accept idempotency keys via the Idempotency-Key header. Keys must be unique per operation and can be any string up to 64 characters. UUIDs or deterministic hashes of operation parameters both work well.
```python
import uuid
import hashlib
import json
from datetime import datetime, timezone
from typing import Optional

class IdempotencyKeyGenerator:
    """
    Generate and manage idempotency keys for financial operations.

    Two strategies:
    - Random keys (UUID): unique per attempt, safest default
    - Deterministic keys: derived from operation params, enables
      resumption after crash/restart without key storage
    """

    @staticmethod
    def random() -> str:
        """New random key — safe for immediate retries."""
        return str(uuid.uuid4())

    @staticmethod
    def deterministic(operation: str, params: dict, date: Optional[str] = None) -> str:
        """
        Derive key from operation + params.

        Same inputs always produce the same key — if the agent restarts
        mid-operation, it will generate the same key and the server will
        return the cached result instead of executing twice.

        Include a date to namespace keys by day (prevents week-old keys
        from colliding with today's operations).
        """
        date = date or datetime.now(timezone.utc).strftime("%Y-%m-%d")
        payload = {"op": operation, "params": params, "date": date}
        serialized = json.dumps(payload, sort_keys=True)
        digest = hashlib.sha256(serialized.encode()).hexdigest()[:32]
        return f"{operation}-{digest}"

# Example: generate keys for different operation types
idem = IdempotencyKeyGenerator

# For a bet with specific parameters — deterministic key
bet_key = idem.deterministic("casino-bet", {
    "game": "dice",
    "amount": "10.00",
    "target": 50,
    "prediction": "over",
    "agent_id": "agent-007",
    "sequence": 142,  # monotonic counter prevents same-day collisions
})

# For an escrow deposit — random key (generate once and persist)
escrow_key = idem.random()
# Store escrow_key to disk/DB before making the call!

async def place_bet_idempotent(client, game_params: dict, idempotency_key: str) -> dict:
    """Place a bet with an idempotency key — safe to retry on any error."""
    return await client._request(
        "POST",
        "/casino/bet",
        json=game_params,
        headers={"Idempotency-Key": idempotency_key},
    )

async def create_escrow_idempotent(
    client,
    escrow_params: dict,
    idempotency_key: str,
) -> dict:
    """
    Create an escrow with idempotency.

    The key must be persisted BEFORE calling this function.
    If the call fails and the key is lost, you cannot safely determine
    whether the escrow was created.
    """
    return await client._request(
        "POST",
        "/escrow/create",
        json=escrow_params,
        headers={"Idempotency-Key": idempotency_key},
    )
```
For operations where losing the key would be catastrophic (large escrow deposits, trades), write the idempotency key to durable storage before making the API call. If your agent crashes after the call but before receiving the response, you can recover by looking up the key and checking the operation status.
Idempotency Key Lifecycle
1. Generate a unique key for this specific operation attempt.
2. Persist it to local SQLite or Redis before the call: key → {operation, params, status: "pending"}
3. Send the request. The server processes it; if it has already seen this key, it returns the cached result.
4. Update the local store: key → {status: "complete", result: ...}
5. On a retry or restart, the same key is safe to reuse: if the server already processed it, you get the cached result; if not, it processes the request now.
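The lifecycle above can be sketched as a small SQLite-backed key store. The class, table, and method names here are illustrative, not part of any Purple Flea library; the point is that the pending record is written before the API call, so a crash can never lose a key.

```python
import json
import sqlite3

class KeyStore:
    """Minimal sketch of the idempotency-key lifecycle (steps 2, 4, and 5)."""

    def __init__(self, path: str = ":memory:"):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS idem_keys ("
            "  key TEXT PRIMARY KEY,"
            "  operation TEXT NOT NULL,"
            "  params_json TEXT NOT NULL,"
            "  status TEXT NOT NULL DEFAULT 'pending',"
            "  result_json TEXT)"
        )

    def record_pending(self, key: str, operation: str, params: dict):
        # Step 2: persist BEFORE the API call so a crash cannot lose the key.
        self.conn.execute(
            "INSERT OR IGNORE INTO idem_keys (key, operation, params_json) "
            "VALUES (?, ?, ?)",
            (key, operation, json.dumps(params)),
        )
        self.conn.commit()

    def record_complete(self, key: str, result: dict):
        # Step 4: mark complete once the response arrives.
        self.conn.execute(
            "UPDATE idem_keys SET status='complete', result_json=? WHERE key=?",
            (json.dumps(result), key),
        )
        self.conn.commit()

    def pending_keys(self) -> list:
        # Step 5: on restart, these operations have unknown outcomes
        # and must be status-checked or retried with the same key.
        rows = self.conn.execute(
            "SELECT key FROM idem_keys WHERE status='pending'"
        ).fetchall()
        return [r[0] for r in rows]
```

On startup, an agent iterates `pending_keys()` and resolves each one before accepting new work.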
Transaction Status Checking
When a request times out or returns an ambiguous error, your agent must check whether the operation was actually executed before retrying. Retrying without checking first risks double-execution — two bets placed, two escrow deposits made.
The pattern: after any ambiguous failure, query the operation status endpoint before deciding to retry. If the operation is found (any status), do not retry. If it is not found, retry with the same idempotency key.
```python
import asyncio
from enum import Enum
from typing import Optional

import httpx

class OperationStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETE = "complete"
    FAILED = "failed"
    NOT_FOUND = "not_found"

class TransactionStatusChecker:
    """Check the status of an operation by idempotency key."""

    def __init__(self, client):
        self.client = client

    async def check_bet(self, idempotency_key: str) -> tuple[OperationStatus, Optional[dict]]:
        try:
            result = await self.client.get(
                "/casino/bet/by-idempotency-key",
                params={"key": idempotency_key},
            )
            status = OperationStatus(result["status"])
            return status, result
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                return OperationStatus.NOT_FOUND, None
            raise

    async def check_escrow(self, escrow_id: str) -> tuple[OperationStatus, Optional[dict]]:
        try:
            result = await self.client.get(f"/escrow/{escrow_id}")
            status_map = {
                "created": OperationStatus.PENDING,
                "funded": OperationStatus.PROCESSING,
                "released": OperationStatus.COMPLETE,
                "refunded": OperationStatus.FAILED,
            }
            status = status_map.get(result["state"], OperationStatus.PENDING)
            return status, result
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                return OperationStatus.NOT_FOUND, None
            raise

async def place_bet_with_status_check(
    client,
    checker: TransactionStatusChecker,
    bet_params: dict,
    idempotency_key: str,
    max_retries: int = 3,
) -> dict:
    """
    Place a bet with full status checking on ambiguous failures.

    Flow:
    1. Attempt to place bet
    2. On timeout/ambiguous error: check if bet was processed
    3. If found: return existing result
    4. If not found: retry with same idempotency key
    5. On permanent failure: raise
    """
    for attempt in range(max_retries):
        try:
            return await client._request(
                "POST", "/casino/bet",
                json=bet_params,
                headers={"Idempotency-Key": idempotency_key},
            )
        except httpx.TimeoutException:
            # Ambiguous — check if processed
            await asyncio.sleep(1)
            status, result = await checker.check_bet(idempotency_key)
            if status in (OperationStatus.COMPLETE, OperationStatus.PROCESSING):
                print(f"Bet was processed despite timeout: {result['id']}")
                return result
            if status == OperationStatus.NOT_FOUND:
                # Genuinely not processed — safe to retry
                if attempt < max_retries - 1:
                    await asyncio.sleep(2 ** attempt)
                    continue
            raise
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 409:
                # Conflict = idempotency match — get the existing result
                status, result = await checker.check_bet(idempotency_key)
                return result
            if e.response.status_code not in (429, 500, 503):
                raise  # Permanent client error — do not retry
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)
    raise RuntimeError(f"Failed to place bet after {max_retries} attempts")
```
Retry Logic and Backoff Strategies
Good retry logic is more nuanced than "try three times with a one-second delay." The right strategy depends on the error type, the operation semantics, and the overall system load. Aggressive retries during a service outage make the problem worse for everyone.
Jitter: Why It Matters
Without jitter, all agents that hit the same error at the same time will retry at exactly the same moment, creating a retry storm. Adding random jitter spreads retries over time and prevents thundering herd behavior at the API level.
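The effect is easy to demonstrate. The sketch below (function names are illustrative) compares a fixed exponential schedule against "full jitter", which picks each delay uniformly from zero up to the exponential cap:

```python
import random

def backoff_no_jitter(attempt: int, base: float = 1.0, mult: float = 2.0) -> float:
    """Fixed exponential delay: every agent computes the same value."""
    return base * (mult ** attempt)

def backoff_full_jitter(attempt: int, base: float = 1.0, mult: float = 2.0) -> float:
    """Full jitter: uniform over [0, exponential cap)."""
    return random.uniform(0, base * (mult ** attempt))

# 1000 agents that all failed at the same instant compute their
# third retry delay (attempt index 2, so a 4-second cap):
no_jitter = {backoff_no_jitter(2) for _ in range(1000)}
with_jitter = {round(backoff_full_jitter(2), 6) for _ in range(1000)}
# Without jitter, every agent retries at exactly t+4s (one distinct value);
# with jitter, retries spread across the whole 0-4s window.
```

Proportional (±) jitter, as used by the retry policy in this section, is a milder variant: it keeps delays centered on the exponential schedule while still breaking up synchronized retries.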
```python
import asyncio
import logging
import random
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional, Set, TypeVar

import httpx

logger = logging.getLogger(__name__)

T = TypeVar("T")

@dataclass
class RetryPolicy:
    max_attempts: int = 5
    base_delay: float = 1.0    # seconds
    max_delay: float = 60.0    # cap backoff at 60s
    multiplier: float = 2.0    # exponential base
    jitter: float = 0.25       # ±25% random jitter
    retryable_status: Optional[Set[int]] = None

    def __post_init__(self):
        if self.retryable_status is None:
            self.retryable_status = {429, 500, 502, 503, 504}

    def delay_for_attempt(self, attempt: int) -> float:
        """Exponential backoff with proportional (±) jitter."""
        base = min(self.base_delay * (self.multiplier ** attempt), self.max_delay)
        jitter_range = base * self.jitter
        return base + random.uniform(-jitter_range, jitter_range)

AGGRESSIVE_RETRY = RetryPolicy(max_attempts=10, base_delay=0.5, max_delay=30)
CONSERVATIVE_RETRY = RetryPolicy(max_attempts=3, base_delay=2.0, max_delay=60)
RATE_LIMIT_RETRY = RetryPolicy(max_attempts=20, base_delay=5.0, max_delay=120)

async def with_retry(
    fn: Callable[[], Awaitable[T]],
    policy: Optional[RetryPolicy] = None,
    operation_name: str = "operation",
) -> T:
    """Execute an async function with a retry policy."""
    policy = policy or RetryPolicy()
    last_exc = None
    for attempt in range(policy.max_attempts):
        try:
            return await fn()
        except httpx.HTTPStatusError as e:
            last_exc = e
            status = e.response.status_code
            if status not in policy.retryable_status:
                logger.warning(f"{operation_name}: non-retryable {status}")
                raise
            # Respect Retry-After header if present
            retry_after = e.response.headers.get("Retry-After")
            if retry_after:
                delay = float(retry_after)
            else:
                delay = policy.delay_for_attempt(attempt)
            logger.info(
                f"{operation_name}: attempt {attempt+1}/{policy.max_attempts} "
                f"failed with {status}, retrying in {delay:.1f}s"
            )
            await asyncio.sleep(delay)
        except (httpx.TimeoutException, httpx.TransportError) as e:
            last_exc = e
            delay = policy.delay_for_attempt(attempt)
            logger.info(f"{operation_name}: network error, retrying in {delay:.1f}s: {e}")
            await asyncio.sleep(delay)
    logger.error(f"{operation_name}: exhausted {policy.max_attempts} attempts")
    raise last_exc

# Usage with Purple Flea APIs
async def get_balance_with_retry(client) -> dict:
    return await with_retry(
        lambda: client.get("/wallet/balance"),
        policy=AGGRESSIVE_RETRY,
        operation_name="get-balance",
    )
```
Dead Letter Queues
Some operations fail permanently — bad input that cannot be fixed automatically, operations that hit business rule limits, or operations that exhaust all retries. These should not be silently dropped. They should be routed to a dead letter queue (DLQ) for inspection and manual resolution.
A DLQ is simply a store of failed operations plus their context. For small agents, a SQLite table is sufficient. For agents handling high volume, a proper message queue (Redis Streams, Kafka) is appropriate.
```python
import json
import logging
import sqlite3
import time
from dataclasses import dataclass
from typing import List

logger = logging.getLogger(__name__)

@dataclass
class FailedOperation:
    operation_type: str
    idempotency_key: str
    params: dict
    error_code: str
    error_message: str
    attempt_count: int
    first_attempt_ts: float
    last_attempt_ts: float
    agent_id: str

class DeadLetterQueue:
    """SQLite-backed dead letter queue for failed financial operations."""

    def __init__(self, db_path: str = "/var/lib/agent/dlq.db"):
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        self._ensure_schema()

    def _ensure_schema(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS dead_letters (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                operation_type TEXT NOT NULL,
                idempotency_key TEXT NOT NULL UNIQUE,
                params_json TEXT NOT NULL,
                error_code TEXT NOT NULL,
                error_message TEXT NOT NULL,
                attempt_count INTEGER NOT NULL,
                first_attempt_ts REAL NOT NULL,
                last_attempt_ts REAL NOT NULL,
                agent_id TEXT NOT NULL,
                resolved INTEGER DEFAULT 0,
                resolution_note TEXT,
                created_at REAL DEFAULT (unixepoch('now'))
            )
        """)
        self.conn.commit()

    def push(self, op: FailedOperation):
        """Add a failed operation to the DLQ."""
        try:
            self.conn.execute("""
                INSERT INTO dead_letters
                    (operation_type, idempotency_key, params_json, error_code,
                     error_message, attempt_count, first_attempt_ts,
                     last_attempt_ts, agent_id)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(idempotency_key) DO UPDATE SET
                    error_message=excluded.error_message,
                    attempt_count=excluded.attempt_count,
                    last_attempt_ts=excluded.last_attempt_ts
            """, (
                op.operation_type,
                op.idempotency_key,
                json.dumps(op.params),
                op.error_code,
                op.error_message,
                op.attempt_count,
                op.first_attempt_ts,
                op.last_attempt_ts,
                op.agent_id,
            ))
            self.conn.commit()
        except Exception as e:
            # Never let DLQ failures crash the agent
            logger.error(f"Failed to push to DLQ: {e}")

    def list_unresolved(self, limit: int = 50) -> List[dict]:
        cursor = self.conn.execute(
            "SELECT * FROM dead_letters WHERE resolved=0 "
            "ORDER BY last_attempt_ts DESC LIMIT ?",
            (limit,)
        )
        cols = [d[0] for d in cursor.description]
        return [dict(zip(cols, row)) for row in cursor.fetchall()]

    def resolve(self, idempotency_key: str, note: str):
        self.conn.execute(
            "UPDATE dead_letters SET resolved=1, resolution_note=? "
            "WHERE idempotency_key=?",
            (note, idempotency_key)
        )
        self.conn.commit()

# Integrate with retry logic
dlq = DeadLetterQueue()

async def safe_execute(operation_type: str, fn, params: dict, key: str, agent_id: str):
    """Execute with full retry + DLQ fallback."""
    start = time.time()
    try:
        return await with_retry(fn, operation_name=operation_type)
    except Exception as e:
        dlq.push(FailedOperation(
            operation_type=operation_type,
            idempotency_key=key,
            params=params,
            error_code=type(e).__name__,
            error_message=str(e),
            attempt_count=RetryPolicy().max_attempts,  # matches the default policy
            first_attempt_ts=start,
            last_attempt_ts=time.time(),
            agent_id=agent_id,
        ))
        logger.error(f"Operation {key} moved to DLQ after exhausting retries")
        raise
```
Complete Transaction Manager
Combining all the patterns above into a single TransactionManager class provides a clean interface for agents: submit an operation, get a result, never lose a transaction. The manager handles idempotency, status checking, retry, and DLQ routing internally.
```python
from typing import List, Optional

class TransactionManager:
    """
    High-level transaction manager for Purple Flea financial operations.

    Usage:
        mgr = TransactionManager(client, agent_id="agent-001")
        result = await mgr.place_bet(game="dice", amount="10.00", target=50)
        result = await mgr.create_escrow(buyer=..., seller=..., amount="100.00")
    """

    def __init__(self, client, agent_id: str, dlq: Optional[DeadLetterQueue] = None):
        self.client = client
        self.agent_id = agent_id
        self.dlq = dlq or DeadLetterQueue()
        self.checker = TransactionStatusChecker(client)
        self._sequence = 0

    def _next_key(self, operation: str) -> str:
        self._sequence += 1
        return IdempotencyKeyGenerator.deterministic(operation, {
            "agent_id": self.agent_id,
            "seq": self._sequence,
        })

    async def place_bet(self, **params) -> dict:
        key = self._next_key("bet")
        return await safe_execute(
            operation_type="casino-bet",
            fn=lambda: place_bet_with_status_check(
                self.client, self.checker, params, key
            ),
            params=params,
            key=key,
            agent_id=self.agent_id,
        )

    async def create_escrow(self, buyer: str, seller: str, amount: str, **extra) -> dict:
        params = {"buyer": buyer, "seller": seller, "amount": amount, **extra}
        key = self._next_key("escrow-create")
        return await safe_execute(
            operation_type="escrow-create",
            fn=lambda: create_escrow_idempotent(self.client, params, key),
            params=params,
            key=key,
            agent_id=self.agent_id,
        )

    def get_failed_operations(self) -> List[dict]:
        """Return all unresolved DLQ entries for this agent."""
        return [
            op for op in self.dlq.list_unresolved()
            if op["agent_id"] == self.agent_id
        ]

# Clean usage in an agent loop
async def run_casino_agent():
    async with PurpleFleaClient() as client:
        mgr = TransactionManager(client, agent_id="dice-agent-001")
        for _ in range(100):
            try:
                result = await mgr.place_bet(
                    game="dice", amount="5.00",
                    prediction="over", target=50,
                )
                print(f"Bet result: {result['outcome']} +{result['payout']}")
            except Exception as e:
                print(f"Bet failed permanently, check DLQ: {e}")
        failed = mgr.get_failed_operations()
        if failed:
            print(f"WARNING: {len(failed)} operations in DLQ")
```
Use a local proxy like mitmproxy or toxiproxy to inject network failures during development. Verify that your idempotency keys prevent double-execution, that the DLQ captures permanent failures, and that status checking correctly handles the timeout-then-found scenario. These code paths are the most critical and the least likely to be exercised naturally.
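For unit tests that do not need a proxy, the timeout-then-found scenario can be reproduced with a fake server that records the operation but drops the first response. The harness below is a hypothetical sketch — the class and field names are illustrative, not part of any test library:

```python
import asyncio

class FlakyFakeServer:
    """Records bets like a real server, but loses the first response."""

    def __init__(self, drop_first_response: bool = True):
        self.processed: dict[str, dict] = {}   # idempotency key -> result
        self.drop_first = drop_first_response

    async def place_bet(self, key: str, params: dict) -> dict:
        if key in self.processed:
            return self.processed[key]          # idempotent replay
        result = {"id": f"bet-{len(self.processed)}", **params}
        self.processed[key] = result            # the bet IS placed...
        if self.drop_first:
            self.drop_first = False
            raise asyncio.TimeoutError()        # ...but the response is lost

async def demo() -> dict:
    server = FlakyFakeServer()
    key = "test-key-1"
    try:
        return await server.place_bet(key, {"amount": "5.00"})
    except asyncio.TimeoutError:
        # The status check would find the bet; retrying with the SAME key
        # returns the cached result instead of placing a second bet.
        assert key in server.processed
        return await server.place_bet(key, {"amount": "5.00"})
```

Point your status-checking code at a fake like this and assert that exactly one bet exists after the retry — that is the property the whole pattern exists to guarantee.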
Build Reliable Agents on Purple Flea
Get free USDC from the faucet and test your error handling in a real environment before going live.