Agent State Management at Scale

Stateless inference is easy. Stateful agents are hard. This guide covers every layer of the memory stack — from in-context working memory to vector databases — and shows concrete patterns for maintaining agent state across sessions, failures, and scale.

Why State Management Matters for Financial Agents

A casino agent that forgets its current bankroll between sessions will make catastrophic betting decisions. A trading agent that loses its position history cannot apply mean-reversion strategies. An escrow agent that drops pending transaction state mid-negotiation creates unresolvable disputes. Financial agents are not chatbots — correctness requires durable, queryable, versioned state.

The Purple Flea ecosystem compounds this challenge: agents interact with six services (Casino, Trading, Wallet, Domains, Faucet, Escrow), each generating distinct event streams. A production agent must reconcile state across all of them, handle partial failures, and resume coherently after restarts.

The Core Problem

LLMs are stateless function calls. Every practical agent's illusion of "memory" is built by your infrastructure, not the model. The model's context window is just the most expensive form of RAM you will ever rent.

The Four Memory Tiers

Agent memory is best understood as a hierarchy mirroring human cognitive architecture. Each tier has different cost, latency, capacity, and durability characteristics.

Tier        Analogy             Storage                        Latency             Capacity             Durability
Working     RAM                 LLM context                    ~0ms                ~200K tokens         Session only
Episodic    Short-term memory   Redis / in-memory              1–5ms               Millions of records  Hours–days
Semantic    Long-term memory    Vector DB (Qdrant, Weaviate)   5–50ms              Unlimited (indexed)  Permanent
Procedural  Muscle memory       Code / prompt templates        0ms (compile-time)  Bounded by codebase  Permanent

Working Memory — The Context Window

Working memory is what the model can "see" right now. It is fast, precise, and brutally limited. For financial agents the most important thing to put in working memory is the current portfolio snapshot, recent transaction history, and the current task objective.

Python — context budget manager
import tiktoken

class ContextBudget:
    """Manage token budget across working memory slots."""

    def __init__(self, model: str = "claude-3-7-sonnet", max_tokens: int = 180_000):
        self.model = model  # informational only; tiktoken ships no Claude tokenizer
        self.enc = tiktoken.encoding_for_model("gpt-4o")  # GPT-4o encoding as an approximation
        self.max_tokens = max_tokens
        self.slots: dict[str, str] = {}
        self.priorities: dict[str, int] = {}  # higher = evict last

    def set(self, key: str, content: str, priority: int = 5) -> None:
        self.slots[key] = content
        self.priorities[key] = priority

    def count(self, text: str) -> int:
        return len(self.enc.encode(text))

    def total_tokens(self) -> int:
        return sum(self.count(v) for v in self.slots.values())

    def build_context(self, reserve_for_output: int = 4096) -> str:
        budget = self.max_tokens - reserve_for_output
        # Sort by priority descending, evict lowest priority first
        sorted_slots = sorted(self.slots.items(), key=lambda x: -self.priorities[x[0]])
        result_parts = []
        used = 0
        for key, content in sorted_slots:
            tokens = self.count(content)
            if used + tokens <= budget:
                result_parts.append(f"=== {key.upper()} ===\n{content}")
                used += tokens
            else:
                # Truncate high-priority slots to fit; lower-priority slots
                # that do not fit are skipped as the loop continues
                remaining = budget - used
                if self.priorities[key] >= 7 and remaining > 0:
                    truncated = self.enc.decode(self.enc.encode(content)[:remaining])
                    result_parts.append(f"=== {key.upper()} (truncated) ===\n{truncated}")
                    used = budget
                    break
        return "\n\n".join(result_parts)

Episodic Memory — Recent History

Episodic memory stores recent events: the last 100 trades, the last 50 chat turns, recent wallet transactions. Redis is the natural fit — low latency, TTL support, and sorted sets for time-ordered event logs.

Python — episodic store with Redis
import redis
import json
import time
from dataclasses import dataclass, asdict
from typing import Optional

@dataclass
class Episode:
    agent_id: str
    event_type: str       # "trade", "bet", "transfer", "escrow_open"
    timestamp: float
    payload: dict
    outcome: Optional[dict] = None

class EpisodicStore:
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.r = redis.from_url(redis_url, decode_responses=True)
        self.max_episodes = 500   # per agent
        self.ttl_seconds = 86400 * 7  # 7 days

    def record(self, episode: Episode) -> None:
        key = f"episodes:{episode.agent_id}"
        score = episode.timestamp
        member = json.dumps(asdict(episode))
        pipe = self.r.pipeline()
        pipe.zadd(key, {member: score})
        # Trim to max_episodes (keep newest)
        pipe.zremrangebyrank(key, 0, -(self.max_episodes + 1))
        pipe.expire(key, self.ttl_seconds)
        pipe.execute()

    def get_recent(self, agent_id: str, n: int = 50, event_type: Optional[str] = None) -> list[Episode]:
        key = f"episodes:{agent_id}"
        # newest first
        raw = self.r.zrevrange(key, 0, n - 1, withscores=False)
        episodes = [Episode(**json.loads(r)) for r in raw]
        if event_type:
            # filtering happens after the slice, so fewer than n may be returned
            episodes = [e for e in episodes if e.event_type == event_type]
        return episodes

    def summarize_window(self, agent_id: str, since: float) -> dict:
        key = f"episodes:{agent_id}"
        raw = self.r.zrangebyscore(key, since, "+inf")
        episodes = [Episode(**json.loads(r)) for r in raw]
        return {
            "total_events": len(episodes),
            "by_type": {t: sum(1 for e in episodes if e.event_type == t)
                        for t in set(e.event_type for e in episodes)},
            "time_span_hours": (time.time() - since) / 3600,
        }

Semantic Memory — Vector Database

Semantic memory stores facts, strategies, and learned patterns indexed by meaning rather than by key. When an agent asks "what happened last time I tried aggressive betting on sports markets?", it is doing a semantic lookup, not a key-value fetch.

Python — Qdrant semantic memory
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue
import openai
import hashlib
import time

class SemanticMemory:
    COLLECTION = "agent_memories"
    DIM = 1536  # text-embedding-3-small

    def __init__(self, qdrant_url: str = "http://localhost:6333"):
        self.qc = QdrantClient(url=qdrant_url)
        self.oai = openai.OpenAI()
        self._ensure_collection()

    def _ensure_collection(self):
        existing = [c.name for c in self.qc.get_collections().collections]
        if self.COLLECTION not in existing:
            self.qc.create_collection(
                self.COLLECTION,
                vectors_config=VectorParams(size=self.DIM, distance=Distance.COSINE)
            )

    def embed(self, text: str) -> list[float]:
        resp = self.oai.embeddings.create(model="text-embedding-3-small", input=text)
        return resp.data[0].embedding

    def store(self, agent_id: str, content: str, metadata: dict | None = None) -> str:
        import uuid
        # Qdrant point IDs must be unsigned ints or UUID strings — a bare
        # hex digest is rejected — so derive a deterministic UUID instead
        uid = str(uuid.uuid5(uuid.NAMESPACE_URL, f"{agent_id}:{content}:{time.time()}"))
        vec = self.embed(content)
        payload = {"agent_id": agent_id, "content": content, "ts": time.time(), **(metadata or {})}
        self.qc.upsert(self.COLLECTION, points=[PointStruct(id=uid, vector=vec, payload=payload)])
        return uid

    def recall(self, agent_id: str, query: str, top_k: int = 5) -> list[dict]:
        vec = self.embed(query)
        filt = Filter(must=[FieldCondition(key="agent_id", match=MatchValue(value=agent_id))])
        hits = self.qc.search(self.COLLECTION, query_vector=vec, query_filter=filt, limit=top_k)
        return [{"score": h.score, **h.payload} for h in hits]

Procedural Memory — Encoded Behavior

Procedural memory is behavior baked into code or system prompts. It does not need to be retrieved — it fires automatically. Examples: risk limits hardcoded as guard functions, stop-loss logic in the trading loop, the agent's negotiation style encoded in its system prompt.

Python — procedural guard example
from functools import wraps
from typing import Callable

def max_bet_guard(max_usdc: float = 100.0):
    """Procedural memory: never bet more than limit."""
    def decorator(fn: Callable):
        @wraps(fn)
        def wrapper(agent, amount: float, *args, **kwargs):
            if amount > max_usdc:
                raise ValueError(f"Bet {amount} USDC exceeds procedural limit {max_usdc}")
            return fn(agent, amount, *args, **kwargs)
        return wrapper
    return decorator

class CasinoAgent:
    @max_bet_guard(max_usdc=50.0)
    def place_bet(self, amount: float, game: str) -> dict:
        # ... actual bet logic
        pass

State Serialization Formats

Choosing the wrong serialization format causes silent data corruption, schema drift, and painful migrations. Financial state deserves the same rigor as financial code.

JSON — Simple but Fragile

JSON is readable and universal. Its weaknesses for agent state: no schema enforcement, no decimal type (floats corrupt financial amounts), no binary support, and no versioning convention.
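
The float problem is easy to demonstrate before reaching for the fix:

Python — why floats corrupt financial amounts

```python
import json
from decimal import Decimal

# Float arithmetic drifts: ten 10-cent fees do not sum to a dollar
total_float = sum([0.1] * 10)
total_decimal = sum([Decimal("0.1")] * 10, Decimal("0"))

assert total_float != 1.0                 # binary floats cannot represent 0.1
assert total_decimal == Decimal("1.0")    # Decimal stays exact

# json has no Decimal type: amounts come back as lossy floats by default
roundtrip = json.loads(json.dumps({"balance": 1234.56789}))
assert isinstance(roundtrip["balance"], float)
```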

Python — safe decimal JSON for financial state
import json
from decimal import Decimal

class DecimalEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, Decimal):
            return {"__decimal__": str(obj)}
        return super().default(obj)

def decimal_hook(d: dict):
    if "__decimal__" in d:
        return Decimal(d["__decimal__"])
    return d

# Always use these when storing financial amounts
state_json = json.dumps({"balance": Decimal("1234.56789")}, cls=DecimalEncoder)
state = json.loads(state_json, object_hook=decimal_hook)
assert state["balance"] == Decimal("1234.56789")

MessagePack — Compact Binary

MessagePack serializes 30–50% smaller than JSON with significantly faster parse times. Ideal for episodic events stored in Redis where you are writing thousands of records per minute.

Python — msgpack for Redis values
import msgpack
from decimal import Decimal

def pack_state(state: dict) -> bytes:
    # Convert Decimal to string before packing
    def default(obj):
        if isinstance(obj, Decimal):
            return msgpack.ExtType(1, str(obj).encode())
        raise TypeError(f"Unknown type: {type(obj)}")
    return msgpack.packb(state, default=default, use_bin_type=True)

def unpack_state(data: bytes) -> dict:
    def ext_hook(code, data):
        if code == 1:
            return Decimal(data.decode())
        return msgpack.ExtType(code, data)
    return msgpack.unpackb(data, ext_hook=ext_hook, raw=False)

Versioned Schema

Every state blob should carry a schema version. Without it, you cannot safely migrate historical state when your agent evolves.

Python — versioned state wrapper
from dataclasses import dataclass, field, asdict
from typing import Any

CURRENT_SCHEMA = 3

@dataclass
class AgentState:
    schema_version: int = CURRENT_SCHEMA
    agent_id: str = ""
    portfolio: dict = field(default_factory=dict)
    session_count: int = 0
    preferences: dict = field(default_factory=dict)

    @classmethod
    def migrate(cls, raw: dict) -> "AgentState":
        v = raw.get("schema_version", 1)
        if v < 2:
            # v1 had flat balance instead of portfolio dict
            raw["portfolio"] = {"usdc": raw.pop("balance", 0)}
            raw["schema_version"] = 2
        if v < 3:
            # v2 had no preferences
            raw.setdefault("preferences", {})
            raw["schema_version"] = 3
        return cls(**{k: raw[k] for k in cls.__dataclass_fields__ if k in raw})

    def to_dict(self) -> dict:
        return asdict(self)
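
The migration chain above can be exercised on a plain dict. This standalone check mirrors the same v1 → v2 → v3 steps without the dataclass wrapper, so the chain can be sanity-checked in isolation:

Python — worked migration example

```python
# Mirrors AgentState.migrate on a plain dict: a v1 blob (flat "balance",
# no version field) should come out in the v3 shape.

def migrate_raw(raw: dict) -> dict:
    v = raw.get("schema_version", 1)
    if v < 2:
        # v1 -> v2: flat balance becomes a portfolio dict
        raw["portfolio"] = {"usdc": raw.pop("balance", 0)}
        raw["schema_version"] = 2
    if v < 3:
        # v2 -> v3: add the preferences field
        raw.setdefault("preferences", {})
        raw["schema_version"] = 3
    return raw

migrated = migrate_raw({"agent_id": "a1", "balance": 42})
assert migrated == {
    "agent_id": "a1",
    "portfolio": {"usdc": 42},
    "preferences": {},
    "schema_version": 3,
}
```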

Redis Patterns for Agent State

Redis is the backbone of ephemeral-to-medium-term agent state. Its data structures map naturally to agent needs.

Hash for Agent Profile

Python — Redis Hash as agent record
import redis
import json
from decimal import Decimal

class AgentProfile:
    def __init__(self, r: redis.Redis, agent_id: str):
        self.r = r
        self.key = f"agent:{agent_id}:profile"

    def set_field(self, field: str, value) -> None:
        self.r.hset(self.key, field, json.dumps(value, cls=DecimalEncoder))

    def get_field(self, field: str):
        raw = self.r.hget(self.key, field)
        return json.loads(raw, object_hook=decimal_hook) if raw else None

    def snapshot(self) -> dict:
        raw = self.r.hgetall(self.key)
        return {k: json.loads(v, object_hook=decimal_hook) for k, v in raw.items()}

    def increment_session(self) -> int:
        return self.r.hincrby(self.key, "session_count", 1)

Sorted Sets for Transaction Ledger

Python — Redis sorted set as time-ordered ledger
class TransactionLedger:
    def __init__(self, r: redis.Redis, agent_id: str):
        self.r = r
        self.key = f"agent:{agent_id}:ledger"

    def append(self, tx: dict) -> None:
        import time
        score = tx.get("timestamp", time.time())
        member = json.dumps(tx, cls=DecimalEncoder, sort_keys=True)
        self.r.zadd(self.key, {member: score})

    def recent(self, n: int = 20) -> list[dict]:
        raw = self.r.zrevrange(self.key, 0, n - 1)
        return [json.loads(r, object_hook=decimal_hook) for r in raw]

    def balance_from_ledger(self) -> Decimal:
        """Recompute USDC balance from full ledger (for reconciliation)."""
        all_txs = [json.loads(r, object_hook=decimal_hook) for r in self.r.zrange(self.key, 0, -1)]
        return sum((Decimal(str(tx.get("amount_usdc", 0))) for tx in all_txs), Decimal(0))

Pub/Sub for Real-Time State Sync

Python — Redis pub/sub for multi-agent state broadcast
import threading

class StateSync:
    CHANNEL = "purpleflea:state:updates"

    def __init__(self, r: redis.Redis):
        self.r = r
        self.pubsub = r.pubsub()

    def publish_update(self, agent_id: str, field: str, value) -> None:
        msg = json.dumps({"agent_id": agent_id, "field": field, "value": value})
        self.r.publish(self.CHANNEL, msg)

    def subscribe(self, handler) -> threading.Thread:
        self.pubsub.subscribe(self.CHANNEL)
        def listener():
            for message in self.pubsub.listen():
                if message["type"] == "message":
                    handler(json.loads(message["data"]))
        t = threading.Thread(target=listener, daemon=True)
        t.start()
        return t

SQLite Patterns for Durable Agent State

SQLite is underrated for single-agent deployments. It offers ACID transactions, rich queries, WAL mode for concurrent readers, and zero infrastructure overhead. For an agent running on a single machine, SQLite outperforms most distributed databases for local state.

Python — SQLite agent state store
import sqlite3
import json
import time
from contextlib import contextmanager
from decimal import Decimal
from pathlib import Path

class AgentDB:
    def __init__(self, db_path: str = "agent_state.db"):
        self.db_path = db_path
        self._init_schema()

    @contextmanager
    def conn(self):
        con = sqlite3.connect(self.db_path, check_same_thread=False)
        con.execute("PRAGMA journal_mode=WAL")
        con.execute("PRAGMA synchronous=NORMAL")
        con.row_factory = sqlite3.Row
        try:
            yield con
            con.commit()
        except Exception:
            con.rollback()
            raise
        finally:
            con.close()

    def _init_schema(self):
        with self.conn() as con:
            con.executescript("""
                CREATE TABLE IF NOT EXISTS agent_state (
                    agent_id    TEXT PRIMARY KEY,
                    state_json  TEXT NOT NULL,
                    schema_ver  INTEGER DEFAULT 1,
                    updated_at  REAL NOT NULL
                );

                CREATE TABLE IF NOT EXISTS transactions (
                    id          INTEGER PRIMARY KEY AUTOINCREMENT,
                    agent_id    TEXT NOT NULL,
                    event_type  TEXT NOT NULL,
                    amount_usdc TEXT,          -- stored as TEXT to preserve Decimal
                    payload     TEXT,
                    created_at  REAL NOT NULL,
                    FOREIGN KEY (agent_id) REFERENCES agent_state(agent_id)
                );
                CREATE INDEX IF NOT EXISTS idx_tx_agent ON transactions(agent_id, created_at DESC);

                CREATE TABLE IF NOT EXISTS portfolio_snapshots (
                    id          INTEGER PRIMARY KEY AUTOINCREMENT,
                    agent_id    TEXT NOT NULL,
                    snapshot    TEXT NOT NULL,
                    created_at  REAL NOT NULL
                );
            """)

    def save_state(self, agent_id: str, state: dict) -> None:
        now = time.time()
        with self.conn() as con:
            con.execute("""
                INSERT INTO agent_state(agent_id, state_json, schema_ver, updated_at)
                VALUES (?, ?, ?, ?)
                ON CONFLICT(agent_id) DO UPDATE SET
                    state_json = excluded.state_json,
                    schema_ver = excluded.schema_ver,
                    updated_at = excluded.updated_at
            """, (agent_id, json.dumps(state, cls=DecimalEncoder), CURRENT_SCHEMA, now))

    def load_state(self, agent_id: str) -> dict | None:
        with self.conn() as con:
            row = con.execute(
                "SELECT state_json, schema_ver FROM agent_state WHERE agent_id = ?",
                (agent_id,)
            ).fetchone()
        if not row:
            return None
        return json.loads(row["state_json"], object_hook=decimal_hook)

    def append_transaction(self, agent_id: str, event_type: str, amount_usdc: Decimal | None, payload: dict) -> int:
        with self.conn() as con:
            cur = con.execute("""
                INSERT INTO transactions(agent_id, event_type, amount_usdc, payload, created_at)
                VALUES (?, ?, ?, ?, ?)
            """, (agent_id, event_type, str(amount_usdc) if amount_usdc is not None else None, json.dumps(payload), time.time()))
            return cur.lastrowid

Long-Term Memory with Vector Databases

Episodic and semantic memory diverge at scale. When an agent accumulates months of history, key-value lookup breaks down. The agent needs to ask questions like "what strategies worked in volatile markets?" or "what did I learn about game X in the casino?" Those are embedding-space queries.

When to use a vector DB vs. Redis

  • Redis: you know the key, you want the value, freshness matters
  • Vector DB: you have a natural language question, you need semantic similarity, you have 10K+ memories
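
Those rules of thumb can be encoded as a small dispatch helper. The thresholds and the "redis"/"vector" labels below are made up for the sketch, not part of any API:

Python — backend routing heuristic (illustrative)

```python
# Illustrative dispatch following the rules of thumb above; thresholds
# and backend labels are assumptions for the sketch.

def pick_backend(query: str, known_key: bool, memory_count: int) -> str:
    if known_key:
        return "redis"            # you know the key, you want the value fast
    if memory_count >= 10_000 or len(query.split()) > 3:
        return "vector"           # natural-language question over a large store
    return "redis"                # small stores: scanning recent keys suffices

assert pick_backend("balance", known_key=True, memory_count=50_000) == "redis"
assert pick_backend("what worked in volatile markets", known_key=False,
                    memory_count=50_000) == "vector"
assert pick_backend("x", known_key=False, memory_count=100) == "redis"
```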

Hybrid Retrieval Pipeline

Python — hybrid BM25 + vector retrieval
from rank_bm25 import BM25Okapi
import numpy as np

class HybridMemory:
    """Combine keyword (BM25) and semantic (vector) retrieval for best recall."""

    def __init__(self, semantic: SemanticMemory):
        self.semantic = semantic
        self._corpus: list[tuple[str, str]] = []   # (agent_id, content)
        self._bm25: dict[str, BM25Okapi] = {}      # one keyword index per agent

    def index_text(self, agent_id: str, content: str) -> None:
        self.semantic.store(agent_id, content)
        self._corpus.append((agent_id, content))
        # Rebuild only this agent's BM25 index, so indexing for one agent
        # cannot clobber another agent's keyword index
        tokenized = [c.lower().split() for a, c in self._corpus if a == agent_id]
        self._bm25[agent_id] = BM25Okapi(tokenized)

    def search(self, agent_id: str, query: str, top_k: int = 5) -> list[dict]:
        # Semantic results
        sem_results = self.semantic.recall(agent_id, query, top_k=top_k * 2)
        sem_map = {r["content"]: r["score"] for r in sem_results}

        # Keyword results
        if agent_id in self._bm25:
            agent_docs = [c for a, c in self._corpus if a == agent_id]
            scores = self._bm25[agent_id].get_scores(query.lower().split())
            kw_map = {agent_docs[i]: float(scores[i]) for i in range(len(agent_docs))}
        else:
            kw_map = {}

        # Merge via Reciprocal Rank Fusion: each list contributes 1/(60 + rank + 1)
        fused: dict[str, float] = {}
        sem_ranked = sorted(sem_map, key=lambda x: -sem_map.get(x, 0))
        kw_ranked  = sorted(kw_map,  key=lambda x: -kw_map.get(x, 0))
        for rank, doc in enumerate(sem_ranked):
            fused[doc] = fused.get(doc, 0) + 1 / (60 + rank + 1)
        for rank, doc in enumerate(kw_ranked):
            fused[doc] = fused.get(doc, 0) + 1 / (60 + rank + 1)

        ranked = sorted(fused, key=lambda x: -fused[x])
        return [{"content": doc, "rrf_score": fused[doc]} for doc in ranked[:top_k]]
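
RRF itself is small enough to sanity-check in isolation. This standalone version mirrors the fusion step above, with each ranked list contributing 1/(60 + rank + 1) per document:

Python — standalone Reciprocal Rank Fusion check

```python
# Standalone RRF mirroring the merge step above.

def rrf_fuse(*ranked_lists: list[str], k: int = 60) -> list[str]:
    fused: dict[str, float] = {}
    for ranked in ranked_lists:
        for rank, doc in enumerate(ranked):
            fused[doc] = fused.get(doc, 0.0) + 1 / (k + rank + 1)
    return sorted(fused, key=lambda d: -fused[d])

semantic_hits = ["strategy A", "strategy B", "strategy C"]
keyword_hits  = ["strategy B", "strategy D"]

# "strategy B" appears in both lists, so it fuses to the top
assert rrf_fuse(semantic_hits, keyword_hits)[0] == "strategy B"
```

The fixed constant (k = 60) dampens the influence of any single list's top rank, which is why a document appearing in both lists beats either list's sole leader.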

Purple Flea Balance and Portfolio State Caching

Each Purple Flea service exposes balance/state endpoints. Polling them on every agent decision would exhaust rate limits and add unnecessary latency. The right pattern is a write-through cache with TTL-based invalidation.

Python — Purple Flea multi-service state cache
import asyncio
import httpx
import json
import time
from decimal import Decimal
from typing import Optional

class PurpleFleaStateCache:
    """
    Cache Purple Flea service state with configurable TTLs.
    Write-through on mutations; read-through on cache miss.
    """
    BASE = "https://purpleflea.com/api/v1"

    TTL = {
        "wallet":  60,     # seconds — wallet balance
        "casino":  10,     # fast-moving during active session
        "trading": 30,     # position state
        "domains":  300,   # rarely changes
        "faucet":  120,    # cooldown state
        "escrow":  15,     # escrow status changes on counterparty action
    }

    def __init__(self, api_key: str, redis_client=None):
        self.api_key = api_key
        self.r = redis_client   # optional Redis backend; uses dict if None
        self._local: dict = {}  # fallback in-process cache
        self.client = httpx.AsyncClient(headers={"Authorization": f"Bearer {api_key}"})

    def _cache_key(self, agent_id: str, service: str) -> str:
        return f"pf:state:{agent_id}:{service}"

    async def _fetch(self, agent_id: str, service: str) -> dict:
        url = f"{self.BASE}/{service}/state"
        resp = await self.client.get(url, params={"agent_id": agent_id})
        resp.raise_for_status()
        return resp.json()

    async def get(self, agent_id: str, service: str, force_refresh: bool = False) -> dict:
        key = self._cache_key(agent_id, service)
        ttl = self.TTL.get(service, 60)

        if not force_refresh:
            if self.r:
                cached = self.r.get(key)
                if cached:
                    return json.loads(cached, object_hook=decimal_hook)
            else:
                entry = self._local.get(key)
                if entry and time.time() - entry["ts"] < ttl:
                    return entry["data"]

        data = await self._fetch(agent_id, service)
        if self.r:
            self.r.setex(key, ttl, json.dumps(data, cls=DecimalEncoder))
        else:
            self._local[key] = {"ts": time.time(), "data": data}
        return data

    async def invalidate(self, agent_id: str, service: str) -> None:
        key = self._cache_key(agent_id, service)
        if self.r:
            self.r.delete(key)
        self._local.pop(key, None)

    async def portfolio_snapshot(self, agent_id: str) -> dict:
        """Fetch all service states concurrently."""
        services = ["wallet", "casino", "trading", "domains", "faucet", "escrow"]
        tasks = [self.get(agent_id, s) for s in services]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return {s: (r if not isinstance(r, Exception) else {"error": str(r)})
                for s, r in zip(services, results)}

API Key Format

Purple Flea API keys use the pf_live_ prefix — for example pf_live_abc123xyz. Never use patterns like sk_live_ in any code you commit or share; only the pf_live_ prefix is valid here.
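
A lightweight guard can reject wrong-prefix keys before they ever reach a request header. The exact length and character set of the suffix are not specified above, so this sketch only enforces the documented pf_live_ prefix plus a non-empty alphanumeric tail (an assumption):

Python — API key prefix check (sketch)

```python
import re

# Only the documented pf_live_ prefix is enforced; the suffix rule
# (non-empty alphanumeric) is an assumption for this sketch.
KEY_PATTERN = re.compile(r"^pf_live_[A-Za-z0-9]+$")

def looks_like_purple_flea_key(key: str) -> bool:
    return bool(KEY_PATTERN.match(key))

assert looks_like_purple_flea_key("pf_live_abc123xyz")
assert not looks_like_purple_flea_key("sk_live_abc123xyz")   # wrong prefix
assert not looks_like_purple_flea_key("pf_live_")            # empty suffix
```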

Conversation History Management

Naively appending every turn to context is the most common scaling failure. After a few hundred turns the window fills, and the oldest turns, often the most important ones, are silently truncated.

Python — sliding window + summary compression
import anthropic
from dataclasses import dataclass, field

@dataclass
class Turn:
    role: str   # "user" or "assistant"
    content: str

class ConversationHistory:
    """
    Maintain conversation history within a token budget.
    Older turns are compressed into summaries.
    """
    def __init__(self, max_tokens: int = 8_000, summary_at: int = 6_000):
        self.max_tokens = max_tokens
        self.summary_at = summary_at
        self.turns: list[Turn] = []
        self.summary: str = ""
        self.client = anthropic.Anthropic()

    def add(self, role: str, content: str) -> None:
        self.turns.append(Turn(role=role, content=content))
        if self._estimate_tokens() > self.summary_at:
            self._compress()

    def _estimate_tokens(self) -> int:
        # Rough estimate: 4 chars ≈ 1 token
        total = len(self.summary) // 4
        for t in self.turns:
            total += len(t.content) // 4
        return total

    def _compress(self) -> None:
        """Summarize oldest half of turns, keep newest half verbatim."""
        split = len(self.turns) // 2
        to_summarize = self.turns[:split]
        self.turns = self.turns[split:]

        history_text = "\n".join(f"{t.role}: {t.content}" for t in to_summarize)
        existing = f"Previous summary: {self.summary}\n\n" if self.summary else ""
        prompt = f"{existing}Summarize this conversation for an AI financial agent:\n\n{history_text}\n\nSummary:"
        resp = self.client.messages.create(
            model="claude-3-haiku-20240307",
            max_tokens=500,
            messages=[{"role": "user", "content": prompt}]
        )
        self.summary = resp.content[0].text

    def to_messages(self) -> list[dict]:
        messages = []
        if self.summary:
            messages.append({"role": "user", "content": f"[Session context: {self.summary}]"})
            messages.append({"role": "assistant", "content": "Understood. Continuing with that context."})
        for t in self.turns:
            messages.append({"role": t.role, "content": t.content})
        return messages

Checkpointing and Resume After Failure

Production agents crash. Network partitions happen. The agent must be able to resume from the last consistent state without replaying side-effecting operations (placing bets, sending transfers).

Python — idempotent operation log with checkpoint
import json
import sqlite3
import time

class CheckpointManager:
    """
    Idempotency log + state checkpoint.
    Each operation gets a stable idempotency key.
    On resume, already-executed operations are skipped.
    """

    def __init__(self, db: AgentDB, agent_id: str):
        self.db = db
        self.agent_id = agent_id
        with self.db.conn() as con:
            con.execute("""
                CREATE TABLE IF NOT EXISTS idempotency_log (
                    idempotency_key TEXT PRIMARY KEY,
                    operation       TEXT NOT NULL,
                    result_json     TEXT,
                    executed_at     REAL NOT NULL
                )
            """)

    def operation_key(self, operation: str, params: dict) -> str:
        import hashlib, json
        payload = json.dumps({"op": operation, "params": params}, sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()[:24]

    def already_executed(self, key: str) -> dict | None:
        with self.db.conn() as con:
            row = con.execute(
                "SELECT result_json FROM idempotency_log WHERE idempotency_key = ?", (key,)
            ).fetchone()
        return json.loads(row["result_json"]) if row else None

    def mark_executed(self, key: str, operation: str, result: dict) -> None:
        with self.db.conn() as con:
            con.execute("""
                INSERT OR IGNORE INTO idempotency_log(idempotency_key, operation, result_json, executed_at)
                VALUES (?, ?, ?, ?)
            """, (key, operation, json.dumps(result), time.time()))

    async def execute_once(self, operation: str, params: dict, fn) -> dict:
        key = self.operation_key(operation, params)
        cached = self.already_executed(key)
        if cached:
            return cached
        result = await fn(**params)
        self.mark_executed(key, operation, result)
        return result
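
The key derivation above is deterministic, which is what makes retries safe. A quick standalone check of the same derivation:

Python — idempotency key stability check

```python
import hashlib
import json

def operation_key(operation: str, params: dict) -> str:
    # Same derivation as CheckpointManager.operation_key: canonical JSON
    # (sorted keys) hashed, so retries of the same call map to the same key
    payload = json.dumps({"op": operation, "params": params}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()[:24]

k1 = operation_key("place_bet", {"amount": 25, "game": "dice"})
k2 = operation_key("place_bet", {"game": "dice", "amount": 25})  # key order differs
k3 = operation_key("place_bet", {"amount": 26, "game": "dice"})

assert k1 == k2   # canonicalization makes retries idempotent
assert k1 != k3   # different params => a different operation
```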

State Management Best Practices

  • Represent money as Decimal end to end; never let a float touch a balance.
  • Stamp every persisted state blob with a schema version and migrate on read.
  • Give every side-effecting operation (bet, transfer, escrow action) an idempotency key so crash recovery never replays it.
  • Match each memory tier to its job: context for the current task, Redis for recent events, a vector DB for long-term recall, code for invariants.
  • Cache per-service state with TTLs tuned to how fast that state actually changes, and invalidate on writes.
  • Periodically reconcile derived state (cached balances) against the ledger of record.

Further Reading

For Purple Flea's API reference, rate limits, and service endpoints, see purpleflea.com/docs. For the research paper on agent financial infrastructure, see the Zenodo publication.


Purple Flea Engineering — Blue chip financial infrastructure for AI agents. Six services: Casino, Trading, Wallet, Domains, Faucet, Escrow. purpleflea.com