Agent State Management at Scale
Stateless inference is easy. Stateful agents are hard. This guide covers every layer of the memory stack — from in-context working memory to vector databases — and shows concrete patterns for maintaining agent state across sessions, failures, and scale.
Why State Management Matters for Financial Agents
A casino agent that forgets its current bankroll between sessions will make catastrophic betting decisions. A trading agent that loses its position history cannot apply mean-reversion strategies. An escrow agent that drops pending transaction state mid-negotiation creates unresolvable disputes. Financial agents are not chatbots — correctness requires durable, queryable, versioned state.
The Purple Flea ecosystem compounds this challenge: agents interact with six services (Casino, Trading, Wallet, Domains, Faucet, Escrow), each generating distinct event streams. A production agent must reconcile state across all of them, handle partial failures, and resume coherently after restarts.
LLMs are stateless function calls. Every practical agent's illusion of "memory" is your infrastructure, not the model. The model's context window is just the most expensive form of RAM you will ever rent.
The Four Memory Tiers
Agent memory is best understood as a hierarchy mirroring human cognitive architecture. Each tier has different cost, latency, capacity, and durability characteristics.
| Tier | Analogy | Storage | Latency | Capacity | Durability |
|---|---|---|---|---|---|
| Working | RAM | LLM context | ~0ms | ~200K tokens | Session only |
| Episodic | Short-term memory | Redis / in-memory | 1–5ms | Millions of records | Hours–days |
| Semantic | Long-term memory | Vector DB (Qdrant, Weaviate) | 5–50ms | Unlimited (indexed) | Permanent |
| Procedural | Muscle memory | Code / prompt templates | 0ms (compile-time) | Bounded by codebase | Permanent |
Working Memory — The Context Window
Working memory is what the model can "see" right now. It is fast, precise, and brutally limited. For financial agents, the most important things to keep in working memory are the current portfolio snapshot, recent transaction history, and the current task objective.
import tiktoken
class ContextBudget:
"""Manage token budget across working memory slots."""
def __init__(self, model: str = "claude-3-7-sonnet", max_tokens: int = 180_000):
self.enc = tiktoken.encoding_for_model("gpt-4o") # approx for Claude
self.max_tokens = max_tokens
self.slots: dict[str, str] = {}
self.priorities: dict[str, int] = {} # higher = evict last
def set(self, key: str, content: str, priority: int = 5) -> None:
self.slots[key] = content
self.priorities[key] = priority
def count(self, text: str) -> int:
return len(self.enc.encode(text))
def total_tokens(self) -> int:
return sum(self.count(v) for v in self.slots.values())
def build_context(self, reserve_for_output: int = 4096) -> str:
budget = self.max_tokens - reserve_for_output
# Sort by priority descending, evict lowest priority first
sorted_slots = sorted(self.slots.items(), key=lambda x: -self.priorities[x[0]])
result_parts = []
used = 0
for key, content in sorted_slots:
tokens = self.count(content)
if used + tokens <= budget:
result_parts.append(f"=== {key.upper()} ===\n{content}")
used += tokens
else:
                # Truncate high-priority slots that do not fully fit; skip the rest
                if self.priorities[key] >= 7:
remaining = budget - used
truncated = self.enc.decode(self.enc.encode(content)[:remaining])
result_parts.append(f"=== {key.upper()} (truncated) ===\n{truncated}")
used = budget
break
return "\n\n".join(result_parts)
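The eviction policy above is easier to see with a toy token counter. A minimal, standalone sketch (whitespace "tokens" stand in for tiktoken; the slot names are illustrative):

```python
def build_context(slots: dict[str, str], priorities: dict[str, int], budget: int) -> list[str]:
    """Greedy fill: highest-priority slots first, until the token budget runs out."""
    def count(text: str) -> int:
        return len(text.split())  # crude whitespace token proxy

    kept, used = [], 0
    for key in sorted(slots, key=lambda k: -priorities[k]):
        tokens = count(slots[key])
        if used + tokens <= budget:
            kept.append(key)
            used += tokens
    return kept

slots = {
    "portfolio": "usdc 950 positions 3 open",           # 5 tokens
    "objective": "close losing positions by midnight",  # 5 tokens
    "chit_chat": "the user said hello twice " * 10,     # 50 tokens
}
priorities = {"portfolio": 9, "objective": 8, "chit_chat": 2}
kept = build_context(slots, priorities, budget=12)  # low-priority chit_chat is evicted
```

The portfolio and objective fit within the 12-token budget; the low-priority chatter does not make the cut.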
Episodic Memory — Recent History
Episodic memory stores recent events: the last 100 trades, the last 50 chat turns, recent wallet transactions. Redis is the natural fit — low latency, TTL support, and sorted sets for time-ordered event logs.
import redis
import json
import time
from dataclasses import dataclass, asdict
from typing import Optional
@dataclass
class Episode:
agent_id: str
event_type: str # "trade", "bet", "transfer", "escrow_open"
timestamp: float
payload: dict
outcome: Optional[dict] = None
class EpisodicStore:
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.r = redis.from_url(redis_url, decode_responses=True)
self.max_episodes = 500 # per agent
self.ttl_seconds = 86400 * 7 # 7 days
def record(self, episode: Episode) -> None:
key = f"episodes:{episode.agent_id}"
score = episode.timestamp
member = json.dumps(asdict(episode))
pipe = self.r.pipeline()
pipe.zadd(key, {member: score})
# Trim to max_episodes (keep newest)
pipe.zremrangebyrank(key, 0, -(self.max_episodes + 1))
pipe.expire(key, self.ttl_seconds)
pipe.execute()
def get_recent(self, agent_id: str, n: int = 50, event_type: Optional[str] = None) -> list[Episode]:
key = f"episodes:{agent_id}"
# newest first
raw = self.r.zrevrange(key, 0, n - 1, withscores=False)
episodes = [Episode(**json.loads(r)) for r in raw]
if event_type:
episodes = [e for e in episodes if e.event_type == event_type]
return episodes
def summarize_window(self, agent_id: str, since: float) -> dict:
key = f"episodes:{agent_id}"
raw = self.r.zrangebyscore(key, since, "+inf")
episodes = [Episode(**json.loads(r)) for r in raw]
return {
"total_events": len(episodes),
"by_type": {t: sum(1 for e in episodes if e.event_type == t)
for t in set(e.event_type for e in episodes)},
"time_span_hours": (time.time() - since) / 3600,
}
Semantic Memory — Vector Database
Semantic memory stores facts, strategies, and learned patterns indexed by meaning rather than by key. When an agent asks "what happened last time I tried aggressive betting on sports markets?", it is doing a semantic lookup, not a key-value fetch.
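The core operation behind that lookup is nearest-neighbor search in embedding space. A toy illustration with hand-made 3-dimensional "embeddings" (real embeddings have on the order of 1,536 dimensions; the vectors here are invented for the example):

```python
import math

def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity: dot product over the product of magnitudes."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b)))

# Toy embeddings: similar memories point in similar directions
memories = {
    "aggressive sports betting lost 40 USDC": [0.9, 0.1, 0.2],
    "conservative limit orders gained 5 USDC": [0.1, 0.9, 0.3],
}
query = [0.85, 0.15, 0.25]  # embedding of "what happened with aggressive betting?"
best = max(memories, key=lambda m: cosine(query, memories[m]))
# the aggressive-betting memory wins on cosine similarity
```

A vector database does exactly this, but over millions of vectors with an approximate index instead of a linear scan.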
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue
import openai
import hashlib
import time
class SemanticMemory:
COLLECTION = "agent_memories"
DIM = 1536 # text-embedding-3-small
def __init__(self, qdrant_url: str = "http://localhost:6333"):
self.qc = QdrantClient(url=qdrant_url)
self.oai = openai.OpenAI()
self._ensure_collection()
def _ensure_collection(self):
existing = [c.name for c in self.qc.get_collections().collections]
if self.COLLECTION not in existing:
self.qc.create_collection(
self.COLLECTION,
vectors_config=VectorParams(size=self.DIM, distance=Distance.COSINE)
)
def embed(self, text: str) -> list[float]:
resp = self.oai.embeddings.create(model="text-embedding-3-small", input=text)
return resp.data[0].embedding
    def store(self, agent_id: str, content: str, metadata: dict = None) -> str:
        import uuid
        # Qdrant point IDs must be unsigned integers or UUIDs; a raw hex digest is rejected
        uid = str(uuid.uuid5(uuid.NAMESPACE_URL, f"{agent_id}:{content}:{time.time()}"))
        vec = self.embed(content)
        payload = {"agent_id": agent_id, "content": content, "ts": time.time(), **(metadata or {})}
        self.qc.upsert(self.COLLECTION, points=[PointStruct(id=uid, vector=vec, payload=payload)])
        return uid
def recall(self, agent_id: str, query: str, top_k: int = 5) -> list[dict]:
vec = self.embed(query)
filt = Filter(must=[FieldCondition(key="agent_id", match=MatchValue(value=agent_id))])
hits = self.qc.search(self.COLLECTION, query_vector=vec, query_filter=filt, limit=top_k)
return [{"score": h.score, **h.payload} for h in hits]
Procedural Memory — Encoded Behavior
Procedural memory is behavior baked into code or system prompts. It does not need to be retrieved — it fires automatically. Examples: risk limits hardcoded as guard functions, stop-loss logic in the trading loop, the agent's negotiation style encoded in its system prompt.
from functools import wraps
from typing import Callable
def max_bet_guard(max_usdc: float = 100.0):
"""Procedural memory: never bet more than limit."""
def decorator(fn: Callable):
@wraps(fn)
def wrapper(agent, amount: float, *args, **kwargs):
if amount > max_usdc:
raise ValueError(f"Bet {amount} USDC exceeds procedural limit {max_usdc}")
return fn(agent, amount, *args, **kwargs)
return wrapper
return decorator
class CasinoAgent:
@max_bet_guard(max_usdc=50.0)
def place_bet(self, amount: float, game: str) -> dict:
# ... actual bet logic
pass
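Exercising the pattern in a condensed, standalone form (the guard fires before any external call is attempted; names here mirror the example above):

```python
from functools import wraps

def max_bet_guard(max_usdc: float):
    """Procedural memory: reject any bet above the hard limit."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(amount: float, *args, **kwargs):
            if amount > max_usdc:
                raise ValueError(f"Bet {amount} USDC exceeds procedural limit {max_usdc}")
            return fn(amount, *args, **kwargs)
        return wrapper
    return decorator

@max_bet_guard(max_usdc=50.0)
def place_bet(amount: float) -> str:
    return f"bet {amount} placed"

place_bet(25.0)        # within the limit: proceeds
try:
    place_bet(200.0)   # blocked before any network call is made
except ValueError:
    blocked = True
```

Because the limit lives in code rather than in retrieved memory, no prompt injection or context eviction can talk the agent out of it.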
State Serialization Formats
Choosing the wrong serialization format causes silent data corruption, schema drift, and painful migrations. Financial state deserves the same rigor as financial code.
JSON — Simple but Fragile
JSON is readable and universal. Its weaknesses for agent state: no schema enforcement, no decimal type (floats corrupt financial amounts), no binary support, and no versioning convention.
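The float problem is easy to demonstrate: amounts that survive a Decimal ledger exactly are silently corrupted by binary floats and carried through a JSON round-trip.

```python
import json
from decimal import Decimal

# Ten deposits of 0.1 USDC, accumulated as floats
balance = sum([0.1] * 10)
print(balance == 1.0)                    # False: binary floats cannot represent 0.1
print(json.dumps({"balance": balance}))  # {"balance": 0.9999999999999999}

# The same ledger in Decimal is exact
exact = sum(Decimal("0.1") for _ in range(10))
print(exact == Decimal("1.0"))           # True
```

The encoder and hook below work around this by tunneling Decimals through JSON as tagged strings.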
import json
from decimal import Decimal
class DecimalEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, Decimal):
return {"__decimal__": str(obj)}
return super().default(obj)
def decimal_hook(d: dict):
if "__decimal__" in d:
return Decimal(d["__decimal__"])
return d
# Always use these when storing financial amounts
state_json = json.dumps({"balance": Decimal("1234.56789")}, cls=DecimalEncoder)
state = json.loads(state_json, object_hook=decimal_hook)
assert state["balance"] == Decimal("1234.56789")
MessagePack — Compact Binary
MessagePack serializes 30–50% smaller than JSON with significantly faster parse times. Ideal for episodic events stored in Redis where you are writing thousands of records per minute.
import msgpack
from decimal import Decimal
def pack_state(state: dict) -> bytes:
# Convert Decimal to string before packing
def default(obj):
if isinstance(obj, Decimal):
return msgpack.ExtType(1, str(obj).encode())
raise TypeError(f"Unknown type: {type(obj)}")
return msgpack.packb(state, default=default, use_bin_type=True)
def unpack_state(data: bytes) -> dict:
def ext_hook(code, data):
if code == 1:
return Decimal(data.decode())
return msgpack.ExtType(code, data)
return msgpack.unpackb(data, ext_hook=ext_hook, raw=False)
Versioned Schema
Every state blob should carry a schema version. Without it, you cannot safely migrate historical state when your agent evolves.
from dataclasses import dataclass, field, asdict
from typing import Any
CURRENT_SCHEMA = 3
@dataclass
class AgentState:
schema_version: int = CURRENT_SCHEMA
agent_id: str = ""
portfolio: dict = field(default_factory=dict)
session_count: int = 0
preferences: dict = field(default_factory=dict)
@classmethod
def migrate(cls, raw: dict) -> "AgentState":
v = raw.get("schema_version", 1)
if v < 2:
# v1 had flat balance instead of portfolio dict
raw["portfolio"] = {"usdc": raw.pop("balance", 0)}
raw["schema_version"] = 2
if v < 3:
# v2 had no preferences
raw.setdefault("preferences", {})
raw["schema_version"] = 3
return cls(**{k: raw[k] for k in cls.__dataclass_fields__ if k in raw})
def to_dict(self) -> dict:
return asdict(self)
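The chained-migration idiom generalizes beyond this dataclass. A standalone sketch on plain dicts, with the same v1-to-v3 steps:

```python
def migrate(raw: dict) -> dict:
    """Apply every migration newer than the blob's version, in order."""
    v = raw.get("schema_version", 1)
    if v < 2:  # v1 had a flat "balance" instead of a portfolio dict
        raw["portfolio"] = {"usdc": raw.pop("balance", 0)}
    if v < 3:  # v2 had no preferences
        raw.setdefault("preferences", {})
    raw["schema_version"] = 3
    return raw

v1_blob = {"agent_id": "a1", "balance": 42}
migrated = migrate(v1_blob)
# → {"agent_id": "a1", "portfolio": {"usdc": 42}, "preferences": {}, "schema_version": 3}
```

A v1 blob passes through both steps, a v2 blob through only the last, and a current blob through none, so a single code path handles every vintage of stored state.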
Redis Patterns for Agent State
Redis is the backbone of ephemeral-to-medium-term agent state. Its data structures map naturally to agent needs.
Hash for Agent Profile
import redis
import json
from decimal import Decimal
class AgentProfile:
def __init__(self, r: redis.Redis, agent_id: str):
self.r = r
self.key = f"agent:{agent_id}:profile"
def set_field(self, field: str, value) -> None:
self.r.hset(self.key, field, json.dumps(value, cls=DecimalEncoder))
def get_field(self, field: str):
raw = self.r.hget(self.key, field)
return json.loads(raw, object_hook=decimal_hook) if raw else None
def snapshot(self) -> dict:
raw = self.r.hgetall(self.key)
return {k: json.loads(v, object_hook=decimal_hook) for k, v in raw.items()}
def increment_session(self) -> int:
return self.r.hincrby(self.key, "session_count", 1)
Sorted Sets for Transaction Ledger
class TransactionLedger:
def __init__(self, r: redis.Redis, agent_id: str):
self.r = r
self.key = f"agent:{agent_id}:ledger"
def append(self, tx: dict) -> None:
import time
score = tx.get("timestamp", time.time())
member = json.dumps(tx, cls=DecimalEncoder, sort_keys=True)
self.r.zadd(self.key, {member: score})
def recent(self, n: int = 20) -> list[dict]:
raw = self.r.zrevrange(self.key, 0, n - 1)
return [json.loads(r, object_hook=decimal_hook) for r in raw]
def balance_from_ledger(self) -> Decimal:
"""Recompute USDC balance from full ledger (for reconciliation)."""
all_txs = [json.loads(r, object_hook=decimal_hook) for r in self.r.zrange(self.key, 0, -1)]
return sum(Decimal(str(tx.get("amount_usdc", 0))) for tx in all_txs)
Pub/Sub for Real-Time State Sync
import threading
class StateSync:
CHANNEL = "purpleflea:state:updates"
def __init__(self, r: redis.Redis):
self.r = r
self.pubsub = r.pubsub()
def publish_update(self, agent_id: str, field: str, value) -> None:
msg = json.dumps({"agent_id": agent_id, "field": field, "value": value})
self.r.publish(self.CHANNEL, msg)
def subscribe(self, handler) -> threading.Thread:
self.pubsub.subscribe(self.CHANNEL)
def listener():
for message in self.pubsub.listen():
if message["type"] == "message":
handler(json.loads(message["data"]))
t = threading.Thread(target=listener, daemon=True)
t.start()
return t
SQLite Patterns for Durable Agent State
SQLite is underrated for single-agent deployments. It offers ACID transactions, rich queries, WAL mode for concurrent readers, and zero infrastructure overhead. For an agent running on a single machine, in-process SQLite calls beat any networked database on latency, and there is no server to operate or fail.
import sqlite3
import json
import time
from contextlib import contextmanager
from decimal import Decimal
from pathlib import Path
class AgentDB:
def __init__(self, db_path: str = "agent_state.db"):
self.db_path = db_path
self._init_schema()
@contextmanager
def conn(self):
con = sqlite3.connect(self.db_path, check_same_thread=False)
con.execute("PRAGMA journal_mode=WAL")
con.execute("PRAGMA synchronous=NORMAL")
con.row_factory = sqlite3.Row
try:
yield con
con.commit()
except Exception:
con.rollback()
raise
finally:
con.close()
def _init_schema(self):
with self.conn() as con:
con.executescript("""
CREATE TABLE IF NOT EXISTS agent_state (
agent_id TEXT PRIMARY KEY,
state_json TEXT NOT NULL,
schema_ver INTEGER DEFAULT 1,
updated_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS transactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_id TEXT NOT NULL,
event_type TEXT NOT NULL,
amount_usdc TEXT, -- stored as TEXT to preserve Decimal
payload TEXT,
created_at REAL NOT NULL,
FOREIGN KEY (agent_id) REFERENCES agent_state(agent_id)
);
CREATE INDEX IF NOT EXISTS idx_tx_agent ON transactions(agent_id, created_at DESC);
CREATE TABLE IF NOT EXISTS portfolio_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_id TEXT NOT NULL,
snapshot TEXT NOT NULL,
created_at REAL NOT NULL
);
""")
def save_state(self, agent_id: str, state: dict) -> None:
now = time.time()
with self.conn() as con:
con.execute("""
INSERT INTO agent_state(agent_id, state_json, schema_ver, updated_at)
VALUES (?, ?, ?, ?)
ON CONFLICT(agent_id) DO UPDATE SET
state_json = excluded.state_json,
schema_ver = excluded.schema_ver,
updated_at = excluded.updated_at
""", (agent_id, json.dumps(state, cls=DecimalEncoder), CURRENT_SCHEMA, now))
def load_state(self, agent_id: str) -> dict | None:
with self.conn() as con:
row = con.execute(
"SELECT state_json, schema_ver FROM agent_state WHERE agent_id = ?",
(agent_id,)
).fetchone()
if not row:
return None
return json.loads(row["state_json"], object_hook=decimal_hook)
def append_transaction(self, agent_id: str, event_type: str, amount_usdc: Decimal | None, payload: dict) -> int:
with self.conn() as con:
cur = con.execute("""
INSERT INTO transactions(agent_id, event_type, amount_usdc, payload, created_at)
VALUES (?, ?, ?, ?, ?)
""", (agent_id, event_type, str(amount_usdc) if amount_usdc else None, json.dumps(payload), time.time()))
return cur.lastrowid
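The upsert path can be smoke-tested against an in-memory database. A trimmed, standalone version of the save/load logic above:

```python
import json
import sqlite3
import time

con = sqlite3.connect(":memory:")
con.execute("""
    CREATE TABLE agent_state (
        agent_id   TEXT PRIMARY KEY,
        state_json TEXT NOT NULL,
        updated_at REAL NOT NULL
    )
""")

def save_state(agent_id: str, state: dict) -> None:
    # ON CONFLICT upsert (SQLite 3.24+): second save overwrites, no duplicate row
    con.execute("""
        INSERT INTO agent_state(agent_id, state_json, updated_at)
        VALUES (?, ?, ?)
        ON CONFLICT(agent_id) DO UPDATE SET
            state_json = excluded.state_json,
            updated_at = excluded.updated_at
    """, (agent_id, json.dumps(state), time.time()))

save_state("a1", {"session_count": 1})
save_state("a1", {"session_count": 2})
rows = con.execute("SELECT state_json FROM agent_state").fetchall()
assert len(rows) == 1 and json.loads(rows[0][0])["session_count"] == 2
```

Running the same check against a file-backed database in your test suite is cheap insurance against schema drift.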
Long-Term Memory with Vector Databases
Episodic and semantic memory diverge at scale. When an agent accumulates months of history, key-value lookup breaks down. The agent needs to ask questions like "what strategies worked in volatile markets?" or "what did I learn about game X in the casino?" Those are embedding-space queries.
When to use a vector DB vs. Redis
- Redis: you know the key, you want the value, freshness matters
- Vector DB: you have a natural language question, you need semantic similarity, you have 10K+ memories
Hybrid Retrieval Pipeline
from rank_bm25 import BM25Okapi
import numpy as np
class HybridMemory:
"""Combine keyword (BM25) and semantic (vector) retrieval for best recall."""
def __init__(self, semantic: SemanticMemory):
self.semantic = semantic
self._corpus: list[tuple[str, str]] = [] # (agent_id, content)
self._bm25: BM25Okapi | None = None
def index_text(self, agent_id: str, content: str) -> None:
self.semantic.store(agent_id, content)
self._corpus.append((agent_id, content))
        # NOTE: a single shared BM25 index; with multiple agents, keep one index per agent_id
        tokenized = [doc[1].lower().split() for doc in self._corpus if doc[0] == agent_id]
        self._bm25 = BM25Okapi(tokenized)
def search(self, agent_id: str, query: str, top_k: int = 5) -> list[dict]:
# Semantic results
sem_results = self.semantic.recall(agent_id, query, top_k=top_k * 2)
sem_map = {r["content"]: r["score"] for r in sem_results}
# Keyword results
if self._bm25:
agent_docs = [c for a, c in self._corpus if a == agent_id]
scores = self._bm25.get_scores(query.lower().split())
kw_map = {agent_docs[i]: float(scores[i]) for i in range(len(agent_docs))}
else:
kw_map = {}
        # Merge with Reciprocal Rank Fusion (RRF): score += 1 / (k + rank + 1), k = 60
        fused = {}
sem_ranked = sorted(sem_map, key=lambda x: -sem_map.get(x, 0))
kw_ranked = sorted(kw_map, key=lambda x: -kw_map.get(x, 0))
for rank, doc in enumerate(sem_ranked):
fused[doc] = fused.get(doc, 0) + 1 / (60 + rank + 1)
for rank, doc in enumerate(kw_ranked):
fused[doc] = fused.get(doc, 0) + 1 / (60 + rank + 1)
ranked = sorted(fused, key=lambda x: -fused[x])
return [{"content": doc, "rrf_score": fused[doc]} for doc in ranked[:top_k]]
Purple Flea Balance and Portfolio State Caching
Each Purple Flea service exposes balance/state endpoints. Polling them on every agent decision would exhaust rate limits and add unnecessary latency. The right pattern is a write-through cache with TTL-based invalidation.
import asyncio
import httpx
import json
import time
from decimal import Decimal
from typing import Optional
class PurpleFleaStateCache:
"""
Cache Purple Flea service state with configurable TTLs.
Write-through on mutations; read-through on cache miss.
"""
BASE = "https://purpleflea.com/api/v1"
TTL = {
"wallet": 60, # seconds — wallet balance
"casino": 10, # fast-moving during active session
"trading": 30, # position state
"domains": 300, # rarely changes
"faucet": 120, # cooldown state
"escrow": 15, # escrow status changes on counterparty action
}
def __init__(self, api_key: str, redis_client=None):
self.api_key = api_key
self.r = redis_client # optional Redis backend; uses dict if None
self._local: dict = {} # fallback in-process cache
self.client = httpx.AsyncClient(headers={"Authorization": f"Bearer {api_key}"})
def _cache_key(self, agent_id: str, service: str) -> str:
return f"pf:state:{agent_id}:{service}"
async def _fetch(self, agent_id: str, service: str) -> dict:
url = f"{self.BASE}/{service}/state"
resp = await self.client.get(url, params={"agent_id": agent_id})
resp.raise_for_status()
return resp.json()
async def get(self, agent_id: str, service: str, force_refresh: bool = False) -> dict:
key = self._cache_key(agent_id, service)
ttl = self.TTL.get(service, 60)
if not force_refresh:
if self.r:
cached = self.r.get(key)
if cached:
return json.loads(cached, object_hook=decimal_hook)
else:
entry = self._local.get(key)
if entry and time.time() - entry["ts"] < ttl:
return entry["data"]
data = await self._fetch(agent_id, service)
if self.r:
self.r.setex(key, ttl, json.dumps(data, cls=DecimalEncoder))
else:
self._local[key] = {"ts": time.time(), "data": data}
return data
async def invalidate(self, agent_id: str, service: str) -> None:
key = self._cache_key(agent_id, service)
if self.r:
self.r.delete(key)
self._local.pop(key, None)
async def portfolio_snapshot(self, agent_id: str) -> dict:
"""Fetch all service states concurrently."""
services = ["wallet", "casino", "trading", "domains", "faucet", "escrow"]
tasks = [self.get(agent_id, s) for s in services]
results = await asyncio.gather(*tasks, return_exceptions=True)
return {s: (r if not isinstance(r, Exception) else {"error": str(r)})
for s, r in zip(services, results)}
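The TTL logic can be exercised without any network by swapping the HTTP fetch for a counter. A minimal synchronous sketch of the same read-through pattern (the fetcher and names are illustrative):

```python
import time

class TTLCache:
    def __init__(self, fetch, ttl: float):
        self.fetch = fetch   # called on cache miss or expiry
        self.ttl = ttl
        self._store: dict[str, tuple[float, object]] = {}

    def get(self, key: str):
        entry = self._store.get(key)
        if entry and time.time() - entry[0] < self.ttl:
            return entry[1]                  # fresh: serve from cache
        value = self.fetch(key)              # stale or missing: read through
        self._store[key] = (time.time(), value)
        return value

calls = 0
def fake_fetch(key):
    global calls
    calls += 1
    return {"balance": "100.00"}

cache = TTLCache(fake_fetch, ttl=0.05)
cache.get("wallet"); cache.get("wallet")     # second hit served from cache
assert calls == 1
time.sleep(0.06)
cache.get("wallet")                          # TTL expired: fetched again
assert calls == 2
```

The same two-assertion shape makes a good unit test for the real cache, with the httpx client replaced by a stub.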
Purple Flea API keys use the pf_live_ prefix, for example pf_live_abc123xyz. Keys with other prefixes (such as sk_live_) belong to other platforms and will be rejected. Whatever the prefix, never commit a live key to code you share; load it from the environment or a secrets manager.
Conversation History Management
Naively appending every turn to context is the most common scaling failure. After a few hundred turns, context is full and the oldest — often most important — context is silently truncated.
import anthropic
from dataclasses import dataclass, field
@dataclass
class Turn:
role: str # "user" or "assistant"
content: str
class ConversationHistory:
"""
Maintain conversation history within a token budget.
Older turns are compressed into summaries.
"""
def __init__(self, max_tokens: int = 8_000, summary_at: int = 6_000):
self.max_tokens = max_tokens
self.summary_at = summary_at
self.turns: list[Turn] = []
self.summary: str = ""
self.client = anthropic.Anthropic()
def add(self, role: str, content: str) -> None:
self.turns.append(Turn(role=role, content=content))
if self._estimate_tokens() > self.summary_at:
self._compress()
def _estimate_tokens(self) -> int:
# Rough estimate: 4 chars ≈ 1 token
total = len(self.summary) // 4
for t in self.turns:
total += len(t.content) // 4
return total
def _compress(self) -> None:
"""Summarize oldest half of turns, keep newest half verbatim."""
split = len(self.turns) // 2
to_summarize = self.turns[:split]
self.turns = self.turns[split:]
history_text = "\n".join(f"{t.role}: {t.content}" for t in to_summarize)
existing = f"Previous summary: {self.summary}\n\n" if self.summary else ""
prompt = f"{existing}Summarize this conversation for an AI financial agent:\n\n{history_text}\n\nSummary:"
resp = self.client.messages.create(
model="claude-3-haiku-20240307",
max_tokens=500,
messages=[{"role": "user", "content": prompt}]
)
self.summary = resp.content[0].text
def to_messages(self) -> list[dict]:
messages = []
if self.summary:
messages.append({"role": "user", "content": f"[Session context: {self.summary}]"})
messages.append({"role": "assistant", "content": "Understood. Continuing with that context."})
for t in self.turns:
messages.append({"role": t.role, "content": t.content})
return messages
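The compression trigger itself needs no LLM to test. A stub sketch of the same mechanics, with the Claude call replaced by simple concatenation (class and names are illustrative, not the implementation above):

```python
class StubHistory:
    """Character-budget history; old turns fold into a stub 'summary'."""
    def __init__(self, summary_at_chars: int = 200):
        self.summary_at = summary_at_chars
        self.turns: list[str] = []
        self.summary = ""

    def add(self, content: str) -> None:
        self.turns.append(content)
        if sum(len(t) for t in self.turns) > self.summary_at:
            split = len(self.turns) // 2
            # stub summarizer: real code would call a cheap model here
            self.summary = (self.summary + " " + " | ".join(self.turns[:split])).strip()
            self.turns = self.turns[split:]

h = StubHistory(summary_at_chars=50)
for i in range(10):
    h.add(f"turn {i} content")
assert h.summary != ""   # old turns were folded into the summary
assert len(h.turns) < 10  # recent turns kept verbatim
```

Swapping the stub for the real summarizer in tests keeps the compression logic fast and deterministic to verify.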
Checkpointing and Resume After Failure
Production agents crash. Network partitions happen. The agent must be able to resume from the last consistent state without replaying side-effecting operations (placing bets, sending transfers).
import hashlib
import json
import time
class CheckpointManager:
"""
Idempotency log + state checkpoint.
Each operation gets a stable idempotency key.
On resume, already-executed operations are skipped.
"""
def __init__(self, db: AgentDB, agent_id: str):
self.db = db
self.agent_id = agent_id
with self.db.conn() as con:
con.execute("""
CREATE TABLE IF NOT EXISTS idempotency_log (
idempotency_key TEXT PRIMARY KEY,
operation TEXT NOT NULL,
result_json TEXT,
executed_at REAL NOT NULL
)
""")
def operation_key(self, operation: str, params: dict) -> str:
import hashlib, json
payload = json.dumps({"op": operation, "params": params}, sort_keys=True)
return hashlib.sha256(payload.encode()).hexdigest()[:24]
def already_executed(self, key: str) -> dict | None:
with self.db.conn() as con:
row = con.execute(
"SELECT result_json FROM idempotency_log WHERE idempotency_key = ?", (key,)
).fetchone()
return json.loads(row["result_json"]) if row else None
def mark_executed(self, key: str, operation: str, result: dict) -> None:
with self.db.conn() as con:
con.execute("""
INSERT OR IGNORE INTO idempotency_log(idempotency_key, operation, result_json, executed_at)
VALUES (?, ?, ?, ?)
""", (key, operation, json.dumps(result), time.time()))
async def execute_once(self, operation: str, params: dict, fn) -> dict:
key = self.operation_key(operation, params)
        cached = self.already_executed(key)
        if cached is not None:  # an empty-dict result still counts as executed
            return cached
result = await fn(**params)
self.mark_executed(key, operation, result)
return result
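The resume guarantee in miniature: a synchronous, in-memory sketch of the same execute-once pattern (the class above persists its log in SQLite; names here are illustrative):

```python
import hashlib
import json

log: dict[str, dict] = {}   # idempotency_key -> result
side_effects = 0

def op_key(operation: str, params: dict) -> str:
    payload = json.dumps({"op": operation, "params": params}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()[:24]

def execute_once(operation: str, params: dict, fn) -> dict:
    key = op_key(operation, params)
    if key in log:            # replay after a crash: skip the side effect
        return log[key]
    result = fn(**params)
    log[key] = result
    return result

def place_bet(amount: float) -> dict:
    global side_effects
    side_effects += 1         # stands in for the real external call
    return {"status": "placed", "amount": amount}

r1 = execute_once("place_bet", {"amount": 25.0}, place_bet)
r2 = execute_once("place_bet", {"amount": 25.0}, place_bet)  # resumed run: no second bet
assert side_effects == 1 and r1 == r2
```

Note that sort_keys=True in the key derivation matters: two runs that build the params dict in different orders must still produce the same idempotency key.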
State Management Best Practices
- Never store API keys in agent state blobs. Use environment variables or a secrets manager. Rotate via your platform's secret management, not by editing state.
- Always version your state schema. Add a migration function for every schema change. Test migrations on production-shaped data before deploying.
- Prefer append-only logs over in-place mutation. A ledger of deltas is infinitely easier to audit and reconstruct than a mutable state object with no history.
- Set TTLs on everything in Redis. Stale state is worse than missing state for financial agents — the agent may act on incorrect balances.
- Checkpoint before external calls, commit after. If you bet, checkpoint the intent first. If you crash mid-call, the idempotency log lets you verify what happened on resume.
- Use Decimal, not float, for all financial amounts. Floating-point rounding errors compound across ledger entries and will eventually cause discrepancies that are extremely hard to debug.
- Test your resume logic with chaos injection. Kill the agent process at random points in your test suite and verify it resumes to a consistent state every time.
For Purple Flea's API reference, rate limits, and service endpoints, see purpleflea.com/docs. For the research paper on agent financial infrastructure, see the Zenodo publication.
Purple Flea Engineering — Blue chip financial infrastructure for AI agents. Six services: Casino, Trading, Wallet, Domains, Faucet, Escrow. purpleflea.com