Guide

Error Recovery and Resilience Patterns for AI Trading Agents

How autonomous agents handle failures gracefully — circuit breakers, exponential backoff, dead letter queues, state recovery, and chaos engineering practices.

Purple Flea · March 6, 2026 · 22 min read

1. Failure Taxonomy for Trading Agents

Before designing recovery mechanisms, classify the failures your agent will encounter. Different failure classes require different recovery strategies — a network timeout should be retried, but a logic error that causes over-trading should not.

| Class | Examples | Retryable | Recovery Strategy |
|---|---|---|---|
| Network | DNS failure, TCP timeout, TLS error | Yes | Exponential backoff, circuit breaker |
| API / Exchange | 429 rate limit, 503 maintenance, 500 server error | Conditionally | Backoff with jitter; retry 5xx, never 4xx |
| Data | Bad price feed, NaN in OHLCV, stale timestamp | No | Fallback data source, skip signal |
| State | Corrupted DB, cache miss, stale state after restart | Sometimes | Checkpoint recovery, re-sync |
| Logic | Negative position size, division by zero, infinite loop | No | Dead letter queue, halt + alert |
| Resource | OOM, disk full, CPU thrash | No | Graceful degradation, alert + restart |
Golden rule: a trading agent must never retry a non-idempotent operation (such as placing an order) without first checking whether the previous attempt succeeded. Double execution in financial systems means double positions.
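That pre-check can be made concrete. A minimal sketch, assuming a hypothetical `lookup` helper that queries the exchange for an order by its client-side idempotency key and returns its record (or `None` if the order never arrived):

```python
from typing import Callable, Optional

def safe_to_resubmit(idempotency_key: str,
                     lookup: Callable[[str], Optional[dict]]) -> bool:
    """Decide whether an order may be retried without risking double execution.

    `lookup` is a hypothetical helper: it returns the exchange's record for a
    client-supplied idempotency key, or None if no such order exists.
    """
    record = lookup(idempotency_key)
    if record is None:
        return True  # order never reached the exchange: retrying is safe
    # Resubmit only if the previous attempt terminally failed on the exchange side
    return record.get("status") in ("rejected", "cancelled")
```

Only when this returns `True` should the retry layer be allowed to fire; a `filled` or `open` record means the "failed" attempt actually succeeded.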

2. Circuit Breaker Pattern

The circuit breaker prevents an agent from hammering a failing dependency. Like an electrical circuit breaker, it "trips" after enough failures and stays open for a cooling period before allowing traffic through again.

Three states:

- CLOSED — normal operation; calls pass through and failures are counted.
- OPEN — the failure threshold was reached; calls fail fast without touching the dependency.
- HALF_OPEN — the recovery timeout has elapsed; a probe call is allowed through. Success closes the circuit, failure re-opens it.

circuit_breaker.py
import time, logging
from enum import Enum
from typing import Callable, TypeVar

T = TypeVar("T")

class CBState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreaker:
    def __init__(
        self,
        name: str,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        expected_exception: type = Exception
    ):
        self.name = name
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self._state = CBState.CLOSED
        self._failures = 0
        self._last_failure_time: float = 0
        self.log = logging.getLogger(f"circuit_breaker.{name}")

    @property
    def state(self) -> CBState:
        if self._state == CBState.OPEN:
            if time.monotonic() - self._last_failure_time >= self.recovery_timeout:
                self._state = CBState.HALF_OPEN
                self.log.info(f"[{self.name}] OPEN -> HALF_OPEN (probing)")
        return self._state

    def _on_success(self):
        if self._state == CBState.HALF_OPEN:
            self.log.info(f"[{self.name}] HALF_OPEN -> CLOSED (recovered)")
        self._failures = 0
        self._state = CBState.CLOSED

    def _on_failure(self, exc: Exception):
        self._failures += 1
        self._last_failure_time = time.monotonic()
        # A failed probe in HALF_OPEN re-opens the circuit immediately;
        # otherwise trip only once the failure threshold is reached.
        if self._state == CBState.HALF_OPEN or self._failures >= self.failure_threshold:
            if self._state != CBState.OPEN:
                self.log.warning(
                    f"[{self.name}] {self._state.value} -> OPEN after {self._failures} failure(s). "
                    f"Last error: {exc}"
                )
            self._state = CBState.OPEN

    async def call(self, func: Callable[..., T], *args, **kwargs) -> T:
        if self.state == CBState.OPEN:
            raise RuntimeError(
                f"Circuit breaker [{self.name}] is OPEN — "
                f"dependency presumed down"
            )
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception as e:
            self._on_failure(e)
            raise

# Usage:
# cb = CircuitBreaker("purpleflea-api", failure_threshold=5, recovery_timeout=60)
# try:
#     result = await cb.call(place_order, asset="BTC", size=0.01)
# except RuntimeError as e:
#     # Circuit open — use fallback or skip this cycle
#     print(f"Skipping trade: {e}")

3. Exponential Backoff with Jitter

When retrying a failed request, naive strategies fail under load: a constant delay causes synchronized retry storms, and exponential delay without jitter causes correlated spikes. The usual fix is full-jitter exponential backoff: a random delay drawn uniformly from [0, min(max_delay, base * 2^attempt)].

AWS engineering recommendation: "Full jitter" (random between 0 and the capped exponential delay) produces the lowest contention under load. "Equal jitter" (base + random half) is a reasonable alternative that guarantees some minimum delay.
retry.py
import asyncio, random, logging
from typing import Callable, TypeVar, Optional, Type, Tuple

T = TypeVar("T")

class RetryConfig:
    def __init__(
        self,
        max_attempts: int = 5,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        jitter: bool = True,
        retryable_exceptions: Tuple[Type[Exception], ...] = (Exception,),
        non_retryable_http_codes: Tuple[int, ...] = (400, 401, 403, 404, 422),
    ):
        self.max_attempts = max_attempts
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.jitter = jitter
        self.retryable_exceptions = retryable_exceptions
        self.non_retryable_http_codes = non_retryable_http_codes

    def delay(self, attempt: int) -> float:
        cap = min(self.max_delay, self.base_delay * (2 ** attempt))
        if self.jitter:
            return random.uniform(0, cap)  # full jitter
        return cap

async def retry_async(
    func: Callable[..., T],
    config: RetryConfig,
    *args,
    **kwargs
) -> T:
    log = logging.getLogger("retry")
    last_exc: Optional[Exception] = None

    for attempt in range(config.max_attempts):
        try:
            return await func(*args, **kwargs)

        except config.retryable_exceptions as exc:
            last_exc = exc

            # Check for non-retryable HTTP status codes
            status = getattr(exc, "status", None) or getattr(exc, "status_code", None)
            if status and status in config.non_retryable_http_codes:
                log.warning(f"Non-retryable status {status}: {exc}")
                raise

            if attempt < config.max_attempts - 1:
                delay = config.delay(attempt)
                log.warning(
                    f"Attempt {attempt+1}/{config.max_attempts} failed: {exc}. "
                    f"Retrying in {delay:.2f}s"
                )
                await asyncio.sleep(delay)
            else:
                log.error(f"All {config.max_attempts} attempts failed: {exc}")

    assert last_exc is not None  # max_attempts >= 1 guarantees at least one attempt
    raise last_exc

# Decorator version
def with_retry(config: Optional[RetryConfig] = None):
    cfg = config or RetryConfig()
    def decorator(func):
        async def wrapper(*args, **kwargs):
            return await retry_async(func, cfg, *args, **kwargs)
        return wrapper
    return decorator

# Usage:
# @with_retry(RetryConfig(max_attempts=4, base_delay=2.0))
# async def place_order(session, payload):
#     async with session.post("/api/trade", json=payload) as r:
#         r.raise_for_status()
#         return await r.json()
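The "equal jitter" alternative mentioned above is not implemented in `RetryConfig`; a sketch of what that delay function would look like:

```python
import random

def equal_jitter_delay(attempt: int, base: float = 1.0, max_delay: float = 60.0) -> float:
    """Equal jitter: half of the capped exponential delay is guaranteed,
    the other half is randomised — so a minimum delay of cap/2 is always kept."""
    cap = min(max_delay, base * (2 ** attempt))
    return cap / 2 + random.uniform(0, cap / 2)
```

Use it where a guaranteed minimum spacing between retries matters more than minimal contention.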

4. Dead Letter Queue for Failed Orders

A dead letter queue (DLQ) captures operations that have exhausted all retry attempts. Instead of silently dropping failed orders, the DLQ stores them for inspection, manual replay, or automated analysis of what went wrong.

Key properties of a good DLQ:

- Persistent — survives agent restarts (here, SQLite on disk).
- Inspectable — the full payload and error history are stored for post-mortems.
- Replayable — failed operations can be re-attempted manually or by a worker.
- Resolvable — items are marked resolved with a note, never silently deleted.

dead_letter_queue.py
import json, sqlite3, logging
from dataclasses import dataclass
from typing import List

@dataclass
class DeadLetter:
    id: str
    operation_type: str       # e.g. "place_order", "claim_faucet"
    payload: dict
    errors: list              # list of (timestamp, error_message)
    attempts: int
    first_attempted_at: float
    last_attempted_at: float
    resolved: bool = False
    resolution_note: str = ""

class SQLiteDeadLetterQueue:
    """Persistent DLQ backed by SQLite."""

    def __init__(self, db_path: str = "agent_dlq.db"):
        self.db_path = db_path
        self.log = logging.getLogger("dlq")
        self._init_db()

    def _init_db(self):
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS dead_letters (
                id TEXT PRIMARY KEY,
                operation_type TEXT,
                payload TEXT,
                errors TEXT,
                attempts INTEGER,
                first_attempted_at REAL,
                last_attempted_at REAL,
                resolved INTEGER DEFAULT 0,
                resolution_note TEXT DEFAULT ''
            )
        """)
        conn.commit()
        conn.close()

    def enqueue(self, dl: DeadLetter):
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            """INSERT OR REPLACE INTO dead_letters
               (id, operation_type, payload, errors, attempts,
                first_attempted_at, last_attempted_at, resolved, resolution_note)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                dl.id,
                dl.operation_type,
                json.dumps(dl.payload),
                json.dumps(dl.errors),
                dl.attempts,
                dl.first_attempted_at,
                dl.last_attempted_at,
                int(dl.resolved),
                dl.resolution_note
            )
        )
        conn.commit()
        conn.close()
        self.log.warning(
            f"DLQ: enqueued {dl.operation_type} (id={dl.id}, "
            f"attempts={dl.attempts})"
        )

    def get_pending(self, limit: int = 50) -> List[DeadLetter]:
        conn = sqlite3.connect(self.db_path)
        rows = conn.execute(
            "SELECT * FROM dead_letters WHERE resolved=0 ORDER BY last_attempted_at DESC LIMIT ?",
            (limit,)
        ).fetchall()
        conn.close()
        return [self._row_to_dl(r) for r in rows]

    def mark_resolved(self, dl_id: str, note: str = ""):
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            "UPDATE dead_letters SET resolved=1, resolution_note=? WHERE id=?",
            (note, dl_id)
        )
        conn.commit()
        conn.close()
        self.log.info(f"DLQ: resolved {dl_id}: {note}")

    def size(self) -> int:
        conn = sqlite3.connect(self.db_path)
        n = conn.execute("SELECT COUNT(*) FROM dead_letters WHERE resolved=0").fetchone()[0]
        conn.close()
        return n

    def _row_to_dl(self, row) -> DeadLetter:
        return DeadLetter(
            id=row[0], operation_type=row[1],
            payload=json.loads(row[2]), errors=json.loads(row[3]),
            attempts=row[4], first_attempted_at=row[5],
            last_attempted_at=row[6], resolved=bool(row[7]),
            resolution_note=row[8]
        )
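A natural companion is a replay worker that periodically re-attempts pending dead letters. A sketch, assuming any queue object exposing the `get_pending`/`mark_resolved` interface above and a caller-supplied map of per-operation async handlers:

```python
import logging

async def replay_dead_letters(dlq, handlers: dict, limit: int = 20) -> int:
    """Re-attempt pending dead letters; returns the number resolved.

    `handlers` maps operation_type -> async callable taking the payload.
    Items whose handler fails, or has no handler registered, stay in the
    queue for the next pass (or for manual inspection).
    """
    log = logging.getLogger("dlq.replay")
    resolved = 0
    for dl in dlq.get_pending(limit=limit):
        handler = handlers.get(dl.operation_type)
        if handler is None:
            continue  # no automated handler — leave for a human
        try:
            await handler(dl.payload)
            dlq.mark_resolved(dl.id, note="replayed successfully")
            resolved += 1
        except Exception as exc:
            log.warning(f"Replay of {dl.id} failed again: {exc}")
    return resolved
```

Run it on a timer or from an operator CLI; for `place_order` items, route the handler through the idempotency check from section 1 first.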

5. State Checkpointing and Recovery

When an agent restarts after a crash, it must recover its state. Without checkpointing, it loses position tracking, PnL history, and open order state — potentially causing it to re-enter positions it already holds.

Checkpoint strategy: persist the agent's full state to disk (or database) at regular intervals and on every state-changing action (order placed, order filled, position closed).

checkpoint.py
import json, os, time, logging, asyncio
from dataclasses import dataclass, asdict, field
from typing import Dict, List, Optional
import hashlib

@dataclass
class AgentState:
    agent_id: str
    balance: float
    open_positions: Dict[str, dict]   # asset -> {size, entry_price, ts}
    pending_orders: Dict[str, dict]   # order_id -> {asset, side, size, price}
    pnl_history: List[float]
    last_checkpoint_ts: float = field(default_factory=time.time)
    checkpoint_seq: int = 0

class Checkpointer:
    """Atomic checkpoint write with checksum verification."""

    def __init__(self, checkpoint_dir: str = "/var/lib/agent/checkpoints"):
        self.checkpoint_dir = checkpoint_dir
        os.makedirs(checkpoint_dir, exist_ok=True)
        self.log = logging.getLogger("checkpoint")

    def _path(self, agent_id: str) -> str:
        return os.path.join(self.checkpoint_dir, f"{agent_id}.json")

    def _tmp_path(self, agent_id: str) -> str:
        return os.path.join(self.checkpoint_dir, f"{agent_id}.json.tmp")

    def save(self, state: AgentState):
        """Atomic write: write to .tmp then rename (atomic on POSIX)."""
        state.last_checkpoint_ts = time.time()
        state.checkpoint_seq += 1

        data = asdict(state)
        payload = json.dumps(data, indent=2)
        checksum = hashlib.sha256(payload.encode()).hexdigest()
        wrapper = {"checksum": checksum, "state": data}

        tmp = self._tmp_path(state.agent_id)
        with open(tmp, "w") as f:
            json.dump(wrapper, f)
        os.replace(tmp, self._path(state.agent_id))  # atomic rename

        self.log.debug(f"Checkpoint saved (seq={state.checkpoint_seq})")

    def load(self, agent_id: str) -> Optional[AgentState]:
        path = self._path(agent_id)
        if not os.path.exists(path):
            return None

        with open(path) as f:
            wrapper = json.load(f)

        # Verify checksum
        data = wrapper["state"]
        expected = wrapper.get("checksum")
        payload = json.dumps(data, indent=2)
        actual = hashlib.sha256(payload.encode()).hexdigest()
        if expected != actual:
            self.log.error(f"Checkpoint checksum mismatch for {agent_id} — ignoring")
            return None

        self.log.info(
            f"Loaded checkpoint (agent={agent_id}, "
            f"seq={data['checkpoint_seq']}, "
            f"age={time.time()-data['last_checkpoint_ts']:.0f}s)"
        )
        return AgentState(**data)

class AutoCheckpointer:
    """Wraps an agent and saves checkpoint every N seconds."""

    def __init__(self, agent, checkpointer: Checkpointer, interval: int = 30):
        self.agent = agent
        self.cp = checkpointer
        self.interval = interval

    async def run_checkpoint_loop(self):
        while True:
            await asyncio.sleep(self.interval)
            try:
                state = self.agent.get_state()
                self.cp.save(state)
            except Exception as e:
                logging.getLogger("checkpoint").error(f"Checkpoint failed: {e}")
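The tmp-file-plus-rename and checksum ideas inside `Checkpointer` generalise to any JSON state file. A standalone sketch of the same pattern (using `sort_keys` so the checksum is independent of dict ordering):

```python
import json, os, hashlib
from typing import Optional

def atomic_write_json(path: str, state: dict) -> None:
    """Write state atomically: serialise with a sha256 checksum, write to a
    temp file, then os.replace (atomic on POSIX) so a crashed writer can
    never leave readers a half-written file."""
    payload = json.dumps(state, sort_keys=True)
    wrapper = {"checksum": hashlib.sha256(payload.encode()).hexdigest(),
               "state": state}
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump(wrapper, f)
    os.replace(tmp, path)

def read_verified_json(path: str) -> Optional[dict]:
    """Read state back; returns None on a checksum mismatch (corruption)."""
    with open(path) as f:
        wrapper = json.load(f)
    payload = json.dumps(wrapper["state"], sort_keys=True)
    if hashlib.sha256(payload.encode()).hexdigest() != wrapper.get("checksum"):
        return None
    return wrapper["state"]
```

The caller treats a `None` result exactly like a missing checkpoint: start fresh rather than trust corrupted state.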

6. Health-Based Degradation Modes

Rather than a binary running/stopped state, a resilient agent operates in degraded modes when dependencies partially fail. Define explicit modes with clear capability restrictions:

| Mode | Condition | Capabilities | Action |
|---|---|---|---|
| Full | All systems nominal | Trade, read, write | Normal operation |
| Read-Only | Write API degraded | Read positions, no new orders | Close open positions only |
| Observation | Price feed stale | Log + monitor only | No trades; wait for feed recovery |
| Shutdown | Balance below min OR drawdown exceeded | Nothing | Close all, alert, stop |
degradation.py
from enum import Enum
import time, logging, asyncio

class AgentMode(Enum):
    FULL = "full"
    READ_ONLY = "read_only"
    OBSERVATION = "observation"
    SHUTDOWN = "shutdown"

class DegradationManager:
    def __init__(self, agent_config):
        self.cfg = agent_config
        self._mode = AgentMode.FULL
        self._mode_since = time.time()
        self.log = logging.getLogger("degradation")

    @property
    def mode(self) -> AgentMode:
        return self._mode

    def transition(self, new_mode: AgentMode, reason: str):
        if new_mode == self._mode:
            return
        old = self._mode
        self._mode = new_mode
        self._mode_since = time.time()
        self.log.warning(
            f"Mode transition: {old.value} -> {new_mode.value} | reason: {reason}"
        )

    def can_trade(self) -> bool:
        return self._mode == AgentMode.FULL

    def can_read(self) -> bool:
        return self._mode in (AgentMode.FULL, AgentMode.READ_ONLY, AgentMode.OBSERVATION)

    def is_active(self) -> bool:
        return self._mode != AgentMode.SHUTDOWN

    async def assess(self, health: dict):
        """Called periodically with current health snapshot to determine mode."""
        if health.get("balance", 0) < self.cfg.min_balance:
            self.transition(AgentMode.SHUTDOWN, "balance below minimum")
            return

        drawdown = health.get("drawdown_pct", 0)
        if drawdown > self.cfg.max_drawdown_pct * 100:
            self.transition(AgentMode.SHUTDOWN, f"drawdown {drawdown:.1f}% exceeded limit")
            return

        price_feed_age = health.get("price_feed_age_secs", 0)
        if price_feed_age > 60:
            self.transition(AgentMode.OBSERVATION,
                            f"price feed stale ({price_feed_age:.0f}s)")
            return

        api_ok = health.get("exchange_api_ok", True)
        if not api_ok:
            self.transition(AgentMode.READ_ONLY, "exchange write API degraded")
            return

        self.transition(AgentMode.FULL, "all systems nominal")
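The priority ordering inside `assess` is the important part: shutdown conditions must win over milder degradations. Extracted as a pure function (key names mirror the health snapshot above), that ordering becomes trivial to unit-test:

```python
def decide_mode(health: dict, min_balance: float, max_drawdown_pct: float) -> str:
    """Return the target mode for a health snapshot, most severe check first."""
    if health.get("balance", 0) < min_balance:
        return "shutdown"
    if health.get("drawdown_pct", 0) > max_drawdown_pct * 100:
        return "shutdown"
    if health.get("price_feed_age_secs", 0) > 60:
        return "observation"
    if not health.get("exchange_api_ok", True):
        return "read_only"
    return "full"
```

Keeping the decision pure (no logging, no clock) means the table of mode transitions can be verified exhaustively without an event loop.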

7. Chaos Engineering Tests

Chaos engineering verifies that your resilience patterns actually work by intentionally injecting failures in a controlled way. The discipline originated at Netflix (Chaos Monkey) and is now standard practice for critical systems.

Recommended chaos tests for trading agents:

- Inject 5xx errors on a fraction of API calls and verify the circuit breaker trips, then recovers.
- Add artificial latency and verify timeouts and backoff behave sanely under slow responses.
- Kill the agent process mid-cycle and verify checkpoint recovery restores positions on restart.
- Serve stale price data and verify the agent drops into observation mode instead of trading.

chaos_tests.py
import asyncio, aiohttp, random, logging, time

class ChaosProxy:
    """
    HTTP proxy that injects failures for chaos engineering.
    Wrap your aiohttp.ClientSession with this in test mode.
    """

    def __init__(self, session: aiohttp.ClientSession,
                 failure_rate: float = 0.2,
                 latency_ms: float = 0,
                 error_codes: list = None):
        self.session = session
        self.failure_rate = failure_rate
        self.latency_ms = latency_ms
        self.error_codes = error_codes or [500, 503]
        self.log = logging.getLogger("chaos")
        self._injected = 0
        self._total = 0

    async def post(self, url: str, **kwargs) -> aiohttp.ClientResponse:
        return await self._inject(self.session.post, url, **kwargs)

    async def get(self, url: str, **kwargs) -> aiohttp.ClientResponse:
        return await self._inject(self.session.get, url, **kwargs)

    async def _inject(self, method, url: str, **kwargs):
        self._total += 1
        if self.latency_ms > 0:
            await asyncio.sleep(self.latency_ms / 1000)

        if random.random() < self.failure_rate:
            self._injected += 1
            code = random.choice(self.error_codes)
            self.log.warning(f"CHAOS: injecting {code} for {url}")
            raise aiohttp.ClientResponseError(
                request_info=None, history=(), status=code,
                message=f"Chaos-injected {code}"
            )
        return await method(url, **kwargs)

    @property
    def injection_rate(self) -> float:
        return self._injected / self._total if self._total > 0 else 0.0

async def run_chaos_test(agent_class, config, duration_secs: int = 60):
    """
    Run an agent under chaos conditions and verify it survives.
    """
    log = logging.getLogger("chaos_test")
    log.info(f"Starting chaos test for {duration_secs}s")

    async with aiohttp.ClientSession() as real_session:
        chaos_session = ChaosProxy(
            real_session,
            failure_rate=0.30,     # 30% of calls will fail
            latency_ms=500,         # 500ms extra latency on all calls
        )
        agent = agent_class(config, session=chaos_session)

        start = time.time()
        try:
            await asyncio.wait_for(agent.run(), timeout=duration_secs)
        except asyncio.TimeoutError:
            pass  # expected — test duration

    report = {
        "duration_secs": time.time() - start,
        "chaos_injections": chaos_session._injected,
        "total_calls": chaos_session._total,
        "actual_injection_rate": chaos_session.injection_rate,
        "agent_final_mode": agent.mode.value if hasattr(agent, "mode") else "unknown",
    }
    log.info(f"Chaos test complete: {report}")
    return report
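In CI, the report dict can become a pass/fail gate. A sketch of the kind of assertions worth making (the thresholds and allowed modes here are illustrative, not prescriptive):

```python
def check_chaos_report(report: dict,
                       min_injection_rate: float = 0.15,
                       allowed_final_modes: tuple = ("full", "read_only", "observation")) -> list:
    """Return a list of failure messages; an empty list means the agent passed."""
    problems = []
    if report.get("total_calls", 0) == 0:
        problems.append("agent made no calls — chaos was never exercised")
    elif report.get("actual_injection_rate", 0) < min_injection_rate:
        problems.append("injection rate too low to trust the result")
    if report.get("agent_final_mode") not in allowed_final_modes:
        problems.append(f"agent ended in unexpected mode: {report.get('agent_final_mode')}")
    return problems
```

Fail the build if the list is non-empty; the first check guards against the degenerate "agent crashed immediately, so nothing was injected" false pass.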
Never run chaos tests against production systems. Use a staging environment with separate API keys, wallets funded with test tokens, and isolated infrastructure. Chaos tests that go wrong in production can cause real financial losses.

8. Combining Patterns: The Resilient Order Executor

In practice, you compose multiple resilience patterns together. Here is a production-grade order executor that combines the circuit breaker, exponential backoff, idempotency check, and DLQ — all in one class.

resilient_executor.py
import aiohttp, hashlib, json, time, logging
from typing import Optional

# Building blocks from the earlier sections
from circuit_breaker import CircuitBreaker
from retry import RetryConfig, retry_async
from dead_letter_queue import DeadLetter, SQLiteDeadLetterQueue

class ResilientOrderExecutor:
    """
    Places orders with: circuit breaker + retry + idempotency + DLQ.
    This is the pattern recommended for Purple Flea API integrations.
    """

    def __init__(self, api_key: str, api_base: str,
                 dlq: SQLiteDeadLetterQueue,
                 cb: CircuitBreaker):
        self.api_key = api_key
        self.api_base = api_base.rstrip("/")
        self.dlq = dlq
        self.cb = cb
        self.retry_cfg = RetryConfig(
            max_attempts=4,
            base_delay=2.0,
            max_delay=30.0,
            jitter=True,
            non_retryable_http_codes=(400, 401, 403, 404, 422)
        )
        self.log = logging.getLogger("order_executor")
        self._placed_ids: set = set()   # in-memory idempotency cache

    def _idempotency_key(self, order: dict) -> str:
        """Deterministic key from order params — same order always same key."""
        canonical = json.dumps(order, sort_keys=True)
        return hashlib.sha256(canonical.encode()).hexdigest()[:16]

    async def _place_once(self, session: aiohttp.ClientSession,
                          order: dict, idempotency_key: str) -> dict:
        async with session.post(
            f"{self.api_base}/api/trade",
            json=order,
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Idempotency-Key": idempotency_key
            },
            timeout=aiohttp.ClientTimeout(total=10)
        ) as r:
            r.raise_for_status()
            return await r.json()

    async def place_order(self, order: dict) -> Optional[dict]:
        ikey = self._idempotency_key(order)

        # Idempotency guard
        if ikey in self._placed_ids:
            self.log.warning(f"Order already placed (ikey={ikey}) — skipping duplicate")
            return None

        errors = []
        first_attempt_ts = time.time()

        async def attempt_with_cb(session):
            return await self.cb.call(self._place_once, session, order, ikey)

        try:
            async with aiohttp.ClientSession() as session:
                result = await retry_async(
                    attempt_with_cb,
                    self.retry_cfg,
                    session
                )
            self._placed_ids.add(ikey)
            self.log.info(f"Order placed: {result.get('order_id')} (ikey={ikey})")
            return result

        except Exception as e:
            errors.append((time.time(), str(e)))
            self.log.error(f"Order failed after all retries: {e}")

            # Send to DLQ, recording the actual first-attempt timestamp rather
            # than reconstructing it from expected backoff delays
            dl = DeadLetter(
                id=ikey,
                operation_type="place_order",
                payload=order,
                errors=errors,
                attempts=self.retry_cfg.max_attempts,
                first_attempted_at=first_attempt_ts,
                last_attempted_at=time.time()
            )
            self.dlq.enqueue(dl)
            return None

9. ResilienceAgent: Full Async Implementation

The complete ResilienceAgent wraps all the patterns above into a single runnable agent. It initializes all subsystems, recovers from checkpoint on startup, and runs the main trading loop with full resilience guarantees.

resilience_agent.py
import asyncio, os, logging

# Building blocks from the earlier sections
from circuit_breaker import CircuitBreaker
from checkpoint import AgentState, Checkpointer, AutoCheckpointer
from dead_letter_queue import SQLiteDeadLetterQueue
from degradation import DegradationManager
from resilient_executor import ResilientOrderExecutor

logging.basicConfig(level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")

class ResilienceAgent:
    """
    Full-stack resilient trading agent integrating all patterns.
    Replace strategy_tick() with your signal logic.
    """

    def __init__(self, config):
        self.config = config
        self.log = logging.getLogger(f"agent.{config.agent_id}")

        # Resilience subsystems
        self.cb = CircuitBreaker("purpleflea", failure_threshold=5, recovery_timeout=60)
        self.dlq = SQLiteDeadLetterQueue(f"/var/lib/agent/{config.agent_id}_dlq.db")
        self.cp = Checkpointer("/var/lib/agent/checkpoints")
        self.degradation = DegradationManager(config)
        self.executor = ResilientOrderExecutor(
            api_key=config.api_key,
            api_base=config.api_base,
            dlq=self.dlq,
            cb=self.cb
        )

        # State
        self.state: AgentState | None = None
        self._running = True

    async def start(self):
        """Load checkpoint or initialize fresh state."""
        self.state = self.cp.load(self.config.agent_id)
        if self.state:
            self.log.info(
                f"Recovered from checkpoint (seq={self.state.checkpoint_seq}, "
                f"balance={self.state.balance:.2f})"
            )
        else:
            self.log.info("No checkpoint found — starting fresh")
            self.state = AgentState(
                agent_id=self.config.agent_id,
                balance=self.config.initial_balance,
                open_positions={},
                pending_orders={},
                pnl_history=[]
            )

        # Start checkpoint loop
        asyncio.create_task(
            AutoCheckpointer(self, self.cp, interval=30).run_checkpoint_loop()
        )

    def get_state(self) -> AgentState:
        return self.state

    async def strategy_tick(self) -> dict | None:
        """Override with your strategy. Return order dict or None."""
        # Example: always return None (do nothing)
        return None

    async def run(self):
        await self.start()
        self.log.info(f"Agent {self.config.agent_id} running")

        while self._running:
            try:
                health = {
                    "balance": self.state.balance,
                    "drawdown_pct": 0,   # compute from pnl_history
                    "price_feed_age_secs": 0,
                    "exchange_api_ok": self.cb.state.value != "open"
                }
                await self.degradation.assess(health)

                if not self.degradation.is_active():
                    self.log.warning("Agent in SHUTDOWN mode — stopping")
                    break

                if self.degradation.can_trade():
                    order = await self.strategy_tick()
                    if order:
                        result = await self.executor.place_order(order)
                        if result:
                            self.log.info(f"Trade executed: {result}")

                if self.dlq.size() > 10:
                    self.log.warning(f"DLQ has {self.dlq.size()} pending items!")

                await asyncio.sleep(self.config.tick_interval)

            except asyncio.CancelledError:
                break
            except Exception as e:
                self.log.error(f"Main loop error: {e}", exc_info=True)
                await asyncio.sleep(5)

        self.cp.save(self.state)
        self.log.info("Agent stopped cleanly. Final checkpoint saved.")

if __name__ == "__main__":
    from types import SimpleNamespace
    cfg = SimpleNamespace(
        agent_id=os.environ.get("AGENT_ID", "resilience-demo"),
        api_key=os.environ["PURPLEFLEA_API_KEY"],
        api_base="https://purpleflea.com",
        initial_balance=100.0,
        min_balance=10.0,
        max_drawdown_pct=0.20,
        tick_interval=10,
    )
    asyncio.run(ResilienceAgent(cfg).run())

Build resilient agents on Purple Flea

Purple Flea provides financial infrastructure designed for autonomous agents — casino, escrow, faucet, wallet, perpetuals, and domains. Start for free with the agent faucet.
