Architecture

Event Sourcing for AI Agent State

The Problem with CRUD for Agent State

Most developers building AI agents reach for the same pattern they use for web applications: a database table with a current-state row. The agent reads its balance, makes a trade, and updates the balance field. Simple, familiar — and deeply problematic for autonomous agents operating in financial systems.

CRUD (Create, Read, Update, Delete) state management destroys historical context. When an agent's balance moves from 1,000 to 750, the intermediate history vanishes. You know the current state but not how you got there. For AI agents that need to audit their decisions, replay scenarios, or recover from failures, this is catastrophic.

Event sourcing inverts this model. Instead of storing current state, you store a log of every event that has ever happened. The current state is derived by replaying those events. This gives you something CRUD can never offer: a complete, immutable audit trail of every decision an agent has made.
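As a minimal sketch of that idea (using a hypothetical two-event log, not the full store built later in this article), deriving state is just a fold over events:

```python
from functools import reduce

# Hypothetical event log: each entry is an immutable fact, never updated in place.
events = [
    {"type": "FundsDeposited", "amount": 1000},
    {"type": "BetPlaced",      "amount": 250},
]

def apply_event(balance: float, event: dict) -> float:
    """Fold a single event into the running balance."""
    if event["type"] == "FundsDeposited":
        return balance + event["amount"]
    if event["type"] in ("BetPlaced", "FundsWithdrawn"):
        return balance - event["amount"]
    return balance  # unknown event types leave state unchanged

balance = reduce(apply_event, events, 0.0)  # 750.0
```

The 1,000 → 750 transition from the CRUD example above is now fully explained by the log itself: delete the derived `balance` and you can recompute it at any time.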

Dimension           CRUD State                  Event Sourcing
------------------  --------------------------  -------------------------------
Storage model       Current state only          Append-only event log
History             ✗ Lost on update            ✓ Full audit trail
Temporal queries    ✗ Impossible                ✓ Replay to any point
Failure recovery    From last backup            From event log (exact)
Concurrent writes   Lock/retry needed           Append-only, no contention
Debugging           Hard — state is opaque      Easy — trace every event
New projections     Requires schema migration   Replay events into new model
Complexity          Low initial                 Higher initial, lower long-term
Key insight: Financial regulators require audit trails. Event sourcing gives you one for free. Every Purple Flea transaction is already an event — you can model your agent state directly on top of the Purple Flea event stream.

Core Concepts: Commands, Events, Projections

Event sourcing has three foundational concepts that work together in a pipeline:

Commands

A command is an instruction to do something. It represents intent, not fact. Commands can be rejected. Examples: PlaceBet, WithdrawFunds, RegisterAgent. Commands are validated before being accepted, and if the system cannot fulfill them, they fail without producing any events.
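A minimal command-handler sketch (hypothetical names, not the Purple Flea API) makes the boundary concrete: validation happens first, and a rejected command produces no events at all:

```python
from dataclasses import dataclass
from typing import Any, Dict, List

@dataclass(frozen=True)
class PlaceBet:
    """A command: intent, not fact. It may be rejected."""
    agent_id: str
    amount: float

def handle_place_bet(cmd: PlaceBet, current_balance: float) -> List[Dict[str, Any]]:
    """Validate the command, then emit events describing what happened."""
    if cmd.amount <= 0:
        raise ValueError("bet amount must be positive")
    if cmd.amount > current_balance:
        # Rejected command: the system changes nothing and emits no events.
        raise ValueError("insufficient funds")
    return [{"type": "BetPlaced", "agent_id": cmd.agent_id, "amount": cmd.amount}]
```

Note the tense convention carried through: the command is imperative (`PlaceBet`), the resulting event is past tense (`BetPlaced`).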

Events

An event is a fact that has already happened. Events are immutable and always past tense: BetPlaced, FundsWithdrawn, AgentRegistered. Once an event is written to the event store, it never changes. This immutability is what makes the entire pattern trustworthy.

Projections

A projection (also called a read model or view model) is the current state derived by folding events. Your agent's balance is a projection: start at zero, apply each deposit event, subtract each withdrawal event, and you have the current balance. Projections can be rebuilt at any time by replaying events.

┌──────────────────────────────────────────────────────────┐
│                 EVENT SOURCING PIPELINE                  │
└──────────────────────────────────────────────────────────┘

  Agent/API              Event Bus                Projections
      │                      │                        │
      │ ── PlaceBet cmd ──>  │                        │
      │                      │ validate + emit        │
      │                      │ BetPlaced event        │
      │ <── accepted ──      │ ─────────────────────> │
      │                      │ (appended to store)    │
      │                      │                        │ update balance
      │                      │                        │ update P&L
      │                      │                        │ update leaderboard
      │                      │                        │
      │ ── GetBalance ────────────────────────────>   │
      │ <── 750 PFLEA ────────────────────────────    │

Event Store Design

An event store needs to satisfy three properties: it must be append-only (no updates or deletes), events must be ordered within an aggregate, and reads must support both sequential replay and random access by aggregate ID.

For AI agents, SQLite is an excellent starting point. It is embedded, zero-infrastructure, and supports WAL mode for concurrent reads. Here is a production-ready schema:

-- Event store schema for AI agent state
CREATE TABLE events (
  id          INTEGER PRIMARY KEY AUTOINCREMENT,
  event_id    TEXT    NOT NULL UNIQUE,          -- UUID
  stream_id   TEXT    NOT NULL,                 -- aggregate identity
  stream_type TEXT    NOT NULL,                 -- 'agent' | 'portfolio' | 'session'
  event_type  TEXT    NOT NULL,                 -- 'BetPlaced' | 'FundsDeposited' etc.
  version     INTEGER NOT NULL,                 -- monotonic per stream_id
  data        TEXT    NOT NULL,                 -- JSON payload
  metadata    TEXT    NOT NULL DEFAULT '{}',    -- causation_id, correlation_id, etc.
  occurred_at TEXT    NOT NULL,                 -- ISO8601 timestamp
  UNIQUE (stream_id, version)                   -- optimistic concurrency
);

CREATE INDEX idx_stream ON events (stream_id, version);
CREATE INDEX idx_type   ON events (event_type, occurred_at);
-- id is the INTEGER PRIMARY KEY (SQLite rowid), so it is already indexed
-- and provides global ordering for projections; no extra index is needed

-- Snapshots table (optimization for long-lived aggregates)
CREATE TABLE snapshots (
  stream_id   TEXT    NOT NULL PRIMARY KEY,
  version     INTEGER NOT NULL,
  data        TEXT    NOT NULL,
  taken_at    TEXT    NOT NULL
);

The UNIQUE (stream_id, version) constraint is your optimistic concurrency guard. If two writers try to append the same version to the same stream simultaneously, one will fail with a constraint error — forcing a retry with the latest state.
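The caller's side of that contract is a retry loop: on a conflict, re-read the latest version and try again. Here is an illustrative sketch using a toy in-memory log (a stand-in for the SQLite store built below) that isolates just the re-read-and-retry pattern:

```python
class OptimisticConcurrencyError(Exception):
    pass

class VersionedLog:
    """Toy in-memory stand-in for the event store's version check."""
    def __init__(self):
        self.events: list = []

    def append(self, event: dict, expected_version: int) -> int:
        current = len(self.events) - 1  # -1 means the stream is empty
        if current != expected_version:
            raise OptimisticConcurrencyError(
                f"expected v{expected_version}, got v{current}"
            )
        self.events.append(event)
        return current + 1

def append_with_retry(log: VersionedLog, event: dict, max_retries: int = 3) -> int:
    """On a version conflict, re-read the latest version and retry the append."""
    for _ in range(max_retries):
        expected = len(log.events) - 1  # re-read latest state each attempt
        try:
            return log.append(event, expected_version=expected)
        except OptimisticConcurrencyError:
            continue  # another writer won the race; retry with a fresh version
    raise OptimisticConcurrencyError("gave up after retries")
```

In the real store, "re-read" means reloading the stream (or its projection) so the retried command is validated against the state the other writer produced.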

Python EventStore Implementation

Here is a complete, production-quality Python EventStore class built on SQLite, with support for append, replay, snapshots, and version checking:

import sqlite3, json, uuid
from datetime import datetime, timezone
from dataclasses import dataclass, field, asdict
from typing import List, Optional, Dict, Any, Iterator
from contextlib import contextmanager


@dataclass
class Event:
    stream_id:   str
    stream_type: str
    event_type:  str
    data:        Dict[str, Any]
    event_id:    str = field(default_factory=lambda: str(uuid.uuid4()))
    version:     int = 0
    metadata:    Dict[str, Any] = field(default_factory=dict)
    occurred_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


@dataclass
class Snapshot:
    stream_id: str
    version:   int
    data:      Dict[str, Any]
    taken_at:  str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


class OptimisticConcurrencyError(Exception):
    pass


class EventStore:
    """
    Append-only SQLite-backed event store for AI agent state.

    Usage:
        store = EventStore("agent_state.db")
        store.append("agent-001", "agent", [
            Event("agent-001", "agent", "BetPlaced", {"amount": 100, "game": "blackjack"})
        ], expected_version=-1)  # -1 = expect an empty stream
    """

    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        with self._conn() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS events (
                    id          INTEGER PRIMARY KEY AUTOINCREMENT,
                    event_id    TEXT    NOT NULL UNIQUE,
                    stream_id   TEXT    NOT NULL,
                    stream_type TEXT    NOT NULL,
                    event_type  TEXT    NOT NULL,
                    version     INTEGER NOT NULL,
                    data        TEXT    NOT NULL,
                    metadata    TEXT    NOT NULL DEFAULT '{}',
                    occurred_at TEXT    NOT NULL,
                    UNIQUE (stream_id, version)
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS snapshots (
                    stream_id TEXT    NOT NULL PRIMARY KEY,
                    version   INTEGER NOT NULL,
                    data      TEXT    NOT NULL,
                    taken_at  TEXT    NOT NULL
                )
            """)
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("CREATE INDEX IF NOT EXISTS idx_stream ON events (stream_id, version)")

    @contextmanager
    def _conn(self):
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            conn.commit()
        except BaseException:
            conn.rollback()
            raise
        finally:
            conn.close()

    def append(
        self,
        stream_id: str,
        stream_type: str,
        events: List[Event],
        expected_version: Optional[int] = None,
    ) -> int:
        """Append events to a stream with an optimistic concurrency check.

        expected_version is the version the stream is expected to be at
        (-1 for an empty stream); None skips the check entirely.
        """
        with self._conn() as conn:
            row = conn.execute(
                "SELECT MAX(version) FROM events WHERE stream_id = ?",
                (stream_id,)
            ).fetchone()
            current_version = row[0] if row[0] is not None else -1

            if expected_version is not None and current_version != expected_version:
                raise OptimisticConcurrencyError(
                    f"Stream {stream_id}: expected v{expected_version}, got v{current_version}"
                )

            next_version = current_version + 1
            for i, event in enumerate(events):
                event.version = next_version + i
                try:
                    conn.execute(
                        """INSERT INTO events
                           (event_id, stream_id, stream_type, event_type,
                            version, data, metadata, occurred_at)
                           VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
                        (
                            event.event_id, stream_id, stream_type,
                            event.event_type, event.version,
                            json.dumps(event.data), json.dumps(event.metadata),
                            event.occurred_at,
                        )
                    )
                except sqlite3.IntegrityError as e:
                    raise OptimisticConcurrencyError(str(e)) from e

        return next_version + len(events) - 1

    def load_stream(
        self, stream_id: str, from_version: int = 0, to_version: Optional[int] = None
    ) -> Iterator[Event]:
        """Replay all events for an aggregate from a given version."""
        query = "SELECT * FROM events WHERE stream_id = ? AND version >= ?"
        params: list = [stream_id, from_version]

        if to_version is not None:
            query += " AND version <= ?"
            params.append(to_version)

        query += " ORDER BY version ASC"

        with self._conn() as conn:
            for row in conn.execute(query, params):
                yield Event(
                    stream_id=row["stream_id"],
                    stream_type=row["stream_type"],
                    event_type=row["event_type"],
                    data=json.loads(row["data"]),
                    event_id=row["event_id"],
                    version=row["version"],
                    metadata=json.loads(row["metadata"]),
                    occurred_at=row["occurred_at"],
                )

    def save_snapshot(self, snapshot: Snapshot):
        with self._conn() as conn:
            conn.execute(
                """INSERT OR REPLACE INTO snapshots (stream_id, version, data, taken_at)
                   VALUES (?, ?, ?, ?)""",
                (snapshot.stream_id, snapshot.version,
                 json.dumps(snapshot.data), snapshot.taken_at)
            )

    def load_snapshot(self, stream_id: str) -> Optional[Snapshot]:
        with self._conn() as conn:
            row = conn.execute(
                "SELECT * FROM snapshots WHERE stream_id = ?", (stream_id,)
            ).fetchone()
            if row is None:
                return None
            return Snapshot(
                stream_id=row["stream_id"],
                version=row["version"],
                data=json.loads(row["data"]),
                taken_at=row["taken_at"],
            )

    def all_events(self, from_global_id: int = 0) -> Iterator[Event]:
        """Global event log ordered by insertion โ€” used by projections."""
        with self._conn() as conn:
            for row in conn.execute(
                "SELECT * FROM events WHERE id > ? ORDER BY id ASC", (from_global_id,)
            ):
                yield Event(
                    stream_id=row["stream_id"],
                    stream_type=row["stream_type"],
                    event_type=row["event_type"],
                    data=json.loads(row["data"]),
                    event_id=row["event_id"],
                    version=row["version"],
                    metadata=json.loads(row["metadata"]),
                    occurred_at=row["occurred_at"],
                )

Projections: Portfolio, Balance, P&L

Projections are pure functions over event streams. They fold events into a view that serves read queries. Here are three projections every financial agent needs:

from collections import defaultdict


class BalanceProjection:
    """Current balance derived from deposit/withdrawal events."""

    def __init__(self):
        self.balance = 0.0
        self.last_version = -1

    def apply(self, event: Event):
        t = event.event_type
        if t == "FundsDeposited":
            self.balance += event.data["amount"]
        elif t in ("FundsWithdrawn", "BetPlaced", "EscrowLocked"):
            self.balance -= event.data["amount"]
        elif t in ("BetWon", "EscrowReleased", "FaucetClaimed"):
            self.balance += event.data["amount"]
        elif t == "FeeCharged":
            self.balance -= event.data["fee"]
        self.last_version = event.version


class PnLProjection:
    """Rolling profit and loss across all gambling sessions."""

    def __init__(self):
        self.total_wagered = 0.0
        self.total_won     = 0.0
        self.total_fees    = 0.0
        self.bet_count     = 0
        self.win_count     = 0
        self.last_version  = -1

    @property
    def net_pnl(self) -> float:
        return self.total_won - self.total_wagered - self.total_fees

    @property
    def win_rate(self) -> float:
        return self.win_count / self.bet_count if self.bet_count > 0 else 0.0

    @property
    def roi(self) -> float:
        if self.total_wagered == 0:
            return 0.0
        return self.net_pnl / self.total_wagered

    def apply(self, event: Event):
        if event.event_type == "BetPlaced":
            self.total_wagered += event.data["amount"]
            self.bet_count += 1
        elif event.event_type == "BetWon":
            self.total_won += event.data["amount"]
            self.win_count += 1
        elif event.event_type == "FeeCharged":
            self.total_fees += event.data["fee"]
        self.last_version = event.version


class PortfolioProjection:
    """Holdings across all assets (PFLEA, BTC, ETH, XMR, TRX)."""

    def __init__(self):
        self.holdings: Dict[str, float] = defaultdict(float)
        self.last_version = -1

    def apply(self, event: Event):
        if event.event_type in ("FundsDeposited", "BetWon", "FaucetClaimed"):
            asset = event.data.get("asset", "PFLEA")
            self.holdings[asset] += event.data["amount"]
        elif event.event_type in ("FundsWithdrawn", "BetPlaced"):
            asset = event.data.get("asset", "PFLEA")
            self.holdings[asset] -= event.data["amount"]
        self.last_version = event.version


def rebuild_projections(store: EventStore, agent_id: str):
    """Rebuild all projections from event log โ€” works even after data corruption."""
    balance   = BalanceProjection()
    pnl       = PnLProjection()
    portfolio = PortfolioProjection()

    for event in store.load_stream(agent_id):
        balance.apply(event)
        pnl.apply(event)
        portfolio.apply(event)

    return balance, pnl, portfolio

Snapshot Optimization

For a long-lived agent with thousands of events, replaying from the beginning on every startup becomes expensive. Snapshots solve this: periodically serialize the current projection state to the snapshot store, then on startup, load the snapshot and replay only the events that occurred after it.

class SnapshottingAgent:
    """Agent that uses snapshots for fast startup after many events."""
    SNAPSHOT_EVERY = 100  # events between snapshots

    def __init__(self, agent_id: str, store: EventStore):
        self.agent_id = agent_id
        self.store    = store
        self.balance  = BalanceProjection()
        self.pnl      = PnLProjection()
        self._load()

    def _load(self):
        # Reset projections first so repeated reloads do not double-apply events
        self.balance = BalanceProjection()
        self.pnl     = PnLProjection()

        snap = self.store.load_snapshot(self.agent_id)
        start_version = 0

        if snap:
            # Restore projections from snapshot
            self.balance.balance      = snap.data["balance"]
            self.balance.last_version = snap.version
            self.pnl.total_wagered    = snap.data["total_wagered"]
            self.pnl.total_won        = snap.data["total_won"]
            self.pnl.total_fees       = snap.data["total_fees"]
            self.pnl.bet_count        = snap.data["bet_count"]
            self.pnl.win_count        = snap.data["win_count"]
            self.pnl.last_version     = snap.version
            start_version = snap.version + 1

        # Replay only events after snapshot
        for event in self.store.load_stream(self.agent_id, from_version=start_version):
            self.balance.apply(event)
            self.pnl.apply(event)

    def maybe_snapshot(self):
        if self.balance.last_version % self.SNAPSHOT_EVERY == 0:
            snap = Snapshot(
                stream_id=self.agent_id,
                version=self.balance.last_version,
                data={
                    "balance":       self.balance.balance,
                    "total_wagered": self.pnl.total_wagered,
                    "total_won":     self.pnl.total_won,
                    "total_fees":    self.pnl.total_fees,
                    "bet_count":     self.pnl.bet_count,
                    "win_count":     self.pnl.win_count,
                }
            )
            self.store.save_snapshot(snap)

CQRS: Separating Reads from Writes

CQRS (Command Query Responsibility Segregation) is the natural companion to event sourcing. Writes go through the command pipeline — validation, business logic, event emission. Reads come from pre-built projections — no joining, no locking, just fast lookups.

                        CQRS ARCHITECTURE

┌─────────────┐  Commands   ┌──────────────────┐
│             │ ──────────> │ Command Handler  │
│             │             │ (validates cmd)  │
│    Agent    │             └────────┬─────────┘
│             │                      │ emits
│             │             ┌────────▼─────────┐
│             │             │   Event Store    │
│             │             │  (append-only)   │
└──────┬──────┘             └────────┬─────────┘
       │                             │ publishes
       │  Queries    ┌───────────────▼──────────┐
       │             │   Projection Engine      │
       │             │   BalanceProjection      │
       └───────────> │   PnLProjection          │  fast reads
                     │   PortfolioProjection    │  no contention
                     └──────────────────────────┘

The key benefit for AI agents: your trading loop never waits on read queries. Writes go to the event store (fast append), reads come from materialized projections (fast key-value lookup). No read/write contention.
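As an illustration of that split, here is a toy in-memory version (hypothetical names, standing in for the EventStore and projections described above): writers only append to a global log, and each read model catches up through its own cursor:

```python
class InMemoryBus:
    """Toy stand-in for the event store's global log (illustration only)."""
    def __init__(self):
        self.log: list = []

    def publish(self, event: dict) -> None:
        """Write side: appending is the only mutation."""
        self.log.append(event)

    def consume(self, cursor: int):
        """Return events after `cursor` plus the new cursor position."""
        return self.log[cursor:], len(self.log)


class BalanceView:
    """Read side: a materialized balance, updated by catching up on the log."""
    def __init__(self):
        self.balance = 0.0
        self.cursor = 0

    def catch_up(self, bus: InMemoryBus) -> None:
        events, self.cursor = bus.consume(self.cursor)
        for e in events:
            if e["type"] in ("FundsDeposited", "BetWon"):
                self.balance += e["amount"]
            elif e["type"] in ("FundsWithdrawn", "BetPlaced"):
                self.balance -= e["amount"]
```

Writers never block on the view, and the view never locks the log: it simply replays whatever arrived since its last cursor position, which is exactly what the `all_events(from_global_id=...)` method on the EventStore enables.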

Purple Flea Transactions as Event Stream

Purple Flea's API already returns structured transaction events. You can ingest these directly into your event store and treat the Purple Flea platform as an external event source. Here is how to bridge the Purple Flea API to your event store:

import httpx, asyncio

PFLEA_API = "https://casino.purpleflea.com/api"
API_KEY   = "pf_live_<your_key>"


async def ingest_pflea_events(store: EventStore, agent_id: str):
    """Poll Purple Flea API and write transactions as events."""
    snap = store.load_snapshot(f"cursor:{agent_id}")
    since = snap.data.get("last_tx_id") if snap else None

    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"{PFLEA_API}/transactions",
            headers={"Authorization": f"Bearer {API_KEY}"},
            params={"agent_id": agent_id, "since": since},
        )
        txs = resp.json()["transactions"]

    if not txs:
        return

    events = []
    for tx in txs:
        event_type = {
            "bet":      "BetPlaced",
            "win":      "BetWon",
            "deposit":  "FundsDeposited",
            "withdraw": "FundsWithdrawn",
            "fee":      "FeeCharged",
            "faucet":   "FaucetClaimed",
            "escrow":   "EscrowLocked",
        }.get(tx["type"], "UnknownTransaction")

        events.append(Event(
            stream_id=agent_id,
            stream_type="agent",
            event_type=event_type,
            data={
                "amount": tx["amount"],
                "asset":  tx.get("asset", "PFLEA"),
                "tx_id":  tx["id"],
                "game":   tx.get("game"),
            },
            metadata={"source": "purpleflea", "raw_tx_id": tx["id"]}
        ))

    store.append(agent_id, "agent", events)

    # Save ingestion cursor
    store.save_snapshot(Snapshot(
        stream_id=f"cursor:{agent_id}",
        version=0,
        data={"last_tx_id": txs[-1]["id"]}
    ))

Event Versioning and Schema Evolution

Events are immutable, but their schemas evolve. An event written in version 1 of your system must still be processable in version 3. The most widely used strategy for handling this is upcasting:

Upcasting

Transform old event payloads into new formats at read time, before they reach your projections. The event store stores the original; an upcast layer enriches it on the way out:

class EventUpcastRegistry:
    def __init__(self):
        self._upcasters: Dict[str, list] = defaultdict(list)

    def register(self, event_type: str, from_version: int):
        """Decorator for registering upcasters."""
        def decorator(fn):
            self._upcasters[event_type].append((from_version, fn))
            self._upcasters[event_type].sort(key=lambda x: x[0])
            return fn
        return decorator

    def upcast(self, event: Event) -> Event:
        schema_v = event.metadata.get("schema_version", 1)
        for from_v, upcaster in self._upcasters.get(event.event_type, []):
            if schema_v <= from_v:
                event = upcaster(event)
                # Re-read: the upcaster may have bumped the schema version
                schema_v = event.metadata.get("schema_version", schema_v)
        return event


registry = EventUpcastRegistry()

@registry.register("BetPlaced", from_version=1)
def upcast_bet_placed_v1(event: Event) -> Event:
    # v1 had no 'game' field — default to 'unknown'
    if "game" not in event.data:
        event.data["game"] = "unknown"
        event.metadata["schema_version"] = 2
    return event

Temporal Queries: Replaying to Any Point in Time

One of event sourcing's most powerful features is the ability to answer temporal questions: "What was my agent's balance on March 1st?" or "What would my P&L be if that bad trade had never happened?"

def balance_at(store: EventStore, agent_id: str, timestamp: str) -> float:
    """Replay events up to a given timestamp to get the historical balance.

    Timestamps compare lexicographically, so both sides must use the same
    ISO8601 format (UTC with a '+00:00' offset, as written by Event).
    """
    projection = BalanceProjection()

    for event in store.load_stream(agent_id):
        if event.occurred_at > timestamp:
            break
        projection.apply(event)

    return projection.balance


def simulate_without_event(
    store: EventStore, agent_id: str, skip_event_id: str
) -> float:
    """What would balance be if a specific event had never happened?"""
    projection = BalanceProjection()

    for event in store.load_stream(agent_id):
        if event.event_id == skip_event_id:
            continue
        projection.apply(event)

    return projection.balance


# Example usage
historical_balance = balance_at(store, "agent-001", "2026-03-01T00:00:00+00:00")
counterfactual     = simulate_without_event(store, "agent-001", "evt-bad-trade-uuid")

Try it with Purple Flea: Register your agent at casino.purpleflea.com, claim free funds at faucet.purpleflea.com, and start building an event-sourced state model against real Purple Flea transactions from day one.

Putting It All Together: A Complete Event-Sourced Agent

Here is a minimal but complete event-sourced agent that registers with Purple Flea, claims a faucet grant, places bets, and maintains full state history:

import httpx, asyncio
from dataclasses import dataclass

FAUCET_URL = "https://faucet.purpleflea.com/api"
CASINO_URL = "https://casino.purpleflea.com/api"


class EventSourcedCasinoAgent:
    def __init__(self, agent_id: str, db_path: str = "agent.db"):
        self.agent_id = agent_id
        self.store    = EventStore(db_path)
        self.agent    = SnapshottingAgent(agent_id, self.store)

    async def claim_faucet(self, api_key: str):
        async with httpx.AsyncClient() as client:
            r = await client.post(
                f"{FAUCET_URL}/claim",
                json={"agent_id": self.agent_id},
                headers={"Authorization": f"Bearer {api_key}"},
            )
        data = r.json()
        if data.get("success"):
            self.store.append(
                self.agent_id, "agent",
                [Event(self.agent_id, "agent", "FaucetClaimed", {
                    "amount": data["amount"],
                    "asset":  "PFLEA",
                    "tx_id":  data["tx_id"],
                })],
                expected_version=self.agent.balance.last_version,
            )
            self.agent._load()
        return data

    async def place_bet(self, api_key: str, game: str, amount: float):
        # Validate before emitting command
        if self.agent.balance.balance < amount:
            raise ValueError(f"Insufficient funds: {self.agent.balance.balance} < {amount}")

        async with httpx.AsyncClient() as client:
            r = await client.post(
                f"{CASINO_URL}/bet",
                json={"agent_id": self.agent_id, "game": game, "amount": amount},
                headers={"Authorization": f"Bearer {api_key}"},
            )
        result = r.json()

        events = [Event(self.agent_id, "agent", "BetPlaced", {
            "amount": amount, "game": game, "bet_id": result["bet_id"]
        })]

        if result.get("outcome") == "win":
            events.append(Event(self.agent_id, "agent", "BetWon", {
                "amount": result["payout"], "game": game, "bet_id": result["bet_id"]
            }))

        self.store.append(
            self.agent_id, "agent", events,
            expected_version=self.agent.balance.last_version,
        )
        self.agent._load()
        self.agent.maybe_snapshot()
        return result

Purple Flea + Event Sourcing: Purple Flea's 6-service suite (Casino, Trading, Wallet, Domains, Faucet, Escrow) generates rich, structured events. Each service interaction — bet, trade, domain registration, escrow lock — becomes an event in your agent's financial history. Start with the Faucet to get funded at no cost.

Build Event-Sourced Agents on Purple Flea

Get free funds, connect to 6 financial services, and maintain complete state history with event sourcing.

Claim Free Funds →  |  API Docs