Event Sourcing for AI Agent State
The Problem with CRUD for Agent State
Most developers building AI agents reach for the same pattern they use for web applications: a database table with a current-state row. The agent reads its balance, makes a trade, and updates the balance field. Simple, familiar, and deeply problematic for autonomous agents operating in financial systems.
CRUD (Create, Read, Update, Delete) state management destroys historical context. When an agent's balance moves from 1,000 to 750, the intermediate history vanishes. You know the current state but not how you got there. For AI agents that need to audit their decisions, replay scenarios, or recover from failures, this is catastrophic.
Event sourcing inverts this model. Instead of storing current state, you store a log of every event that has ever happened. The current state is derived by replaying those events. This gives you something CRUD can never offer: a complete, immutable audit trail of every decision an agent has made.
| Dimension | CRUD State | Event Sourcing |
|---|---|---|
| Storage model | Current state only | Append-only event log |
| History | ✗ Lost on update | ✓ Full audit trail |
| Temporal queries | ✗ Impossible | ✓ Replay to any point |
| Failure recovery | From last backup | From event log (exact) |
| Concurrent writes | Lock/retry needed | Append-only, no contention |
| Debugging | Hard: state is opaque | Easy: trace every event |
| New projections | Requires schema migration | Replay events into new model |
| Complexity | Low initial | Higher initial, lower long-term |
Core Concepts: Commands, Events, Projections
Event sourcing has three foundational concepts that work together in a pipeline:
Commands
A command is an instruction to do something. It represents intent, not fact. Commands can be rejected. Examples: PlaceBet, WithdrawFunds, RegisterAgent. Commands are validated before being accepted, and if the system cannot fulfill them, they fail without producing any events.
Events
An event is a fact that has already happened. Events are immutable and always past tense: BetPlaced, FundsWithdrawn, AgentRegistered. Once an event is written to the event store, it never changes. This immutability is what makes the entire pattern trustworthy.
Projections
A projection (also called a read model or view model) is the current state derived by folding events. Your agent's balance is a projection: start at zero, apply each deposit event, subtract each withdrawal event, and you have the current balance. Projections can be rebuilt at any time by replaying events.
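To make the pipeline concrete, here is a minimal standalone sketch of the three concepts working together. The `PlaceBet`/`BetPlaced` names mirror the examples above; the handler and fold functions are illustrative, not the `EventStore` API used later in this article:

```python
from dataclasses import dataclass
from typing import List

@dataclass
class PlaceBet:      # command: intent, may be rejected
    amount: float

@dataclass
class BetPlaced:     # event: immutable fact, past tense
    amount: float

def handle(cmd: PlaceBet, balance: float) -> List[BetPlaced]:
    """Validate the command; emit events only if it succeeds."""
    if cmd.amount <= 0 or cmd.amount > balance:
        raise ValueError("command rejected: invalid or unaffordable bet")
    return [BetPlaced(amount=cmd.amount)]

def project_balance(events: List[BetPlaced], start: float) -> float:
    """Projection: fold events into current state."""
    balance = start
    for e in events:
        balance -= e.amount
    return balance

events = handle(PlaceBet(amount=100.0), balance=1000.0)
print(project_balance(events, start=1000.0))  # 900.0
```

Note the asymmetry: the command can fail (and leave no trace), but once `BetPlaced` exists it is a permanent fact that every projection must account for.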
Event Store Design
An event store needs to satisfy three properties: it must be append-only (no updates or deletes), events must be ordered within an aggregate, and reads must support both sequential replay and random access by aggregate ID.
For AI agents, SQLite is an excellent starting point. It is embedded, zero-infrastructure, and supports WAL mode for concurrent reads. Here is a production-ready schema:
```sql
-- Event store schema for AI agent state
CREATE TABLE events (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    event_id    TEXT NOT NULL UNIQUE,        -- UUID
    stream_id   TEXT NOT NULL,               -- aggregate identity
    stream_type TEXT NOT NULL,               -- 'agent' | 'portfolio' | 'session'
    event_type  TEXT NOT NULL,               -- 'BetPlaced' | 'FundsDeposited' etc.
    version     INTEGER NOT NULL,            -- monotonic per stream_id
    data        TEXT NOT NULL,               -- JSON payload
    metadata    TEXT NOT NULL DEFAULT '{}',  -- causation_id, correlation_id, etc.
    occurred_at TEXT NOT NULL,               -- ISO8601 timestamp
    UNIQUE (stream_id, version)              -- optimistic concurrency
);

CREATE INDEX idx_stream ON events (stream_id, version);
CREATE INDEX idx_type   ON events (event_type, occurred_at);
CREATE INDEX idx_global ON events (id);      -- global ordering for projections

-- Snapshots table (optimization for long-lived aggregates)
CREATE TABLE snapshots (
    stream_id TEXT NOT NULL PRIMARY KEY,
    version   INTEGER NOT NULL,
    data      TEXT NOT NULL,
    taken_at  TEXT NOT NULL
);
```
The UNIQUE (stream_id, version) constraint is your optimistic concurrency guard. If two writers try to append the same version to the same stream simultaneously, one will fail with a constraint error, forcing a retry with the latest state.
Python EventStore Implementation
Here is a complete, production-quality Python EventStore class built on SQLite, with support for append, replay, snapshots, and version checking:
```python
import sqlite3, json, uuid
from datetime import datetime, timezone
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any, Iterator
from contextlib import contextmanager


@dataclass
class Event:
    stream_id: str
    stream_type: str
    event_type: str
    data: Dict[str, Any]
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    version: int = 0
    metadata: Dict[str, Any] = field(default_factory=dict)
    occurred_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


@dataclass
class Snapshot:
    stream_id: str
    version: int
    data: Dict[str, Any]
    taken_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


class OptimisticConcurrencyError(Exception):
    pass


class EventStore:
    """
    Append-only SQLite-backed event store for AI agent state.

    Usage:
        store = EventStore("agent_state.db")
        store.append("agent-001", "agent", [
            Event("agent-001", "agent", "BetPlaced",
                  {"amount": 100, "game": "blackjack"})
        ], expected_version=-1)  # -1 skips the concurrency check
    """

    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        with self._conn() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS events (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    event_id TEXT NOT NULL UNIQUE,
                    stream_id TEXT NOT NULL,
                    stream_type TEXT NOT NULL,
                    event_type TEXT NOT NULL,
                    version INTEGER NOT NULL,
                    data TEXT NOT NULL,
                    metadata TEXT NOT NULL DEFAULT '{}',
                    occurred_at TEXT NOT NULL,
                    UNIQUE (stream_id, version)
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS snapshots (
                    stream_id TEXT NOT NULL PRIMARY KEY,
                    version INTEGER NOT NULL,
                    data TEXT NOT NULL,
                    taken_at TEXT NOT NULL
                )
            """)
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_stream ON events (stream_id, version)"
            )

    @contextmanager
    def _conn(self):
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def append(
        self,
        stream_id: str,
        stream_type: str,
        events: List[Event],
        expected_version: int = -1,
    ) -> int:
        """Append events to a stream with optimistic concurrency check."""
        with self._conn() as conn:
            row = conn.execute(
                "SELECT MAX(version) FROM events WHERE stream_id = ?",
                (stream_id,)
            ).fetchone()
            current_version = row[0] if row[0] is not None else -1
            if expected_version != -1 and current_version != expected_version:
                raise OptimisticConcurrencyError(
                    f"Stream {stream_id}: expected v{expected_version}, "
                    f"got v{current_version}"
                )
            next_version = current_version + 1
            for i, event in enumerate(events):
                event.version = next_version + i
                try:
                    conn.execute(
                        """INSERT INTO events (event_id, stream_id, stream_type,
                           event_type, version, data, metadata, occurred_at)
                           VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
                        (
                            event.event_id, stream_id, stream_type,
                            event.event_type, event.version,
                            json.dumps(event.data), json.dumps(event.metadata),
                            event.occurred_at,
                        )
                    )
                except sqlite3.IntegrityError as e:
                    raise OptimisticConcurrencyError(str(e)) from e
            return next_version + len(events) - 1

    def load_stream(
        self, stream_id: str, from_version: int = 0,
        to_version: Optional[int] = None
    ) -> Iterator[Event]:
        """Replay all events for an aggregate from a given version."""
        query = "SELECT * FROM events WHERE stream_id = ? AND version >= ?"
        params: list = [stream_id, from_version]
        if to_version is not None:
            query += " AND version <= ?"
            params.append(to_version)
        query += " ORDER BY version ASC"
        with self._conn() as conn:
            for row in conn.execute(query, params):
                yield Event(
                    stream_id=row["stream_id"],
                    stream_type=row["stream_type"],
                    event_type=row["event_type"],
                    data=json.loads(row["data"]),
                    event_id=row["event_id"],
                    version=row["version"],
                    metadata=json.loads(row["metadata"]),
                    occurred_at=row["occurred_at"],
                )

    def save_snapshot(self, snapshot: Snapshot):
        with self._conn() as conn:
            conn.execute(
                """INSERT OR REPLACE INTO snapshots
                   (stream_id, version, data, taken_at)
                   VALUES (?, ?, ?, ?)""",
                (snapshot.stream_id, snapshot.version,
                 json.dumps(snapshot.data), snapshot.taken_at)
            )

    def load_snapshot(self, stream_id: str) -> Optional[Snapshot]:
        with self._conn() as conn:
            row = conn.execute(
                "SELECT * FROM snapshots WHERE stream_id = ?", (stream_id,)
            ).fetchone()
            if row is None:
                return None
            return Snapshot(
                stream_id=row["stream_id"],
                version=row["version"],
                data=json.loads(row["data"]),
                taken_at=row["taken_at"],
            )

    def all_events(self, from_global_id: int = 0) -> Iterator[Event]:
        """Global event log ordered by insertion, used by projections."""
        with self._conn() as conn:
            for row in conn.execute(
                "SELECT * FROM events WHERE id > ? ORDER BY id ASC",
                (from_global_id,)
            ):
                yield Event(
                    stream_id=row["stream_id"],
                    stream_type=row["stream_type"],
                    event_type=row["event_type"],
                    data=json.loads(row["data"]),
                    event_id=row["event_id"],
                    version=row["version"],
                    metadata=json.loads(row["metadata"]),
                    occurred_at=row["occurred_at"],
                )
```
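When an append fails the concurrency check, the write side should reload the latest state and retry rather than crash. This standalone sketch shows that retry shape with a toy in-memory store; `ConflictError` and `FakeStore` are illustrative stand-ins for `OptimisticConcurrencyError` and `EventStore`, not part of the API above:

```python
class ConflictError(Exception):
    """Stand-in for OptimisticConcurrencyError."""

class FakeStore:
    """Toy store whose first append raises, to exercise the retry path."""
    def __init__(self):
        self.version = 0
        self.calls = 0

    def append(self, events, expected_version):
        self.calls += 1
        if self.calls == 1:
            # Simulate a concurrent writer winning the race
            self.version += 1
            raise ConflictError("version conflict")
        if expected_version != self.version:
            raise ConflictError("stale version")
        self.version += len(events)
        return self.version

def append_with_retry(store, events, max_retries=3):
    """Re-read the current version and retry the append on conflict."""
    for attempt in range(max_retries):
        try:
            return store.append(events, expected_version=store.version)
        except ConflictError:
            if attempt == max_retries - 1:
                raise
            # In a real agent: reload projections and re-validate the
            # command here, since the world changed under you.

demo_store = FakeStore()
print(append_with_retry(demo_store, ["BetPlaced"]))  # 2
```

The important step is re-validating the command after a conflict: the conflicting event may have changed the balance enough that the original bet is no longer affordable.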
Projections: Portfolio, Balance, P&L
Projections are pure functions over event streams. They fold events into a view that serves read queries. Here are three projections every financial agent needs:
```python
from collections import defaultdict


class BalanceProjection:
    """Current balance derived from deposit/withdrawal events."""

    def __init__(self):
        self.balance = 0.0
        self.last_version = -1

    def apply(self, event: Event):
        t = event.event_type
        if t == "FundsDeposited":
            self.balance += event.data["amount"]
        elif t in ("FundsWithdrawn", "BetPlaced", "EscrowLocked"):
            self.balance -= event.data["amount"]
        elif t in ("BetWon", "EscrowReleased", "FaucetClaimed"):
            self.balance += event.data["amount"]
        elif t == "FeeCharged":
            self.balance -= event.data["fee"]
        self.last_version = event.version


class PnLProjection:
    """Rolling profit and loss across all gambling sessions."""

    def __init__(self):
        self.total_wagered = 0.0
        self.total_won = 0.0
        self.total_fees = 0.0
        self.bet_count = 0
        self.win_count = 0
        self.last_version = -1

    @property
    def net_pnl(self) -> float:
        return self.total_won - self.total_wagered - self.total_fees

    @property
    def win_rate(self) -> float:
        return self.win_count / self.bet_count if self.bet_count > 0 else 0.0

    @property
    def roi(self) -> float:
        if self.total_wagered == 0:
            return 0.0
        return self.net_pnl / self.total_wagered

    def apply(self, event: Event):
        if event.event_type == "BetPlaced":
            self.total_wagered += event.data["amount"]
            self.bet_count += 1
        elif event.event_type == "BetWon":
            self.total_won += event.data["amount"]
            self.win_count += 1
        elif event.event_type == "FeeCharged":
            self.total_fees += event.data["fee"]
        self.last_version = event.version


class PortfolioProjection:
    """Holdings across all assets (PFLEA, BTC, ETH, XMR, TRX)."""

    def __init__(self):
        self.holdings: Dict[str, float] = defaultdict(float)
        self.last_version = -1

    def apply(self, event: Event):
        if event.event_type in ("FundsDeposited", "BetWon", "FaucetClaimed"):
            asset = event.data.get("asset", "PFLEA")
            self.holdings[asset] += event.data["amount"]
        elif event.event_type in ("FundsWithdrawn", "BetPlaced"):
            asset = event.data.get("asset", "PFLEA")
            self.holdings[asset] -= event.data["amount"]
        self.last_version = event.version


def rebuild_projections(store: EventStore, agent_id: str):
    """Rebuild all projections from the event log; works even after data corruption."""
    balance = BalanceProjection()
    pnl = PnLProjection()
    portfolio = PortfolioProjection()
    for event in store.load_stream(agent_id):
        balance.apply(event)
        pnl.apply(event)
        portfolio.apply(event)
    return balance, pnl, portfolio
```
Snapshot Optimization
For a long-lived agent with thousands of events, replaying from the beginning on every startup becomes expensive. Snapshots solve this: periodically serialize the current projection state to the snapshot store, then on startup, load the snapshot and replay only the events that occurred after it.
```python
class SnapshottingAgent:
    """Agent that uses snapshots for fast startup after many events."""

    SNAPSHOT_EVERY = 100  # events between snapshots

    def __init__(self, agent_id: str, store: EventStore):
        self.agent_id = agent_id
        self.store = store
        self._load()

    def _load(self):
        # Reset projections so repeated _load() calls never double-apply events
        self.balance = BalanceProjection()
        self.pnl = PnLProjection()
        snap = self.store.load_snapshot(self.agent_id)
        start_version = 0
        if snap:
            # Restore projections from snapshot
            self.balance.balance = snap.data["balance"]
            self.balance.last_version = snap.version
            self.pnl.total_wagered = snap.data["total_wagered"]
            self.pnl.total_won = snap.data["total_won"]
            self.pnl.total_fees = snap.data["total_fees"]
            self.pnl.bet_count = snap.data["bet_count"]
            self.pnl.win_count = snap.data["win_count"]
            start_version = snap.version + 1
        # Replay only events after the snapshot
        for event in self.store.load_stream(self.agent_id, from_version=start_version):
            self.balance.apply(event)
            self.pnl.apply(event)

    def maybe_snapshot(self):
        if self.balance.last_version % self.SNAPSHOT_EVERY == 0:
            snap = Snapshot(
                stream_id=self.agent_id,
                version=self.balance.last_version,
                data={
                    "balance": self.balance.balance,
                    "total_wagered": self.pnl.total_wagered,
                    "total_won": self.pnl.total_won,
                    "total_fees": self.pnl.total_fees,
                    "bet_count": self.pnl.bet_count,
                    "win_count": self.pnl.win_count,
                }
            )
            self.store.save_snapshot(snap)
```
CQRS: Separating Reads from Writes
CQRS (Command Query Responsibility Segregation) is the natural companion to event sourcing. Writes go through the command pipeline: validation, business logic, event emission. Reads come from pre-built projections: no joins, no locking, just fast lookups.
The key benefit for AI agents: your trading loop never waits on read queries. Writes go to the event store (fast append), reads come from materialized projections (fast key-value lookup). No read/write contention.
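The split can be sketched in a few lines. This standalone example wires a write side to an in-memory read model via a subscriber callback; `WriteSide` and `BalanceReadModel` are illustrative names, not part of the EventStore API above:

```python
from typing import Dict, List, Tuple

class WriteSide:
    """Command pipeline: validate, append to the log, publish to read models."""
    def __init__(self):
        self.log: List[Tuple[str, dict]] = []  # append-only event log
        self.subscribers = []

    def handle_deposit(self, agent_id: str, amount: float):
        if amount <= 0:
            raise ValueError("command rejected: non-positive deposit")
        event = ("FundsDeposited", {"agent_id": agent_id, "amount": amount})
        self.log.append(event)
        for sub in self.subscribers:
            sub(event)  # push the new fact to every read model

class BalanceReadModel:
    """Materialized view: queries never touch the event log."""
    def __init__(self):
        self.balances: Dict[str, float] = {}

    def on_event(self, event):
        etype, data = event
        if etype == "FundsDeposited":
            self.balances[data["agent_id"]] = (
                self.balances.get(data["agent_id"], 0.0) + data["amount"]
            )

    def balance_of(self, agent_id: str) -> float:
        return self.balances.get(agent_id, 0.0)

writes = WriteSide()
reads = BalanceReadModel()
writes.subscribers.append(reads.on_event)
writes.handle_deposit("agent-001", 250.0)
print(reads.balance_of("agent-001"))  # 250.0
```

In production the subscriber hop is usually asynchronous, which makes read models eventually consistent: a query issued immediately after a write may see slightly stale state, which is an acceptable trade for the lack of contention.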
Purple Flea Transactions as Event Stream
Purple Flea's API already returns structured transaction events. You can ingest these directly into your event store and treat the Purple Flea platform as an external event source. Here is how to bridge the Purple Flea API to your event store:
```python
import httpx, asyncio

PFLEA_API = "https://casino.purpleflea.com/api"
API_KEY = "pf_live_<your_key>"


async def ingest_pflea_events(store: EventStore, agent_id: str):
    """Poll the Purple Flea API and write transactions as events."""
    snap = store.load_snapshot(f"cursor:{agent_id}")
    since = snap.data.get("last_tx_id") if snap else None
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"{PFLEA_API}/transactions",
            headers={"Authorization": f"Bearer {API_KEY}"},
            params={"agent_id": agent_id, "since": since},
        )
    txs = resp.json()["transactions"]
    if not txs:
        return
    events = []
    for tx in txs:
        event_type = {
            "bet": "BetPlaced",
            "win": "BetWon",
            "deposit": "FundsDeposited",
            "withdraw": "FundsWithdrawn",
            "fee": "FeeCharged",
            "faucet": "FaucetClaimed",
            "escrow": "EscrowLocked",
        }.get(tx["type"], "UnknownTransaction")
        events.append(Event(
            stream_id=agent_id,
            stream_type="agent",
            event_type=event_type,
            data={
                "amount": tx["amount"],
                "asset": tx.get("asset", "PFLEA"),
                "tx_id": tx["id"],
                "game": tx.get("game"),
            },
            metadata={"source": "purpleflea", "raw_tx_id": tx["id"]}
        ))
    store.append(agent_id, "agent", events)
    # Save the ingestion cursor
    store.save_snapshot(Snapshot(
        stream_id=f"cursor:{agent_id}",
        version=0,
        data={"last_tx_id": txs[-1]["id"]}
    ))
```
Event Versioning and Schema Evolution
Events are immutable, but their schemas evolve. An event written in version 1 of your system must still be processable in version 3. Three common strategies handle this: upcasting (transform old payloads at read time), tolerant readers (projections ignore unknown fields and supply defaults for missing ones), and copy-and-transform (migrate the entire store into a new one). Upcasting is usually the best fit for agent state:
Upcasting
Transform old event payloads into new formats at read time, before they reach your projections. The event store stores the original; an upcast layer enriches it on the way out:
```python
class EventUpcastRegistry:
    def __init__(self):
        self._upcasters: Dict[str, list] = defaultdict(list)

    def register(self, event_type: str, from_version: int):
        """Decorator for registering upcasters."""
        def decorator(fn):
            self._upcasters[event_type].append((from_version, fn))
            self._upcasters[event_type].sort(key=lambda x: x[0])
            return fn
        return decorator

    def upcast(self, event: Event) -> Event:
        for from_v, upcaster in self._upcasters.get(event.event_type, []):
            # Re-read the schema version on every pass: an earlier
            # upcaster in the chain may have bumped it.
            schema_v = event.metadata.get("schema_version", 1)
            if schema_v <= from_v:
                event = upcaster(event)
        return event


registry = EventUpcastRegistry()


@registry.register("BetPlaced", from_version=1)
def upcast_bet_placed_v1(event: Event) -> Event:
    # v1 had no 'game' field; default to 'unknown'
    if "game" not in event.data:
        event.data["game"] = "unknown"
    event.metadata["schema_version"] = 2
    return event
```
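In the read path, every stored event should pass through the registry before it reaches a projection. Here is a standalone sketch of that wiring using plain dicts instead of the Event dataclass; `upcast_bet_placed` and `load_with_upcasting` are illustrative helpers, not part of the registry API above:

```python
def upcast_bet_placed(event: dict) -> dict:
    """Fill in fields that v1 events lacked (mirrors the decorator version)."""
    if event.get("schema_version", 1) < 2:
        event.setdefault("data", {}).setdefault("game", "unknown")
        event["schema_version"] = 2
    return event

UPCASTERS = {"BetPlaced": upcast_bet_placed}

def load_with_upcasting(raw_events):
    """Yield events in their newest schema, oldest first."""
    for event in raw_events:
        fn = UPCASTERS.get(event["event_type"])
        yield fn(event) if fn else event

# A v1 event with no 'game' field, as it might sit in the store
old = [{"event_type": "BetPlaced", "data": {"amount": 100}}]
upgraded = list(load_with_upcasting(old))
print(upgraded[0]["data"]["game"])  # unknown
```

The stored bytes never change; only the in-memory representation is upgraded, so projections can assume the newest schema everywhere.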
Temporal Queries: Replaying to Any Point in Time
One of event sourcing's most powerful features is the ability to answer temporal questions: "What was my agent's balance on March 1st?" or "What would my P&L be if that bad trade had never happened?"
```python
def balance_at(store: EventStore, agent_id: str, timestamp: str) -> float:
    """Replay events up to a given timestamp to get the historical balance."""
    projection = BalanceProjection()
    for event in store.load_stream(agent_id):
        if event.occurred_at > timestamp:
            break
        projection.apply(event)
    return projection.balance


def simulate_without_event(
    store: EventStore, agent_id: str, skip_event_id: str
) -> float:
    """What would the balance be if a specific event had never happened?"""
    projection = BalanceProjection()
    for event in store.load_stream(agent_id):
        if event.event_id == skip_event_id:
            continue
        projection.apply(event)
    return projection.balance


# Example usage
historical_balance = balance_at(store, "agent-001", "2026-03-01T00:00:00Z")
counterfactual = simulate_without_event(store, "agent-001", "evt-bad-trade-uuid")
```
Putting It All Together: A Complete Event-Sourced Agent
Here is a minimal but complete event-sourced agent that registers with Purple Flea, claims a faucet grant, places bets, and maintains full state history:
```python
import httpx, asyncio

FAUCET_URL = "https://faucet.purpleflea.com/api"
CASINO_URL = "https://casino.purpleflea.com/api"


class EventSourcedCasinoAgent:
    def __init__(self, agent_id: str, db_path: str = "agent.db"):
        self.agent_id = agent_id
        self.store = EventStore(db_path)
        self.agent = SnapshottingAgent(agent_id, self.store)

    async def claim_faucet(self, api_key: str):
        async with httpx.AsyncClient() as client:
            r = await client.post(
                f"{FAUCET_URL}/claim",
                json={"agent_id": self.agent_id},
                headers={"Authorization": f"Bearer {api_key}"},
            )
        data = r.json()
        if data.get("success"):
            self.store.append(
                self.agent_id, "agent",
                [Event(self.agent_id, "agent", "FaucetClaimed", {
                    "amount": data["amount"],
                    "asset": "PFLEA",
                    "tx_id": data["tx_id"],
                })],
                expected_version=self.agent.balance.last_version,
            )
            self.agent._load()
        return data

    async def place_bet(self, api_key: str, game: str, amount: float):
        # Validate before emitting the command
        if self.agent.balance.balance < amount:
            raise ValueError(
                f"Insufficient funds: {self.agent.balance.balance} < {amount}"
            )
        async with httpx.AsyncClient() as client:
            r = await client.post(
                f"{CASINO_URL}/bet",
                json={"agent_id": self.agent_id, "game": game, "amount": amount},
                headers={"Authorization": f"Bearer {api_key}"},
            )
        result = r.json()
        events = [Event(self.agent_id, "agent", "BetPlaced", {
            "amount": amount, "game": game, "bet_id": result["bet_id"]
        })]
        if result.get("outcome") == "win":
            events.append(Event(self.agent_id, "agent", "BetWon", {
                "amount": result["payout"], "game": game,
                "bet_id": result["bet_id"]
            }))
        self.store.append(
            self.agent_id, "agent", events,
            expected_version=self.agent.balance.last_version,
        )
        self.agent._load()
        self.agent.maybe_snapshot()
        return result
```
Build Event-Sourced Agents on Purple Flea
Get free funds, connect to 6 financial services, and maintain complete state history with event sourcing.
Claim Free Funds · API Docs