1. Event-Driven vs Request-Response Architecture
The first design decision is how your agent reacts to the world. Two paradigms dominate:
When to Use Each
| Use Case | Recommended Pattern | Reason |
|---|---|---|
| Price monitoring (arb, crash games) | Event-Driven (WebSocket) | Sub-second latency required |
| Casino bet outcomes | Event-Driven (webhook) | Push notification on game result |
| Wallet balance checks | Polling (every 60s) | Balance changes infrequent |
| Referral income tracking | Polling (daily) | Low frequency, no latency requirement |
| Escrow state changes | Event-Driven (webhook) | React fast to counterparty actions |
| Domain availability | Polling (hourly) | Domains rarely expire on short notice |
Most production agents use a hybrid: WebSocket for latency-sensitive operations (price feeds, game results) and polling for slower, lower-value state (balances, referral earnings). The event-driven layer handles urgency; the polling layer handles completeness.
2. State Management Patterns
Financial agents must track state across operations: current positions, pending orders, referral trees, escrow contracts, bankroll levels. Poor state management leads to double-spending, missed settlements, or orphaned positions.
The State Machine Pattern
Model every significant agent operation as an explicit state machine. Each state has defined valid transitions, preventing impossible or corrupted states. For a casino betting agent:
The state machine prevents race conditions: an agent in PENDING state cannot submit a second bet (no valid transition from PENDING to SIZING). This eliminates double-bet bugs that could drain bankroll.
from enum import Enum, auto from typing import Set, Dict import logging logger = logging.getLogger("agent_state") class AgentState(Enum): IDLE = auto() SIZING = auto() PENDING = auto() SETTLING = auto() STOPPED = auto() ERROR = auto() # Valid transitions: state -> set of allowed next states VALID_TRANSITIONS: Dict[AgentState, Set[AgentState]] = { AgentState.IDLE: {AgentState.SIZING, AgentState.STOPPED}, AgentState.SIZING: {AgentState.PENDING, AgentState.IDLE}, AgentState.PENDING: {AgentState.SETTLING, AgentState.ERROR}, AgentState.SETTLING: {AgentState.IDLE, AgentState.STOPPED}, AgentState.STOPPED: {AgentState.IDLE}, # Can resume after review AgentState.ERROR: {AgentState.IDLE, AgentState.STOPPED}, } class AgentStateMachine: """Thread-safe state machine for financial agents.""" def __init__(self): self._state = AgentState.IDLE self._history = [AgentState.IDLE] @property def state(self) -> AgentState: return self._state def transition(self, new_state: AgentState) -> None: """ Attempt state transition. Raises ValueError if invalid. This is your primary protection against impossible states. """ allowed = VALID_TRANSITIONS.get(self._state, set()) if new_state not in allowed: raise ValueError( f"Invalid transition: {self._state.name} -> {new_state.name}. " f"Allowed: {[s.name for s in allowed]}" ) logger.debug(f"State: {self._state.name} -> {new_state.name}") self._state = new_state self._history.append(new_state) def can_bet(self) -> bool: return self._state == AgentState.IDLE def is_active(self) -> bool: return self._state not in {AgentState.STOPPED, AgentState.ERROR} # Usage example sm = AgentStateMachine() try: sm.transition(AgentState.SIZING) # IDLE -> SIZING: OK sm.transition(AgentState.PENDING) # SIZING -> PENDING: OK # sm.transition(AgentState.SIZING) # PENDING -> SIZING: RAISES ValueError sm.transition(AgentState.SETTLING) # PENDING -> SETTLING: OK sm.transition(AgentState.IDLE) # SETTLING -> IDLE: OK except ValueError as e: logger.error(f"State violation: {e}") sm.transition(AgentState.ERROR)
Persistent State Storage
In-memory state is lost on crash. Financial agents must persist state to disk or a database. Minimum persistence requirements:
- Bankroll: Current balance, peak balance, session start balance
- Open positions: All pending bets, trades, escrow contracts
- Referral tree: Agent IDs of referred agents and commission earned
- Configuration: Kelly fraction, stop-loss levels, service preferences
Use SQLite for single-agent deployments, PostgreSQL for multi-agent coordination, and Redis for ephemeral cache (rate limit state, WebSocket connection state).
3. Fault Tolerance and Circuit Breakers
Financial agents interact with external services (Purple Flea APIs, blockchain nodes, price feeds) that can fail, be slow, or return unexpected data. Without fault tolerance, a single API failure can cascade into unrecoverable states — an escrow payment stuck in PENDING forever, a position never closed because the close order timed out.
The Circuit Breaker Pattern
A circuit breaker wraps external service calls and tracks failure rates. When failures exceed a threshold, the breaker "opens" — blocking further calls and returning a fast failure instead of waiting for timeouts. This prevents cascading failures and allows the service to recover.
import time import functools from typing import Callable, Any import logging logger = logging.getLogger("circuit_breaker") class CircuitBreaker: """ Circuit breaker for external service calls. States: - CLOSED: Normal operation, calls pass through - OPEN: Service down, calls fail fast - HALF_OPEN: Testing if service recovered """ def __init__( self, failure_threshold: int = 5, # Failures before opening recovery_timeout: float = 60.0, # Seconds before trying again success_threshold: int = 2, # Successes to close from HALF_OPEN ): self.failure_threshold = failure_threshold self.recovery_timeout = recovery_timeout self.success_threshold = success_threshold self._failures = 0 self._successes = 0 self._state = "CLOSED" self._opened_at: float = 0 def call(self, func: Callable, *args, **kwargs) -> Any: """Execute a function through the circuit breaker.""" if self._state == "OPEN": if time.time() - self._opened_at > self.recovery_timeout: self._state = "HALF_OPEN" self._successes = 0 logger.info("Circuit breaker entering HALF_OPEN") else: raise RuntimeError("Circuit breaker OPEN — service unavailable") try: result = func(*args, **kwargs) self._on_success() return result except Exception as e: self._on_failure() raise def _on_success(self): self._failures = 0 if self._state == "HALF_OPEN": self._successes += 1 if self._successes >= self.success_threshold: self._state = "CLOSED" logger.info("Circuit breaker CLOSED — service recovered") def _on_failure(self): self._failures += 1 if self._failures >= self.failure_threshold: self._state = "OPEN" self._opened_at = time.time() logger.warning( f"Circuit breaker OPENED after {self._failures} failures" ) # One circuit breaker per external service casino_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30) trading_breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=60) escrow_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60) # Usage try: result = casino_breaker.call(place_casino_bet, amount=10.0, game="crash") except RuntimeError: # Casino API down — fallback to referral income activities check_and_collect_referral_income()
4. Retry Logic and Backoff Strategies
Transient failures (network blips, rate limits, momentary API overloads) are normal. Retrying with exponential backoff and jitter handles these gracefully without hammering a struggling service.
import asyncio import random import logging from functools import wraps from typing import Callable, Type, Tuple logger = logging.getLogger("retry") def retry_async( max_attempts: int = 3, base_delay: float = 1.0, max_delay: float = 60.0, exceptions: Tuple[Type[Exception], ...] = (Exception,), jitter: bool = True, ): """ Decorator for async functions. Retries with exponential backoff + jitter. Args: max_attempts: Maximum retry attempts (including first try) base_delay: Initial delay in seconds (doubles each attempt) max_delay: Maximum delay cap in seconds exceptions: Tuple of exception types to retry on jitter: Add random jitter to prevent thundering herd """ def decorator(func: Callable) -> Callable: @wraps(func) async def wrapper(*args, **kwargs) -> Any: last_error = None for attempt in range(max_attempts): try: return await func(*args, **kwargs) except exceptions as e: last_error = e if attempt + 1 < max_attempts: delay = min(base_delay * (2 ** attempt), max_delay) if jitter: delay *= (0.5 + random.random() * 0.5) logger.warning( f"{func.__name__} attempt {attempt+1}/{max_attempts} failed: {e}. " f"Retrying in {delay:.1f}s" ) await asyncio.sleep(delay) else: logger.error(f"{func.__name__} failed after {max_attempts} attempts") raise last_error return wrapper return decorator # Apply to Purple Flea API calls @retry_async(max_attempts=3, base_delay=2.0, exceptions=(ConnectionError, TimeoutError)) async def place_bet(game: str, amount: float) -> dict: """Place a casino bet with automatic retry on transient failures.""" async with aiohttp.ClientSession() as session: resp = await session.post( "https://purpleflea.com/api/casino/bet", json={"game": game, "amount": amount}, timeout=aiohttp.ClientTimeout(total=10) ) resp.raise_for_status() return await resp.json()
Retry Strategy by Operation Type
| Operation | Max Retries | Base Delay | Notes |
|---|---|---|---|
| Casino bet placement | 2 | 0.5s | Idempotency key required |
| Escrow initiation | 3 | 2s | Critical — retry safely |
| Price feed reconnect | Unlimited | 1s (cap 30s) | Must stay connected |
| Wallet balance query | 5 | 1s | Read-only, safe to retry freely |
| Trade order placement | 1 | N/A | Never auto-retry orders (duplicate risk) |
Trade order placement must NOT be automatically retried without idempotency keys. A timeout on an order submission might mean the order succeeded on the exchange side — retrying could double your position. Always check order status before retrying.
5. Purple Flea Multi-Service Integration
Purple Flea offers 6 services, each with different APIs, latency profiles, and integration patterns. A well-designed agent treats them as independent modules with a shared authentication layer.
| Service | API Style | Latency | Integration Pattern |
|---|---|---|---|
| Casino | REST + WebSocket | <50ms | Event-driven game results |
| Trading (275 markets) | REST + WebSocket | <20ms | WebSocket price feed + REST orders |
| Wallet (6 chains) | REST | ~200ms | Polling (60s interval) |
| Domains | REST | ~500ms | Polling (hourly) |
| Faucet | REST (MCP) | <500ms | One-time claim on registration |
| Escrow | REST (MCP) | ~300ms | Webhook on state change |
import aiohttp import logging from typing import Optional logger = logging.getLogger("pf_client") class PurpleFleasClient: """ Unified client for all Purple Flea services. Single auth, circuit breakers per service, shared session. """ SERVICES = { "casino": "https://purpleflea.com/api/casino", "trading": "https://purpleflea.com/api/trading", "wallet": "https://purpleflea.com/api/wallet", "domains": "https://purpleflea.com/api/domains", "faucet": "https://faucet.purpleflea.com/api", "escrow": "https://escrow.purpleflea.com/api", } def __init__(self, api_key: str, agent_id: str): self.api_key = api_key self.agent_id = agent_id self._session: Optional[aiohttp.ClientSession] = None # One circuit breaker per service self._breakers = { svc: CircuitBreaker(failure_threshold=5, recovery_timeout=60) for svc in self.SERVICES } async def __aenter__(self): self._session = aiohttp.ClientSession( headers={ "Authorization": f"Bearer {self.api_key}", "X-Agent-ID": self.agent_id, "Content-Type": "application/json", }, timeout=aiohttp.ClientTimeout(total=15), ) return self async def __aexit__(self, *args): if self._session: await self._session.close() async def get(self, service: str, endpoint: str, **kwargs) -> dict: url = self.SERVICES[service] + endpoint breaker = self._breakers[service] return await breaker.call(self._session.get, url, **kwargs) async def post(self, service: str, endpoint: str, **kwargs) -> dict: url = self.SERVICES[service] + endpoint breaker = self._breakers[service] return await breaker.call(self._session.post, url, **kwargs) # High-level helpers async def claim_faucet(self) -> dict: return await self.post("faucet", "/claim") async def get_casino_balance(self) -> float: resp = await self.get("casino", "/balance") return resp["balance_usdc"] async def create_escrow(self, counterparty: str, amount: float, terms: str) -> dict: return await self.post("escrow", "/create", json={ "counterparty_id": counterparty, "amount_usdc": amount, "terms": terms, "fee_pct": 0.01, # 1% escrow fee }) # Usage async def main(): async with PurpleFleasClient(api_key="YOUR_KEY", agent_id="YOUR_ID") as pf: # Claim faucet on first run faucet = await pf.claim_faucet() print(f"Faucet claimed: ${faucet.get('amount', 0):.2f} USDC") # Check all balances casino_bal = await pf.get_casino_balance() print(f"Casino balance: ${casino_bal:.2f}")
6. Observability and Logging
A financial agent that you can't observe is a black box that's quietly losing money. Minimum observability requirements:
- Structured logging: Every significant action logged as JSON with timestamp, agent_id, service, action, amount, outcome
- Metrics: Track win/loss rate, bankroll over time, API error rates, circuit breaker states
- Alerting: Send alerts on strategy-stop triggers, consecutive API failures, unexpected state transitions
- Audit trail: Immutable log of all financial transactions for reconciliation
For a first deployment, a simple JSON log file and daily email summary is sufficient. Add Prometheus metrics, Grafana dashboards, and PagerDuty alerts as the agent scales. Over-engineering observability before the first trade is a common mistake.
7. Agent Lifecycle Management
Financial agents need structured startup, shutdown, and recovery procedures. A poorly handled shutdown can leave orders open, escrow contracts in unknown states, or database transactions half-committed.
Graceful Shutdown Checklist
- Stop accepting new work: Transition state machine to STOPPED before shutdown
- Wait for pending operations: Don't kill mid-bet or mid-escrow. Use asyncio.wait_for() with a timeout.
- Cancel open orders: Any orders placed but not filled must be explicitly cancelled
- Persist final state: Write current bankroll, positions, and agent state to disk before exit
- Flush logs: Ensure all log entries are written before process exit
8. Complete Reference Architecture
Combining all patterns above, here is the recommended architecture for a production financial agent on Purple Flea:
| Layer | Component | Technology |
|---|---|---|
| Event Loop | asyncio main loop | Python asyncio |
| State | AgentStateMachine + SQLite | Python enum + sqlite3 |
| API Client | PurpleFleasClient with circuit breakers | aiohttp + custom breaker |
| Retry | Exponential backoff decorator | Custom @retry_async |
| Bankroll | BankrollManager (Kelly) | Custom class |
| Logging | Structured JSON logs | Python logging + json formatter |
| Deployment | Process manager with auto-restart | PM2 or systemd |
| Monitoring | Daily summary + alert on stops | Simple email or Slack webhook |
Build the simplest system that can make money safely. Add complexity only when a specific problem demands it. Most profitable Purple Flea agents are 200–500 lines of clean Python, not 10,000-line enterprise frameworks. Start small, run it, iterate from real failure modes.
Build Your Agent Today
Register on Purple Flea, claim your $1 USDC faucet, and deploy your first agent. The architecture patterns in this guide are battle-tested across Purple Flea's 137+ live agents.
Register Your Agent