1. Failure Taxonomy for Trading Agents
Before designing recovery mechanisms, classify the failures your agent will encounter. Different failure classes require different recovery strategies — a network timeout should be retried, but a logic error that causes over-trading should not.
| Class | Examples | Retryable | Recovery Strategy |
|---|---|---|---|
| Network | DNS failure, TCP timeout, TLS error | Yes | Exponential backoff, circuit breaker |
| API / Exchange | 429 rate limit, 503 maintenance, 500 server error | Conditionally | Backoff with jitter; retry 5xx, never retry 4xx |
| Data | Bad price feed, NaN in OHLCV, stale timestamp | No | Fallback data source, skip signal |
| Logic | Negative position size, division by zero, infinite loop | No | Dead letter queue, halt + alert |
| State | DB corrupted, cache miss, stale state after restart | Sometimes | Checkpoint recovery, re-sync |
| Resource | OOM, disk full, CPU thrash | No | Graceful degradation, alert + restart |
2. Circuit Breaker Pattern
The circuit breaker prevents an agent from hammering a failing dependency. Like an electrical circuit breaker, it "trips" after enough failures and stays open for a cooling period before allowing traffic through again.
Three states:
- Closed — normal operation; failures are counted
- Open — dependency is presumed down; all calls fail fast (no network call)
- Half-Open — cooldown expired; one probe request allowed to test recovery
import asyncio, time, logging
from enum import Enum
from typing import Callable, Any, TypeVar
from functools import wraps
T = TypeVar("T")


class CBState(Enum):
    """Circuit breaker lifecycle states."""
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    """Fail-fast wrapper around an unreliable async dependency.

    Failures are counted while CLOSED; after ``failure_threshold`` failures
    the breaker trips to OPEN and every call fails fast (no network call).
    Once ``recovery_timeout`` seconds have passed since the last failure,
    the next ``state`` read promotes the breaker to HALF_OPEN and a probe
    call is allowed through: success closes the breaker, failure re-opens
    it immediately.

    NOTE(review): not guarded against concurrent probes — several
    coroutines may all observe HALF_OPEN and be allowed through at once.
    """

    def __init__(
        self,
        name: str,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        expected_exception: type = Exception
    ):
        self.name = name
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self._state = CBState.CLOSED
        self._failures = 0
        self._last_failure_time: float = 0
        self.log = logging.getLogger(f"circuit_breaker.{name}")

    @property
    def state(self) -> CBState:
        # Lazily promote OPEN -> HALF_OPEN once the cooldown has expired.
        if self._state == CBState.OPEN:
            if time.monotonic() - self._last_failure_time >= self.recovery_timeout:
                self._state = CBState.HALF_OPEN
                self.log.info(f"[{self.name}] OPEN -> HALF_OPEN (probing)")
        return self._state

    def _on_success(self):
        """Reset the failure counter; a successful probe closes the breaker."""
        if self._state == CBState.HALF_OPEN:
            self.log.info(f"[{self.name}] HALF_OPEN -> CLOSED (recovered)")
        self._failures = 0
        self._state = CBState.CLOSED

    def _on_failure(self, exc: Exception):
        """Record a failure and trip the breaker when appropriate."""
        self._failures += 1
        self._last_failure_time = time.monotonic()
        # BUGFIX: a failed HALF_OPEN probe must re-open the breaker
        # immediately (instead of relying on the stale failure counter),
        # and the log must not claim the transition came from CLOSED.
        if self._state == CBState.HALF_OPEN:
            self.log.warning(
                f"[{self.name}] HALF_OPEN -> OPEN (probe failed). "
                f"Last error: {exc}"
            )
            self._state = CBState.OPEN
        elif self._failures >= self.failure_threshold:
            if self._state != CBState.OPEN:
                self.log.warning(
                    f"[{self.name}] CLOSED -> OPEN after {self._failures} failures. "
                    f"Last error: {exc}"
                )
            self._state = CBState.OPEN

    async def call(self, func: Callable[..., T], *args, **kwargs) -> T:
        """Invoke ``func`` through the breaker.

        Raises RuntimeError when the breaker is OPEN; otherwise re-raises
        whatever ``func`` raises (after recording the failure).
        """
        if self.state == CBState.OPEN:
            raise RuntimeError(
                f"Circuit breaker [{self.name}] is OPEN — "
                f"dependency presumed down"
            )
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception as e:
            self._on_failure(e)
            raise
# Usage:
# cb = CircuitBreaker("purpleflea-api", failure_threshold=5, recovery_timeout=60)
# try:
# result = await cb.call(place_order, asset="BTC", size=0.01)
# except RuntimeError as e:
# # Circuit open — use fallback or skip this cycle
# print(f"Skipping trade: {e}")
3. Exponential Backoff with Jitter
When retrying a failed request, naive strategies fail under load. Constant delay causes synchronized retry storms. Exponential delay without jitter causes correlated spikes. The correct approach is full jitter exponential backoff: random delay in [0, base * 2^attempt].
import asyncio, random, logging, time
from typing import Callable, TypeVar, Optional, Type, Tuple
T = TypeVar("T")


class RetryConfig:
    """Retry policy: exponential backoff with optional full jitter.

    The delay before retry ``k`` is drawn uniformly from
    ``[0, min(max_delay, base_delay * 2**k)]`` when ``jitter`` is on,
    otherwise it is the capped exponential value itself. Exceptions
    carrying a status in ``non_retryable_http_codes`` abort retrying.
    """

    def __init__(
        self,
        max_attempts: int = 5,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        jitter: bool = True,
        retryable_exceptions: Tuple[Type[Exception], ...] = (Exception,),
        non_retryable_http_codes: Tuple[int, ...] = (400, 401, 403, 404, 422),
    ):
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.jitter = jitter
        self.retryable_exceptions = retryable_exceptions
        self.non_retryable_http_codes = non_retryable_http_codes

    def delay(self, attempt: int) -> float:
        """Return the sleep duration in seconds before retry ``attempt``."""
        cap = min(self.max_delay, self.base_delay * (2 ** attempt))
        if self.jitter:
            return random.uniform(0, cap)  # full jitter
        return cap


async def retry_async(
    func: Callable[..., T],
    config: RetryConfig,
    *args,
    **kwargs
) -> T:
    """Await ``func(*args, **kwargs)`` with retries per ``config``.

    Re-raises the last exception once attempts are exhausted. Exceptions
    exposing a non-retryable HTTP status (via ``.status`` or
    ``.status_code``) are raised immediately without retrying.
    """
    log = logging.getLogger("retry")
    last_exc: Optional[Exception] = None
    for attempt in range(config.max_attempts):
        try:
            return await func(*args, **kwargs)
        except config.retryable_exceptions as exc:
            last_exc = exc
            # Check for non-retryable HTTP status codes
            status = getattr(exc, "status", None) or getattr(exc, "status_code", None)
            if status and status in config.non_retryable_http_codes:
                log.warning(f"Non-retryable status {status}: {exc}")
                raise
            # BUGFIX: only compute/log a retry delay when a retry will
            # actually happen; the old code logged "Retrying in Xs" after
            # the final attempt even though it never slept or retried.
            if attempt < config.max_attempts - 1:
                delay = config.delay(attempt)
                log.warning(
                    f"Attempt {attempt+1}/{config.max_attempts} failed: {exc}. "
                    f"Retrying in {delay:.2f}s"
                )
                await asyncio.sleep(delay)
            else:
                log.warning(
                    f"Attempt {attempt+1}/{config.max_attempts} failed: {exc}. "
                    f"Giving up."
                )
    # BUGFIX: with max_attempts <= 0 the loop never runs and last_exc is
    # None; raising None is a TypeError. Fail with a clear message instead.
    if last_exc is None:
        raise RuntimeError("retry_async: max_attempts must be >= 1")
    raise last_exc


# Decorator version
def with_retry(config: Optional[RetryConfig] = None):
    """Decorator that wraps an async function with retry_async.

    Uses ``config`` or a default RetryConfig. functools.wraps preserves
    the wrapped function's name/docstring (the original dropped them).
    """
    from functools import wraps  # local: snippet-level imports lack functools
    cfg = config or RetryConfig()

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await retry_async(func, cfg, *args, **kwargs)
        return wrapper
    return decorator
# Usage:
# @with_retry(RetryConfig(max_attempts=4, base_delay=2.0))
# async def place_order(session, payload):
# async with session.post("/api/trade", json=payload) as r:
# r.raise_for_status()
# return await r.json()
4. Dead Letter Queue for Failed Orders
A dead letter queue (DLQ) captures operations that have exhausted all retry attempts. Instead of silently dropping failed orders, the DLQ stores them for inspection, manual replay, or automated analysis of what went wrong.
Key properties of a good DLQ:
- Persistence — survives agent restarts (use a file, SQLite, or Redis)
- Metadata — captures the original payload, all error messages, timestamps, and attempt count
- Replay capability — failed operations can be retried after a human fix
- Alert on growth — alert when DLQ size exceeds threshold
import asyncio, json, time, sqlite3, logging
from dataclasses import dataclass, field, asdict
from typing import Any, List, Optional
@dataclass
class DeadLetter:
    """One failed operation captured for inspection or manual replay."""
    id: str
    operation_type: str  # e.g. "place_order", "claim_faucet"
    payload: dict
    errors: list  # list of (timestamp, error_message)
    attempts: int
    first_attempted_at: float
    last_attempted_at: float
    resolved: bool = False
    resolution_note: str = ""


class SQLiteDeadLetterQueue:
    """Persistent DLQ backed by SQLite.

    A fresh connection is opened per call (safe across restarts and
    processes). BUGFIX: every connection is now closed in a ``finally``
    block — the original leaked the connection whenever a statement
    raised (locked DB, disk full, malformed row, ...).
    """

    def __init__(self, db_path: str = "agent_dlq.db"):
        self.db_path = db_path
        self.log = logging.getLogger("dlq")
        self._init_db()

    def _init_db(self):
        """Create the dead_letters table if it does not already exist."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS dead_letters (
                    id TEXT PRIMARY KEY,
                    operation_type TEXT,
                    payload TEXT,
                    errors TEXT,
                    attempts INTEGER,
                    first_attempted_at REAL,
                    last_attempted_at REAL,
                    resolved INTEGER DEFAULT 0,
                    resolution_note TEXT DEFAULT ''
                )
            """)
            conn.commit()
        finally:
            conn.close()

    def enqueue(self, dl: DeadLetter):
        """Insert (or replace, keyed by id) a dead letter; logs a warning."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute(
                """INSERT OR REPLACE INTO dead_letters
                   (id, operation_type, payload, errors, attempts,
                    first_attempted_at, last_attempted_at, resolved, resolution_note)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    dl.id,
                    dl.operation_type,
                    json.dumps(dl.payload),   # payload stored as JSON text
                    json.dumps(dl.errors),
                    dl.attempts,
                    dl.first_attempted_at,
                    dl.last_attempted_at,
                    int(dl.resolved),
                    dl.resolution_note
                )
            )
            conn.commit()
        finally:
            conn.close()
        self.log.warning(
            f"DLQ: enqueued {dl.operation_type} (id={dl.id}, "
            f"attempts={dl.attempts})"
        )

    def get_pending(self, limit: int = 50) -> List[DeadLetter]:
        """Return up to ``limit`` unresolved letters, newest first."""
        conn = sqlite3.connect(self.db_path)
        try:
            rows = conn.execute(
                "SELECT * FROM dead_letters WHERE resolved=0 ORDER BY last_attempted_at DESC LIMIT ?",
                (limit,)
            ).fetchall()
        finally:
            conn.close()
        return [self._row_to_dl(r) for r in rows]

    def mark_resolved(self, dl_id: str, note: str = ""):
        """Flag a letter as resolved with an optional human note."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute(
                "UPDATE dead_letters SET resolved=1, resolution_note=? WHERE id=?",
                (note, dl_id)
            )
            conn.commit()
        finally:
            conn.close()
        self.log.info(f"DLQ: resolved {dl_id}: {note}")

    def size(self) -> int:
        """Number of unresolved letters (used for growth alerts)."""
        conn = sqlite3.connect(self.db_path)
        try:
            n = conn.execute("SELECT COUNT(*) FROM dead_letters WHERE resolved=0").fetchone()[0]
        finally:
            conn.close()
        return n

    def _row_to_dl(self, row) -> DeadLetter:
        """Rehydrate a DeadLetter from a SELECT * row (column order matters)."""
        return DeadLetter(
            id=row[0], operation_type=row[1],
            payload=json.loads(row[2]), errors=json.loads(row[3]),
            attempts=row[4], first_attempted_at=row[5],
            last_attempted_at=row[6], resolved=bool(row[7]),
            resolution_note=row[8]
        )
5. State Checkpointing and Recovery
When an agent restarts after a crash, it must recover its state. Without checkpointing, it loses position tracking, PnL history, and open order state — potentially causing it to re-enter positions it already holds.
Checkpoint strategy: persist the agent's full state to disk (or database) at regular intervals and on every state-changing action (order placed, order filled, position closed).
import json, os, time, logging, asyncio
from dataclasses import dataclass, asdict, field
from typing import Dict, List, Optional
import hashlib
@dataclass
class AgentState:
    """Everything the agent must recover after a restart."""
    agent_id: str
    balance: float
    open_positions: Dict[str, dict]  # asset -> {size, entry_price, ts}
    pending_orders: Dict[str, dict]  # order_id -> {asset, side, size, price}
    pnl_history: List[float]
    last_checkpoint_ts: float = field(default_factory=time.time)
    checkpoint_seq: int = 0


class Checkpointer:
    """Atomic checkpoint write with checksum verification."""

    def __init__(self, checkpoint_dir: str = "/var/lib/agent/checkpoints"):
        self.checkpoint_dir = checkpoint_dir
        os.makedirs(checkpoint_dir, exist_ok=True)
        self.log = logging.getLogger("checkpoint")

    def _path(self, agent_id: str) -> str:
        return os.path.join(self.checkpoint_dir, f"{agent_id}.json")

    def _tmp_path(self, agent_id: str) -> str:
        return os.path.join(self.checkpoint_dir, f"{agent_id}.json.tmp")

    def save(self, state: AgentState):
        """Atomic write: write to .tmp then rename (atomic on POSIX).

        Mutates ``state``: bumps checkpoint_seq and last_checkpoint_ts.
        """
        state.last_checkpoint_ts = time.time()
        state.checkpoint_seq += 1
        data = asdict(state)
        # Checksum is computed over the same canonical dump used on load.
        payload = json.dumps(data, indent=2)
        checksum = hashlib.sha256(payload.encode()).hexdigest()
        wrapper = {"checksum": checksum, "state": data}
        tmp = self._tmp_path(state.agent_id)
        with open(tmp, "w") as f:
            json.dump(wrapper, f)
            # BUGFIX: flush + fsync before the rename — without it a crash
            # or power loss shortly after os.replace can still leave a
            # truncated file behind the supposedly atomic checkpoint.
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp, self._path(state.agent_id))  # atomic rename
        self.log.debug(f"Checkpoint saved (seq={state.checkpoint_seq})")

    def load(self, agent_id: str) -> Optional[AgentState]:
        """Load and verify a checkpoint; None if absent, unreadable, or corrupt."""
        path = self._path(agent_id)
        if not os.path.exists(path):
            return None
        # ROBUSTNESS: a half-written or garbled file must behave like a
        # missing checkpoint, not crash the agent at startup.
        try:
            with open(path) as f:
                wrapper = json.load(f)
            data = wrapper["state"]
            expected = wrapper.get("checksum")
        except (json.JSONDecodeError, KeyError, OSError) as e:
            self.log.error(f"Checkpoint unreadable for {agent_id}: {e} — ignoring")
            return None
        # Verify checksum
        payload = json.dumps(data, indent=2)
        actual = hashlib.sha256(payload.encode()).hexdigest()
        if expected != actual:
            self.log.error(f"Checkpoint checksum mismatch for {agent_id} — ignoring")
            return None
        self.log.info(
            f"Loaded checkpoint (agent={agent_id}, "
            f"seq={data['checkpoint_seq']}, "
            f"age={time.time()-data['last_checkpoint_ts']:.0f}s)"
        )
        return AgentState(**data)


class AutoCheckpointer:
    """Wraps an agent and saves checkpoint every N seconds."""

    def __init__(self, agent, checkpointer: Checkpointer, interval: int = 30):
        self.agent = agent           # must expose get_state() -> AgentState
        self.cp = checkpointer
        self.interval = interval

    async def run_checkpoint_loop(self):
        """Run forever; a failed save is logged but never kills the loop."""
        while True:
            await asyncio.sleep(self.interval)
            try:
                state = self.agent.get_state()
                self.cp.save(state)
            except Exception as e:
                logging.getLogger("checkpoint").error(f"Checkpoint failed: {e}")
6. Health-Based Degradation Modes
Rather than a binary running/stopped state, a resilient agent operates in degraded modes when dependencies partially fail. Define explicit modes with clear capability restrictions:
| Mode | Condition | Capabilities | Action |
|---|---|---|---|
| Full | All systems nominal | Trade, read, write | Normal operation |
| Read-Only | Write API degraded | Read positions, no new orders | Close open positions only |
| Observation | Price feed stale | Log + monitor only | No trades; wait for feed recovery |
| Shutdown | Balance below min OR drawdown exceeded | Nothing | Close all, alert, stop |
from enum import Enum
import time, logging, asyncio
class AgentMode(Enum):
    """Operating modes, from full capability down to permanent halt."""
    FULL = "full"
    READ_ONLY = "read_only"
    OBSERVATION = "observation"
    SHUTDOWN = "shutdown"


class DegradationManager:
    """Maps periodic health snapshots onto an explicit degradation mode.

    Expects config attributes ``min_balance`` (absolute floor) and
    ``max_drawdown_pct`` (a fraction, e.g. 0.20 for 20% — compared
    against the snapshot's ``drawdown_pct`` which is in percent).
    """

    def __init__(self, agent_config):
        self.cfg = agent_config
        self._mode = AgentMode.FULL
        self._mode_since = time.time()
        self.log = logging.getLogger("degradation")

    @property
    def mode(self) -> AgentMode:
        return self._mode

    def transition(self, new_mode: AgentMode, reason: str):
        """Switch modes (no-op when unchanged) and log the reason."""
        if new_mode == self._mode:
            return
        old = self._mode
        self._mode = new_mode
        self._mode_since = time.time()
        self.log.warning(
            f"Mode transition: {old.value} -> {new_mode.value} | reason: {reason}"
        )

    def can_trade(self) -> bool:
        return self._mode == AgentMode.FULL

    def can_read(self) -> bool:
        return self._mode in (AgentMode.FULL, AgentMode.READ_ONLY, AgentMode.OBSERVATION)

    def is_active(self) -> bool:
        return self._mode != AgentMode.SHUTDOWN

    async def assess(self, health: dict):
        """Called periodically with current health snapshot to determine mode."""
        # BUGFIX: SHUTDOWN is terminal. Previously a later healthy snapshot
        # would silently resurrect a shut-down agent straight back to FULL,
        # defeating the "close all, alert, stop" guarantee.
        if self._mode == AgentMode.SHUTDOWN:
            return
        if health.get("balance", 0) < self.cfg.min_balance:
            self.transition(AgentMode.SHUTDOWN, "balance below minimum")
            return
        drawdown = health.get("drawdown_pct", 0)
        if drawdown > self.cfg.max_drawdown_pct * 100:
            self.transition(AgentMode.SHUTDOWN, f"drawdown {drawdown:.1f}% exceeded limit")
            return
        price_feed_age = health.get("price_feed_age_secs", 0)
        if price_feed_age > 60:
            self.transition(AgentMode.OBSERVATION,
                            f"price feed stale ({price_feed_age:.0f}s)")
            return
        if not health.get("exchange_api_ok", True):
            self.transition(AgentMode.READ_ONLY, "exchange write API degraded")
            return
        self.transition(AgentMode.FULL, "all systems nominal")
7. Chaos Engineering Tests
Chaos engineering verifies that your resilience patterns actually work by intentionally injecting failures in a controlled way. The discipline originated at Netflix (Chaos Monkey) and is now standard practice for critical systems.
Recommended chaos tests for trading agents:
- Network partition — block the exchange API for 30s and verify circuit breaker trips
- Slow response — add 5s latency to API responses and verify timeout handling
- Bad price data — inject NaN into price feed and verify the agent skips the signal
- OOM kill — kill the agent process and verify it recovers from checkpoint correctly
- DB corruption — delete the checkpoint file and verify the agent starts clean
- Duplicate order — fire the same order twice and verify idempotency
import asyncio, aiohttp, random, logging, time
from unittest.mock import AsyncMock, patch
from typing import Callable, Any
class ChaosProxy:
    """
    HTTP proxy that injects failures for chaos engineering.
    Wrap your aiohttp.ClientSession with this in test mode.
    """

    def __init__(self, session: aiohttp.ClientSession,
                 failure_rate: float = 0.2,
                 latency_ms: float = 0,
                 error_codes: list = None):
        # The wrapped real session plus the chaos knobs.
        self.session = session
        self.failure_rate = failure_rate
        self.latency_ms = latency_ms
        self.error_codes = error_codes or [500, 503]
        self.log = logging.getLogger("chaos")
        # Counters so tests can verify the actual injection rate.
        self._injected = 0
        self._total = 0

    async def post(self, url: str, **kwargs) -> aiohttp.ClientResponse:
        return await self._inject(self.session.post, url, **kwargs)

    async def get(self, url: str, **kwargs) -> aiohttp.ClientResponse:
        return await self._inject(self.session.get, url, **kwargs)

    async def _inject(self, method, url: str, **kwargs):
        """Optionally add latency, then either raise a fake HTTP error
        or forward the call to the real session method."""
        self._total += 1
        if self.latency_ms > 0:
            await asyncio.sleep(self.latency_ms / 1000)
        should_fail = random.random() < self.failure_rate
        if not should_fail:
            return await method(url, **kwargs)
        self._injected += 1
        code = random.choice(self.error_codes)
        self.log.warning(f"CHAOS: injecting {code} for {url}")
        raise aiohttp.ClientResponseError(
            request_info=None, history=None, status=code,
            message=f"Chaos-injected {code}"
        )

    @property
    def injection_rate(self) -> float:
        """Fraction of proxied calls that had a failure injected."""
        if self._total == 0:
            return 0.0
        return self._injected / self._total
async def run_chaos_test(agent_class, config, duration_secs: int = 60):
    """
    Run an agent under chaos conditions and verify it survives.
    """
    log = logging.getLogger("chaos_test")
    log.info(f"Starting chaos test for {duration_secs}s")
    async with aiohttp.ClientSession() as real_session:
        # Wrap the real session so a fixed fraction of calls fail slowly.
        chaos_session = ChaosProxy(
            real_session,
            failure_rate=0.30,  # 30% of calls will fail
            latency_ms=500,  # 500ms extra latency on all calls
        )
        agent = agent_class(config, session=chaos_session)
        started_at = time.time()
        try:
            await asyncio.wait_for(agent.run(), timeout=duration_secs)
        except asyncio.TimeoutError:
            pass  # expected — test duration
        # Summarize what the proxy actually injected and where the agent ended up.
        final_mode = agent.mode.value if hasattr(agent, "mode") else "unknown"
        report = {
            "duration_secs": time.time() - started_at,
            "chaos_injections": chaos_session._injected,
            "total_calls": chaos_session._total,
            "actual_injection_rate": chaos_session.injection_rate,
            "agent_final_mode": final_mode,
        }
        log.info(f"Chaos test complete: {report}")
        return report
8. Combining Patterns: The Resilient Order Executor
In practice, you compose multiple resilience patterns together. Here is a production-grade order executor that combines the circuit breaker, exponential backoff, idempotency check, and DLQ — all in one class.
import asyncio, aiohttp, hashlib, json, time, logging
from typing import Optional
class ResilientOrderExecutor:
    """
    Places orders with: circuit breaker + retry + idempotency + DLQ.
    This is the pattern recommended for Purple Flea API integrations.
    """

    def __init__(self, api_key: str, api_base: str,
                 dlq: SQLiteDeadLetterQueue,
                 cb: CircuitBreaker):
        self.api_key = api_key
        self.api_base = api_base.rstrip("/")
        self.dlq = dlq
        self.cb = cb
        self.retry_cfg = RetryConfig(
            max_attempts=4,
            base_delay=2.0,
            max_delay=30.0,
            jitter=True,
            non_retryable_http_codes=(400, 401, 403, 404, 422)
        )
        self.log = logging.getLogger("order_executor")
        # NOTE(review): in-memory only — duplicates remain possible after a
        # restart; the server-side Idempotency-Key header is the backstop.
        self._placed_ids: set = set()  # in-memory idempotency cache

    def _idempotency_key(self, order: dict) -> str:
        """Deterministic key from order params — same order always same key."""
        canonical = json.dumps(order, sort_keys=True)
        return hashlib.sha256(canonical.encode()).hexdigest()[:16]

    async def _place_once(self, session: aiohttp.ClientSession,
                          order: dict, idempotency_key: str) -> dict:
        """Single POST attempt; raises on timeout or non-2xx status."""
        async with session.post(
            f"{self.api_base}/api/trade",
            json=order,
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Idempotency-Key": idempotency_key
            },
            timeout=aiohttp.ClientTimeout(total=10)
        ) as r:
            r.raise_for_status()
            return await r.json()

    async def place_order(self, order: dict) -> Optional[dict]:
        """Place ``order`` with full resilience; returns the API response
        dict on success, None on duplicate or after exhausting retries
        (in which case the order is parked in the DLQ)."""
        ikey = self._idempotency_key(order)
        # Idempotency guard
        if ikey in self._placed_ids:
            self.log.warning(f"Order already placed (ikey={ikey}) — skipping duplicate")
            return None
        errors = []
        # BUGFIX: record the real start time up front. The original
        # back-estimated first_attempted_at by summing retry_cfg.delay(i),
        # which draws *fresh random jitter samples* — a nondeterministic
        # guess unrelated to the delays actually slept.
        started_at = time.time()

        async def attempt_with_cb(session):
            return await self.cb.call(self._place_once, session, order, ikey)

        try:
            async with aiohttp.ClientSession() as session:
                result = await retry_async(
                    attempt_with_cb,
                    self.retry_cfg,
                    session
                )
                self._placed_ids.add(ikey)
                self.log.info(f"Order placed: {result.get('order_id')} (ikey={ikey})")
                return result
        except Exception as e:
            errors.append((time.time(), str(e)))
            self.log.error(f"Order failed after all retries: {e}")
            # Send to DLQ
            dl = DeadLetter(
                id=ikey,
                operation_type="place_order",
                payload=order,
                errors=errors,
                attempts=self.retry_cfg.max_attempts,
                first_attempted_at=started_at,
                last_attempted_at=time.time()
            )
            self.dlq.enqueue(dl)
            return None
9. ResilienceAgent: Full Async Implementation
The complete ResilienceAgent wraps all the patterns above into a single runnable agent. It initializes all subsystems, recovers from checkpoint on startup, and runs the main trading loop with full resilience guarantees.
import asyncio, os, logging, time
logging.basicConfig(level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
class ResilienceAgent:
    """
    Full-stack resilient trading agent integrating all patterns.
    Replace strategy_tick() with your signal logic.
    """

    def __init__(self, config):
        self.config = config
        self.log = logging.getLogger(f"agent.{config.agent_id}")
        # Resilience subsystems
        self.cb = CircuitBreaker("purpleflea", failure_threshold=5, recovery_timeout=60)
        self.dlq = SQLiteDeadLetterQueue(f"/var/lib/agent/{config.agent_id}_dlq.db")
        self.cp = Checkpointer("/var/lib/agent/checkpoints")
        self.degradation = DegradationManager(config)
        self.executor = ResilientOrderExecutor(
            api_key=config.api_key,
            api_base=config.api_base,
            dlq=self.dlq,
            cb=self.cb
        )
        # State
        self.state: AgentState | None = None
        self._running = True
        self._checkpoint_task: asyncio.Task | None = None

    async def start(self):
        """Load checkpoint or initialize fresh state."""
        self.state = self.cp.load(self.config.agent_id)
        if self.state:
            self.log.info(
                f"Recovered from checkpoint (seq={self.state.checkpoint_seq}, "
                f"balance={self.state.balance:.2f})"
            )
        else:
            self.log.info("No checkpoint found — starting fresh")
            self.state = AgentState(
                agent_id=self.config.agent_id,
                balance=self.config.initial_balance,
                open_positions={},
                pending_orders={},
                pnl_history=[]
            )
        # Start checkpoint loop. BUGFIX: keep a reference to the task —
        # the event loop holds tasks only weakly, so an unreferenced task
        # can be garbage-collected mid-flight and the checkpoint loop
        # silently dies.
        self._checkpoint_task = asyncio.create_task(
            AutoCheckpointer(self, self.cp, interval=30).run_checkpoint_loop()
        )

    def get_state(self) -> AgentState:
        """Snapshot accessor used by AutoCheckpointer."""
        return self.state

    async def strategy_tick(self) -> dict | None:
        """Override with your strategy. Return order dict or None."""
        # Example: always return None (do nothing)
        return None

    async def run(self):
        """Main loop: assess health, maybe trade, sleep; checkpoint on exit."""
        await self.start()
        self.log.info(f"Agent {self.config.agent_id} running")
        while self._running:
            try:
                health = {
                    "balance": self.state.balance,
                    "drawdown_pct": 0,  # compute from pnl_history
                    "price_feed_age_secs": 0,
                    "exchange_api_ok": self.cb.state.value != "open"
                }
                await self.degradation.assess(health)
                if not self.degradation.is_active():
                    self.log.warning("Agent in SHUTDOWN mode — stopping")
                    break
                if self.degradation.can_trade():
                    order = await self.strategy_tick()
                    if order:
                        result = await self.executor.place_order(order)
                        if result:
                            self.log.info(f"Trade executed: {result}")
                # PERF: size() opens a fresh DB connection — query once per tick.
                pending = self.dlq.size()
                if pending > 10:
                    self.log.warning(f"DLQ has {pending} pending items!")
                await asyncio.sleep(self.config.tick_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                self.log.error(f"Main loop error: {e}", exc_info=True)
                await asyncio.sleep(5)
        # Stop the background checkpoint loop before the final save so it
        # cannot race the shutdown checkpoint.
        if self._checkpoint_task is not None:
            self._checkpoint_task.cancel()
        self.cp.save(self.state)
        self.log.info("Agent stopped cleanly. Final checkpoint saved.")
if __name__ == "__main__":
    from types import SimpleNamespace
    # Demo configuration; in production load this from a config file or env.
    cfg = SimpleNamespace(
        agent_id=os.environ.get("AGENT_ID", "resilience-demo"),
        api_key=os.environ["PURPLEFLEA_API_KEY"],  # required — raises KeyError if unset
        api_base="https://purpleflea.com",
        initial_balance=100.0,
        min_balance=10.0,        # DegradationManager shuts down below this balance
        max_drawdown_pct=0.20,   # fraction: 20% maximum drawdown
        tick_interval=10,        # seconds between main-loop ticks
    )
    asyncio.run(ResilienceAgent(cfg).run())
Build resilient agents on Purple Flea
Purple Flea provides financial infrastructure designed for autonomous agents — casino, escrow, faucet, wallet, perpetuals, and domains. Start for free with the agent faucet.
Explore Purple Flea