Dagster + Purple Flea

Dagster Assets That Earn USDC

Transform your Dagster data pipelines into financial infrastructure. Model Purple Flea trades as materializable assets, monitor wallet balances with sensors, and store trade history with custom IOManagers — full lineage, reproducibility, and scheduling built in.

Claim Free USDC via Faucet | View Integration Guide
141+ Active Agents · $0 Faucet Cost · 1% Escrow Fee · 6 Services

Why Dagster

Data Lineage Meets Financial Accountability

Dagster was designed for observable, reproducible data pipelines. When your pipeline outputs are USDC balances and trade receipts, that observability becomes critical. Every trade is a materialization event — auditable, retryable, and wired into Dagster's asset catalog automatically.

📋

Asset Lineage

Every Purple Flea trade traces back to the upstream market data, wallet state, and strategy configuration that caused it. Dagster's lineage graph makes that dependency chain explicit, queryable, and visualized automatically in the UI.

🔄

Reproducibility

Dagster partitions are replayable by design. Re-materializing a trade asset for a past partition re-runs the same strategy configuration for that window, which is ideal for replaying failed agents and validating strategy changes. Bets execute live, so replays produce fresh outcomes rather than deterministic backtests.

🕐

Flexible Scheduling

Combine schedules, sensors, and manual triggers. Run your trading strategy on a cron schedule, react to wallet deposit events via sensors, or trigger immediately from the Dagster UI asset catalog.

📊

Asset Catalog Observability

View trade history, success/failure rates, materialization times, and metadata like PnL and position sizes directly in Dagster's asset catalog — no separate monitoring dashboard required.

🔐

Resource Isolation

PurpleFleatResource wraps all API key management and HTTP client configuration. Different environments (dev/staging/prod) get different resource instances — never risk mixing testnet and mainnet keys.

📦

Partitioned Strategies

Daily, weekly, or custom partitions map perfectly onto trading windows. Each partition materializes an independent strategy run with its own USDC outcome tracked as first-class asset metadata.


Architecture

How the Integration Works

The Purple Flea Dagster integration models financial operations as standard Dagster primitives. Your orchestration logic lives in Dagster; Purple Flea handles on-chain settlement and custody.

[Pipeline diagram] Market Data (input asset) → Strategy (compute asset) → Trade (execute asset) → IOManager (trade history store). A Wallet Sensor observes balances and triggers runs.

Dagster Primitives Used

  • @asset — trade execution as data materialization
  • ConfigurableResource — Purple Flea API client
  • SensorDefinition — wallet balance monitoring
  • IOManagerDefinition — trade history persistence
  • PartitionsDefinition — daily strategy windows
  • AssetMaterialization — trade metadata events
  • MetadataValue — PnL, fees, position metadata

Purple Flea Services Used

  • Casino API — crash, coin flip, dice game trades
  • Escrow API — agent-to-agent trustless payments
  • Faucet API — bootstrap new agents with free USDC
  • Wallet API — balance queries and deposit detection
  • Domains API — agent identity registration
  • MCP endpoints — tool calling from LLM agents

Step 1: Resource

PurpleFleatResource — Your Dagster Resource

All Purple Flea API calls go through PurpleFleatResource, a ConfigurableResource subclass. It handles authentication, retries, and environment separation automatically so you never hardcode credentials.

purpleflea_resource.py Python
from dagster import ConfigurableResource, get_dagster_logger
import requests
from typing import Optional, Dict, Any
import time


class PurpleFleatResource(ConfigurableResource):
    """Dagster ConfigurableResource for the Purple Flea financial API.

    Handles authentication, retries, and request logging for all
    Purple Flea service calls: casino, escrow, faucet, wallet, domains.

    Configure via environment variables:
        PURPLEFLEA_API_KEY=pf_live_...
        PURPLEFLEA_AGENT_ID=agent_...
    """

    api_key: str   # pf_live_... never sk_live_
    agent_id: str  # registered agent identifier
    base_url: str = "https://purpleflea.com/api"
    casino_url: str = "https://purpleflea.com/casino/api"
    escrow_url: str = "https://escrow.purpleflea.com/api"
    faucet_url: str = "https://faucet.purpleflea.com/api"
    timeout: int = 30
    max_retries: int = 3

    def _headers(self) -> Dict[str, str]:
        return {
            "Authorization": f"Bearer {self.api_key}",
            "X-Agent-ID": self.agent_id,
            "Content-Type": "application/json",
        }

    def _request(self, method: str, url: str, **kwargs) -> Dict:
        log = get_dagster_logger()
        for attempt in range(self.max_retries):
            try:
                resp = requests.request(
                    method, url,
                    headers=self._headers(),
                    timeout=self.timeout,
                    **kwargs
                )
                resp.raise_for_status()
                return resp.json()
            except requests.RequestException as exc:
                log.warning(f"Attempt {attempt+1}/{self.max_retries} failed: {exc}")
                if attempt < self.max_retries - 1:
                    time.sleep(2 ** attempt)  # exponential backoff
                else:
                    raise

    # Wallet
    def get_balance(self) -> Dict:
        return self._request("GET", f"{self.base_url}/wallet/balance")

    def get_transactions(self, limit: int = 50) -> Dict:
        return self._request("GET", f"{self.base_url}/wallet/transactions",
                             params={"limit": limit})

    # Casino
    def place_crash_bet(self, amount_usdc: float, auto_cashout: float) -> Dict:
        return self._request("POST", f"{self.casino_url}/crash/bet",
                             json={"amount": amount_usdc, "auto_cashout": auto_cashout})

    def place_coinflip(self, amount_usdc: float, side: str) -> Dict:
        return self._request("POST", f"{self.casino_url}/coinflip/bet",
                             json={"amount": amount_usdc, "side": side})

    def place_dice(self, amount_usdc: float, target: int, over: bool = True) -> Dict:
        return self._request("POST", f"{self.casino_url}/dice/roll",
                             json={"amount": amount_usdc, "target": target, "over": over})

    # Escrow
    def create_escrow(self, to_agent: str, amount_usdc: float,
                       description: str, referrer: Optional[str] = None) -> Dict:
        payload = {"to": to_agent, "amount": amount_usdc, "description": description}
        if referrer:
            payload["referrer"] = referrer
        return self._request("POST", f"{self.escrow_url}/escrow", json=payload)

    def release_escrow(self, escrow_id: str) -> Dict:
        return self._request("POST", f"{self.escrow_url}/escrow/{escrow_id}/release")

    # Faucet
    def claim_faucet(self) -> Dict:
        return self._request("POST", f"{self.faucet_url}/claim")

    def register_agent(self, name: str, description: str) -> Dict:
        return self._request("POST", f"{self.faucet_url}/register",
                             json={"name": name, "description": description})
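The retry loop in `_request` sleeps `2 ** attempt` seconds between failed attempts. Isolated from the HTTP call, the backoff schedule reduces to a few lines (a standalone sketch, not part of the resource itself):

```python
# Standalone sketch of the exponential backoff used in _request:
# attempt 0 fails -> wait 1s, attempt 1 fails -> wait 2s, attempt 2 raises.
def backoff_delays(max_retries: int) -> list[float]:
    """Seconds slept after each failed attempt (no sleep after the last)."""
    return [float(2 ** attempt) for attempt in range(max_retries - 1)]

delays = backoff_delays(3)
print(delays)  # -> [1.0, 2.0]
```

With the default `max_retries=3`, a request that keeps failing waits one second, then two, then raises, so a transient outage costs at most three seconds of sleep per call.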

Configuration via Dagster Env Vars

Configure PurpleFleatResource through environment variables rather than hardcoded strings: reference secrets with EnvVar("PURPLEFLEA_API_KEY") in your Definitions, and supply different values per deployment target (dev, staging, prod) in each environment's configuration. The same code then runs everywhere with no changes.


Step 2: Assets

@asset Definitions That Execute Trades

In Dagster's asset model, a Purple Flea trade is a materialization event. The asset represents the trade result — outcome, PnL, fees — and its upstream dependencies are the market data and strategy config assets. Every run is fully logged in Dagster's event store.

trading_assets.py Python
from dagster import (
    asset, AssetExecutionContext, MetadataValue,
    FreshnessPolicy, Output
)
from purpleflea_resource import PurpleFleatResource
import pandas as pd
from datetime import datetime


@asset(
    group_name="purple_flea",
    compute_kind="api",
    freshness_policy=FreshnessPolicy(maximum_lag_minutes=60),
    description="Current wallet balance snapshot from Purple Flea",
)
def wallet_balance(
    context: AssetExecutionContext,
    pf: PurpleFleatResource,
) -> pd.DataFrame:
    """Materialized wallet balance snapshot. Stale after 60 minutes."""
    balance_data = pf.get_balance()

    context.add_output_metadata({
        "usdc_balance": MetadataValue.float(balance_data["usdc"]),
        "btc_balance": MetadataValue.float(balance_data.get("btc", 0.0)),
        "agent_id": MetadataValue.text(balance_data["agent_id"]),
        "snapshot_at": MetadataValue.text(datetime.utcnow().isoformat()),
    })

    return pd.DataFrame([balance_data])


@asset(
    group_name="purple_flea",
    compute_kind="python",
    # the wallet_balance parameter below declares the upstream dependency
    description="Kelly-optimal bet sizing computed from current wallet balance",
)
def kelly_bet_size(
    context: AssetExecutionContext,
    wallet_balance: pd.DataFrame,
) -> float:
    """Compute Kelly criterion bet size. Uses conservative 1/4 Kelly fraction."""
    usdc = wallet_balance.iloc[0]["usdc"]
    edge = 0.02   # estimated 2% edge on crash at 1.5x
    odds = 1.0    # even-money equivalent
    full_kelly = edge / odds
    quarter_kelly = full_kelly / 4
    bet = round(usdc * quarter_kelly, 2)
    bet = max(0.10, min(bet, 50.0))  # clamp $0.10 to $50

    context.add_output_metadata({
        "wallet_usdc": MetadataValue.float(usdc),
        "edge_estimate": MetadataValue.float(edge),
        "kelly_fraction": MetadataValue.float(quarter_kelly),
        "bet_size_usdc": MetadataValue.float(bet),
    })
    return bet


@asset(
    group_name="purple_flea",
    compute_kind="api",
    # the kelly_bet_size parameter below declares the upstream dependency
    description="Crash game bet — each materialization executes one bet",
)
def crash_trade(
    context: AssetExecutionContext,
    pf: PurpleFleatResource,
    kelly_bet_size: float,
) -> dict:
    """Execute one crash bet as a Dagster asset materialization.

    The asset value is the trade receipt: outcome, multiplier, pnl.
    Re-materializing always executes a fresh bet — each materialization
    is an independent event logged in the Dagster asset catalog.
    """
    auto_cashout = 1.5
    result = pf.place_crash_bet(
        amount_usdc=kelly_bet_size,
        auto_cashout=auto_cashout
    )

    won = result.get("outcome") == "win"
    pnl = kelly_bet_size * (auto_cashout - 1.0) if won else -kelly_bet_size

    context.add_output_metadata({
        "bet_usdc": MetadataValue.float(kelly_bet_size),
        "auto_cashout": MetadataValue.float(auto_cashout),
        "crash_at": MetadataValue.float(result.get("crash_at", 0.0)),
        "outcome": MetadataValue.text(result.get("outcome", "unknown")),
        "pnl_usdc": MetadataValue.float(pnl),
        "trade_id": MetadataValue.text(result.get("trade_id", "")),
        "timestamp": MetadataValue.text(datetime.utcnow().isoformat()),
    })

    context.log.info(
        f"Crash trade completed: bet={kelly_bet_size} "
        f"outcome={result.get('outcome')} pnl={pnl:.4f} USDC"
    )
    return result


@asset(
    group_name="purple_flea",
    compute_kind="api",
    description="Coin flip trade — 50/50 heads or tails bet",
)
def coinflip_trade(
    context: AssetExecutionContext,
    pf: PurpleFleatResource,
    kelly_bet_size: float,
) -> dict:
    """Execute a coin flip bet using half the Kelly position size."""
    run_id_last = context.run.run_id[-1]
    side = "heads" if int(run_id_last, 16) % 2 == 0 else "tails"
    result = pf.place_coinflip(amount_usdc=kelly_bet_size / 2, side=side)

    won = result.get("outcome") == "win"
    pnl = kelly_bet_size / 2 if won else -(kelly_bet_size / 2)

    context.add_output_metadata({
        "side_chosen": MetadataValue.text(side),
        "result": MetadataValue.text(result.get("result", "")),
        "outcome": MetadataValue.text(result.get("outcome", "")),
        "pnl_usdc": MetadataValue.float(pnl),
    })
    return result


@asset(
    group_name="purple_flea",
    compute_kind="api",
    description="Dice roll trade — predict over/under on a target number",
)
def dice_trade(
    context: AssetExecutionContext,
    pf: PurpleFleatResource,
    kelly_bet_size: float,
) -> dict:
    """Execute a dice roll bet targeting over 50 (approximately 49% win rate)."""
    target = 50
    over = True
    bet_size = round(kelly_bet_size / 3, 2)  # smaller position for dice
    result = pf.place_dice(amount_usdc=bet_size, target=target, over=over)

    won = result.get("outcome") == "win"
    payout_mult = result.get("payout_multiplier", 1.96)
    pnl = bet_size * (payout_mult - 1.0) if won else -bet_size

    context.add_output_metadata({
        "target": MetadataValue.int(target),
        "over": MetadataValue.bool(over),
        "roll_result": MetadataValue.float(result.get("roll", 0.0)),
        "outcome": MetadataValue.text(result.get("outcome", "")),
        "pnl_usdc": MetadataValue.float(pnl),
    })
    return result
Tip: Each @asset stores its output metadata in Dagster's event log. Navigate to the asset in the Dagster UI, click "View materializations", and see every trade's PnL, outcome, and timestamp in a searchable timeline — your complete trade history at a glance.
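The sizing math in kelly_bet_size can be checked by hand: with a 100 USDC wallet, a 2% edge at even odds gives a full Kelly fraction of 0.02, a quarter-Kelly of 0.005, and a 0.50 USDC bet. A standalone recomputation of the asset's arithmetic:

```python
def quarter_kelly_bet(usdc: float, edge: float = 0.02, odds: float = 1.0) -> float:
    """Quarter-Kelly position size, clamped to [$0.10, $50] like the asset."""
    full_kelly = edge / odds
    bet = round(usdc * (full_kelly / 4), 2)
    return max(0.10, min(bet, 50.0))

print(quarter_kelly_bet(100.0))    # -> 0.5
print(quarter_kelly_bet(5.0))      # clamped up to the $0.10 floor
print(quarter_kelly_bet(50000.0))  # clamped down to the $50 cap
```

The clamp matters more than the fraction: with small wallets the floor dominates, and with large ones the cap keeps a single crash round from risking more than $50.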

Step 3: Sensors

Dagster Sensors Monitoring Wallet Balance Changes

Dagster sensors poll external systems and trigger runs when conditions are met. A wallet sensor polls your Purple Flea balance and triggers a trading pipeline precisely when a deposit arrives — event-driven execution with zero cron waste.

wallet_sensors.py Python
from dagster import (
    sensor, SensorEvaluationContext, RunRequest,
    SkipReason, DefaultSensorStatus,
    asset_sensor, AssetKey, EventLogEntry
)
from purpleflea_resource import PurpleFleatResource


@sensor(
    job_name="trading_pipeline_job",
    minimum_interval_seconds=60,
    default_status=DefaultSensorStatus.RUNNING,
    description="Triggers a trading run when a wallet deposit is detected",
)
def wallet_deposit_sensor(
    context: SensorEvaluationContext,
    pf: PurpleFleatResource,
):
    """Poll Purple Flea wallet balance every 60 seconds.
    Trigger a full trading pipeline when balance increases by >= 1 USDC.

    Cursor persistence: last known balance survives restarts.
    """
    MIN_BALANCE  = 10.0   # minimum USDC required to trade
    MIN_DEPOSIT  = 1.0    # minimum deposit size to react to

    last_balance = float(context.cursor or 0.0)
    balance_data = pf.get_balance()
    current_balance = float(balance_data.get("usdc", 0.0))

    deposit_detected = (current_balance - last_balance) >= MIN_DEPOSIT
    context.update_cursor(str(current_balance))

    if current_balance < MIN_BALANCE:
        yield SkipReason(
            f"Balance {current_balance:.2f} USDC below minimum {MIN_BALANCE}"
        )
        return

    if not deposit_detected:
        yield SkipReason(
            f"No deposit detected. Current balance: {current_balance:.2f} USDC"
        )
        return

    deposit_amount = current_balance - last_balance
    context.log.info(
        f"Deposit detected: +{deposit_amount:.2f} USDC. Triggering trading run."
    )

    # The cursor was already advanced above, so derive the run key from the
    # old/new balance pair rather than from context.cursor (which now holds
    # the current balance).
    yield RunRequest(
        run_key=f"deposit_{int(last_balance * 100)}_{int(current_balance * 100)}",
        tags={
            "triggered_by": "wallet_deposit_sensor",
            "deposit_usdc": str(round(deposit_amount, 4)),
            "balance_usdc": str(round(current_balance, 4)),
        }
    )


@sensor(
    job_name="low_balance_faucet_job",
    minimum_interval_seconds=300,
    description="Claims faucet automatically when balance drops below 1 USDC",
)
def low_balance_sensor(
    context: SensorEvaluationContext,
    pf: PurpleFleatResource,
):
    """Monitor for critically low balance. Auto-claims faucet to stay funded."""
    CRITICAL = 1.0

    balance_data = pf.get_balance()
    current = float(balance_data.get("usdc", 0.0))

    if current >= CRITICAL:
        # Reset the cursor so the next dip below CRITICAL re-triggers a claim
        context.update_cursor(str(current))
        yield SkipReason(f"Balance OK: {current:.4f} USDC")
        return

    last_alert = float(context.cursor or 999.0)
    if last_alert < CRITICAL:
        yield SkipReason("Already triggered faucet claim this episode")
        return

    context.update_cursor(str(current))
    context.log.warning(f"CRITICAL: Balance is {current:.4f} USDC — triggering faucet")

    yield RunRequest(
        run_key=f"faucet_{int(current * 10000)}",
        tags={"alert": "low_balance", "balance": str(current)},
    )


@asset_sensor(
    asset_key=AssetKey("crash_trade"),
    job_name="post_trade_analytics_job",
    description="Triggers analytics after each crash trade materialization",
)
def post_trade_analytics_sensor(
    context: SensorEvaluationContext,
    asset_event: EventLogEntry,
):
    """React to crash_trade materializations. Run rolling PnL analytics."""
    mat = asset_event.asset_materialization
    # Metadata entries are MetadataValue wrappers; unwrap with .value
    pnl = mat.metadata["pnl_usdc"].value if "pnl_usdc" in mat.metadata else None
    trade_id = (
        mat.metadata["trade_id"].value if "trade_id" in mat.metadata else "unknown"
    )

    context.log.info(f"Crash trade completed. trade_id={trade_id} pnl={pnl}")

    yield RunRequest(
        run_key=f"analytics_{trade_id}",
        run_config={
            "ops": {
                "rolling_pnl_summary": {
                    "config": {"trigger_trade_id": str(trade_id)}
                }
            }
        },
    )
🔌

Cursor Persistence

Dagster automatically persists sensor cursors across process restarts, deployments, and scheduler interruptions. Your wallet balance checkpoint survives without any extra infrastructure or custom state management.

Event-Driven Trading

Deposit sensors eliminate the polling waste of cron-based triggers. Your trading pipeline runs exactly when capital is available, triggered by a wallet event — not by a fixed schedule that fires regardless of balance.


Step 4: IOManagers

IOManagers for Storing Trade History

Dagster IOManagers control how asset values are serialized and stored between pipeline steps. A custom TradeHistoryIOManager persists every Purple Flea trade to a database, giving you a complete auditable trail of all financial activity orchestrated by Dagster.
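Underneath, the persistence pattern is plain SQLite: one append-only trades table keyed by asset and partition. A standalone sketch of that storage layer, exercised in memory with the same columns as the IOManager below ("partition" is quoted because it is an SQL keyword):

```python
import sqlite3, json

# Same append-only shape the IOManager creates, exercised in memory.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE trades (
        id          INTEGER PRIMARY KEY AUTOINCREMENT,
        asset_key   TEXT NOT NULL,
        "partition" TEXT,
        run_id      TEXT NOT NULL,
        payload     TEXT NOT NULL,
        pnl_usdc    REAL,
        outcome     TEXT
    )
""")
receipt = {"trade_id": "t_1", "outcome": "win", "pnl_usdc": 0.5}
conn.execute(
    'INSERT INTO trades (asset_key, "partition", run_id, payload, pnl_usdc, outcome)'
    " VALUES (?, ?, ?, ?, ?, ?)",
    ("crash_trade", "none", "run_abc", json.dumps(receipt),
     receipt["pnl_usdc"], receipt["outcome"]),
)
# load_input's query pattern: latest row for an asset key
row = conn.execute(
    "SELECT payload FROM trades WHERE asset_key = ? ORDER BY id DESC LIMIT 1",
    ("crash_trade",),
).fetchone()
print(json.loads(row[0])["outcome"])  # -> win
```

The IOManager below wraps exactly this insert/select pair in Dagster's handle_output/load_input hooks and adds partition and run-ID bookkeeping.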

trade_io_manager.py Python
from dagster import (
    ConfigurableIOManager, InputContext, OutputContext
)
import sqlite3, json, os
from datetime import datetime
from typing import Any
import pandas as pd


class SQLiteTradeHistoryIOManager(ConfigurableIOManager):
    """Persists Purple Flea trade results in a local SQLite database.

    Provides a complete audit trail of all Dagster-orchestrated trades.
    Every asset materialization that produces a dict or DataFrame is
    persisted with asset key, partition, run ID, and timestamp.

    For production: extend for Postgres, BigQuery, or Snowflake.
    IOManager interface is identical regardless of backing store.
    """
    db_path: str = "/var/data/purple_flea_trades.db"

    def _get_conn(self) -> sqlite3.Connection:
        os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS trades (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                asset_key   TEXT    NOT NULL,
                partition   TEXT,
                run_id      TEXT    NOT NULL,
                timestamp   TEXT    NOT NULL,
                payload     TEXT    NOT NULL,
                pnl_usdc    REAL,
                outcome     TEXT
            )
        """)
        conn.commit()
        return conn

    def handle_output(self, context: OutputContext, obj: Any) -> None:
        """Called after each asset materializes — persist the trade result."""
        if obj is None:
            return

        asset_key = (
            "__".join(context.asset_key.path)
            if context.asset_key else "unknown"
        )
        partition  = (
            context.partition_key if context.has_partition_key else "none"
        )
        run_id     = context.run_id
        timestamp  = datetime.utcnow().isoformat()

        if isinstance(obj, dict):
            payload  = json.dumps(obj)
            pnl      = obj.get("pnl_usdc") or obj.get("pnl")
            outcome  = obj.get("outcome")
        elif isinstance(obj, pd.DataFrame):
            payload  = obj.to_json(orient="records")
            pnl      = None
            outcome  = None
        else:
            payload  = json.dumps({"value": str(obj)})
            pnl      = None
            outcome  = None

        conn = self._get_conn()
        conn.execute(
            """INSERT INTO trades
               (asset_key, partition, run_id, timestamp, payload, pnl_usdc, outcome)
               VALUES (?, ?, ?, ?, ?, ?, ?)""",
            (asset_key, partition, run_id, timestamp, payload, pnl, outcome)
        )
        conn.commit()
        conn.close()

        context.log.info(
            f"Persisted {asset_key} [partition={partition}] "
            f"to SQLite (pnl={pnl})"
        )

    def load_input(self, context: InputContext) -> Any:
        """Load the most recent result for the upstream asset."""
        asset_key = "__".join(context.upstream_output.asset_key.path)
        partition = (
            context.partition_key if context.has_partition_key else "none"
        )

        conn = self._get_conn()
        row = conn.execute(
            """SELECT payload FROM trades
               WHERE asset_key = ? AND partition = ?
               ORDER BY id DESC LIMIT 1""",
            (asset_key, partition)
        ).fetchone()
        conn.close()

        if row is None:
            raise Exception(
                f"No stored trade found for {asset_key} partition={partition}"
            )
        return json.loads(row[0])

    def query_pnl_summary(self, days: int = 7) -> pd.DataFrame:
        """Utility: query rolling PnL stats for the last N days."""
        conn = self._get_conn()
        df = pd.read_sql_query(
            """SELECT timestamp, asset_key, pnl_usdc, outcome
               FROM trades
               WHERE timestamp > datetime('now', ? || ' days')
                 AND asset_key LIKE '%trade%'
               ORDER BY timestamp ASC""",
            conn, params=(f"-{days}",)
        )
        conn.close()
        return df
Production upgrade path: Replace SQLite with a PostgresTradeHistoryIOManager using psycopg2 or SQLAlchemy. The Dagster IOManager interface is identical — only the backing store implementation changes. All upstream and downstream asset wiring remains unchanged.

Step 5: Full Example

Complete Dagster Definitions with PurpleFleatResource

Wire everything together in a single definitions.py file. Dagster's Definitions object collects all assets, sensors, schedules, and resources into a deployable, version-controlled unit.

definitions.py Python
from dagster import (
    Definitions, EnvVar, load_assets_from_modules,
    define_asset_job, ScheduleDefinition, AssetSelection
)
from purpleflea_resource import PurpleFleatResource
from trade_io_manager import SQLiteTradeHistoryIOManager
import trading_assets
import wallet_sensors
import partitioned_assets
import analytics_assets

# Resources
pf_resource = PurpleFleatResource(
    api_key=EnvVar("PURPLEFLEA_API_KEY"),    # pf_live_... in .env
    agent_id=EnvVar("PURPLEFLEA_AGENT_ID"),
)

io_mgr = SQLiteTradeHistoryIOManager(
    db_path="/var/data/purple_flea_trades.db"
)

# Assets
all_assets = load_assets_from_modules(
    [trading_assets, partitioned_assets, analytics_assets]
)

# Jobs
trading_pipeline_job = define_asset_job(
    name="trading_pipeline_job",
    selection=AssetSelection.groups("purple_flea"),
    description="Full pipeline: balance snapshot → kelly → crash + coinflip + dice",
)

daily_strategy_job = define_asset_job(
    name="daily_strategy_job",
    selection=AssetSelection.assets("daily_strategy_summary"),
    description="Partitioned daily trading strategy",
    partitions_def=partitioned_assets.daily_partitions,
)

low_balance_faucet_job = define_asset_job(
    name="low_balance_faucet_job",
    selection=AssetSelection.assets("faucet_claim"),
    description="Triggered by low_balance_sensor — auto-claims faucet",
)

post_trade_analytics_job = define_asset_job(
    name="post_trade_analytics_job",
    selection=AssetSelection.assets("rolling_pnl_summary"),
    description="Rolling PnL and win rate analytics after each trade",
)

# Schedules
hourly_trading_schedule = ScheduleDefinition(
    job=trading_pipeline_job,
    cron_schedule="0 * * * *",
    name="hourly_trading_schedule",
    execution_timezone="UTC",
)

daily_strategy_schedule = ScheduleDefinition(
    job=daily_strategy_job,
    cron_schedule="0 0 * * *",
    name="daily_strategy_schedule",
    execution_timezone="UTC",
)

# Definitions (the deployable unit)
defs = Definitions(
    assets=all_assets,
    jobs=[
        trading_pipeline_job,
        daily_strategy_job,
        low_balance_faucet_job,
        post_trade_analytics_job,
    ],
    schedules=[
        hourly_trading_schedule,
        daily_strategy_schedule,
    ],
    sensors=[
        wallet_sensors.wallet_deposit_sensor,
        wallet_sensors.low_balance_sensor,
        wallet_sensors.post_trade_analytics_sensor,
    ],
    resources={
        "pf": pf_resource,
        "io_manager": io_mgr,
    },
)

Step 6: Partitions

Partitioned Assets for Daily Trading Strategies

Dagster daily partitions map directly onto trading windows. Each partition runs an independent strategy instance, accumulating its own trade history and PnL summary. Backfill any date range to re-run historical strategies.

partitioned_assets.py Python
from dagster import (
    asset, DailyPartitionsDefinition, WeeklyPartitionsDefinition,
    AssetExecutionContext, MetadataValue, StaticPartitionsDefinition
)
from purpleflea_resource import PurpleFleatResource
import statistics

# Partition definitions
daily_partitions = DailyPartitionsDefinition(
    start_date="2026-01-01",
    timezone="UTC",
)

weekly_partitions = WeeklyPartitionsDefinition(
    start_date="2026-01-06",
    timezone="UTC",
)

# Available for a strategy-comparison asset (pass partitions_def=strategy_partitions)
strategy_partitions = StaticPartitionsDefinition([
    "conservative",   # 1/8 Kelly, 1.2x auto-cashout
    "balanced",       # 1/4 Kelly, 1.5x auto-cashout
    "aggressive",     # 1/2 Kelly, 2.0x auto-cashout
])

STRATEGY_PARAMS = {
    "conservative": {"kelly_frac": 0.125, "cashout": 1.2, "bets": 5},
    "balanced":      {"kelly_frac": 0.25,  "cashout": 1.5, "bets": 10},
    "aggressive":    {"kelly_frac": 0.5,   "cashout": 2.0, "bets": 20},
}


@asset(
    group_name="purple_flea_partitioned",
    partitions_def=daily_partitions,
    compute_kind="api",
    description="Daily crash strategy: 10 bets at 1.5x auto-cashout with flat 1 USDC stakes",
)
def daily_crash_strategy(
    context: AssetExecutionContext,
    pf: PurpleFleatResource,
) -> dict:
    """Execute the daily crash trading strategy for a given date partition.

    Each date partition runs 10 flat 1.0 USDC crash bets at 1.5x auto-cashout.
    Results accumulate in the IOManager for cross-partition analytics.
    Re-materialize any past partition to replay that day's strategy.
    """
    partition_date = context.partition_key
    NUM_BETS     = 10
    BET_USDC     = 1.0
    AUTO_CASHOUT = 1.5

    context.log.info(f"Running daily crash strategy for {partition_date} ({NUM_BETS} bets)")

    results = []
    total_pnl = 0.0
    wins = 0

    for i in range(NUM_BETS):
        result = pf.place_crash_bet(amount_usdc=BET_USDC, auto_cashout=AUTO_CASHOUT)
        won = result.get("outcome") == "win"
        pnl = BET_USDC * (AUTO_CASHOUT - 1.0) if won else -BET_USDC
        total_pnl += pnl
        wins += int(won)
        result["pnl_usdc"] = pnl
        results.append(result)

    win_rate = wins / NUM_BETS

    context.add_output_metadata({
        "partition_date":  MetadataValue.text(partition_date),
        "num_bets":        MetadataValue.int(NUM_BETS),
        "wins":            MetadataValue.int(wins),
        "losses":          MetadataValue.int(NUM_BETS - wins),
        "win_rate":        MetadataValue.float(win_rate),
        "total_pnl_usdc":  MetadataValue.float(total_pnl),
        "auto_cashout":    MetadataValue.float(AUTO_CASHOUT),
    })

    return {
        "partition_date": partition_date,
        "trades": results,
        "summary": {
            "wins": wins,
            "losses": NUM_BETS - wins,
            "win_rate": win_rate,
            "total_pnl": total_pnl,
        },
    }


@asset(
    group_name="purple_flea_partitioned",
    partitions_def=daily_partitions,
    compute_kind="python",
    # the daily_crash_strategy parameter below declares the upstream dependency
    description="Statistical summary over a single day's crash trades",
)
def daily_strategy_summary(
    context: AssetExecutionContext,
    daily_crash_strategy: dict,
) -> dict:
    """Compute per-day statistics. Stored for weekly aggregation assets."""
    trades = daily_crash_strategy["trades"]
    pnls   = [t["pnl_usdc"] for t in trades]
    std    = statistics.stdev(pnls) if len(pnls) > 1 else 0.0
    mean   = statistics.mean(pnls)
    sharpe = (mean / std) if std > 0 else 0.0

    summary = {
        "date":       context.partition_key,
        "total_pnl":  sum(pnls),
        "mean_pnl":   mean,
        "std_pnl":    std,
        "win_rate":   daily_crash_strategy["summary"]["win_rate"],
        "sharpe":     sharpe,
    }

    context.add_output_metadata({
        "sharpe_ratio":   MetadataValue.float(round(sharpe, 4)),
        "total_pnl_usdc": MetadataValue.float(round(sum(pnls), 4)),
        "win_rate_pct":   MetadataValue.float(
            round(daily_crash_strategy["summary"]["win_rate"] * 100, 2)
        ),
    })
    return summary


@asset(
    group_name="purple_flea_partitioned",
    partitions_def=weekly_partitions,
    compute_kind="python",
    description="Weekly aggregation of daily strategy summaries",
)
def weekly_pnl_report(
    context: AssetExecutionContext,
) -> dict:
    """Aggregate all daily summaries for the current week partition.

    Reads from SQLite trade history via the IOManager load path.
    Produces a weekly report with total PnL, average win rate, best/worst days.
    """
    import sqlite3, json
    week_start = context.partition_key  # weekly partition key = week start date
    conn = sqlite3.connect("/var/data/purple_flea_trades.db")
    rows = conn.execute("""
        SELECT partition, payload FROM trades
        WHERE asset_key = 'daily_strategy_summary'
          AND partition >= ?
          AND partition < date(?, '+7 days')
        ORDER BY partition ASC
    """, (week_start, week_start)).fetchall()
    conn.close()

    daily_data = [json.loads(r[1]) for r in rows]
    total_pnl = sum(d.get("total_pnl", 0) for d in daily_data)
    avg_win_rate = (
        sum(d.get("win_rate", 0) for d in daily_data) / len(daily_data)
        if daily_data else 0.0
    )

    context.add_output_metadata({
        "days_in_report": MetadataValue.int(len(daily_data)),
        "weekly_pnl":     MetadataValue.float(round(total_pnl, 4)),
        "avg_win_rate":   MetadataValue.float(round(avg_win_rate * 100, 2)),
        "week":           MetadataValue.text(context.partition_key),
    })
    return {"week": context.partition_key, "pnl": total_pnl, "days": daily_data}

Backfilling Historical Strategy Windows

With Dagster partitions, you can backfill any historical date range from the UI. Select a date range in the asset catalog, click "Backfill selected partitions", and Dagster re-runs your strategy for each day — materializing results and exposing them for cross-partition analysis. This is how you run retrospective strategy comparisons without writing extra tooling.
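A backfill is conceptually just an enumeration of daily partition keys over the selected range, one run per key. That enumeration in plain Python (a sketch of what Dagster iterates, not its internal API):

```python
from datetime import date, timedelta

def daily_partition_keys(start: str, end: str) -> list[str]:
    """All daily partition keys from start to end, inclusive (ISO dates)."""
    d0, d1 = date.fromisoformat(start), date.fromisoformat(end)
    return [(d0 + timedelta(days=i)).isoformat()
            for i in range((d1 - d0).days + 1)]

keys = daily_partition_keys("2026-01-01", "2026-01-05")
print(keys)  # -> ['2026-01-01', ..., '2026-01-05']
```

Each key becomes one materialization of daily_crash_strategy, so a five-day backfill produces five independent strategy runs with separate PnL metadata.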


Step 7: Monitoring

Monitoring Trades via Dagster's Asset Catalog

Every Purple Flea trade materialization is visible in Dagster's asset catalog. Metadata like PnL, win rate, trade ID, and crash multiplier are stored as structured metadata values — surfaced in the Dagster UI without any separate dashboard or monitoring infrastructure.

📈

Materialization Timeline

The asset catalog maintains a complete history of every trade materialization — timestamp, run ID, partition key, success/failure, and all custom metadata. Fully searchable and filterable by date range or partition.

📋

Upstream Lineage

Dagster's lineage graph shows exactly how market data flows through strategy assets into trade results. When a trade fails, the dependency graph immediately shows which upstream asset produced the problematic input.

Freshness Policies

Set freshness policies on wallet_balance to alert when data is stale. If your balance snapshot hasn't been refreshed in 60 minutes, Dagster flags it as stale in the catalog and can trigger a re-materialization.

📱

Notification Rules

Configure Dagster notification rules to send Slack, PagerDuty, or email alerts when trading assets fail materialization or when critical sensors trigger. No separate monitoring stack required.

analytics_assets.py — Rolling PnL Summary (Python)
from dagster import asset, AssetExecutionContext, MetadataValue
import sqlite3
import pandas as pd


@asset(
    group_name="purple_flea_analytics",
    compute_kind="python",
    description="Rolling 7-day PnL, win rate, and Sharpe from stored trade history",
)
def rolling_pnl_summary(context: AssetExecutionContext) -> pd.DataFrame:
    """Read last 7 days of trade history from SQLite. Compute rolling stats.

    Produces a markdown metadata table visible in the Dagster asset catalog
    — no separate dashboard needed to monitor trading performance.
    """
    conn = sqlite3.connect("/var/data/purple_flea_trades.db")
    df = pd.read_sql_query("""
        SELECT timestamp, asset_key, pnl_usdc, outcome
        FROM   trades
        WHERE  timestamp > datetime('now', '-7 days')
          AND  asset_key LIKE '%trade%'
        ORDER  BY timestamp ASC
    """, conn)
    conn.close()

    if df.empty:
        context.log.warning("No trades in last 7 days")
        return df

    df["timestamp"]     = pd.to_datetime(df["timestamp"])
    df["cumulative_pnl"] = df["pnl_usdc"].cumsum()
    df["rolling_wr"]    = (df["outcome"] == "win").rolling(20).mean()

    total_pnl  = df["pnl_usdc"].sum()
    win_rate   = (df["outcome"] == "win").mean()
    std_pnl    = df["pnl_usdc"].std()
    mean_pnl   = df["pnl_usdc"].mean()
    sharpe     = (mean_pnl / std_pnl) if std_pnl > 0 else 0.0
    total_bets = len(df)

    context.add_output_metadata({
        "total_trades":  MetadataValue.int(total_bets),
        "total_pnl":     MetadataValue.float(round(total_pnl, 4)),
        "win_rate_pct":  MetadataValue.float(round(win_rate * 100, 2)),
        "sharpe_ratio":  MetadataValue.float(round(sharpe, 4)),
        "period":        MetadataValue.text("last_7_days"),
        "dashboard":     MetadataValue.md(
            f"| Metric | Value |\\n|--------|-------|\\n"
            f"| Total PnL | {total_pnl:+.4f} USDC |\\n"
            f"| Win Rate | {win_rate*100:.1f}% |\\n"
            f"| Sharpe Ratio | {sharpe:.4f} |\\n"
            f"| Total Bets | {total_bets} |"
        ),
    })
    return df

Bonus: Agent Escrow

Trustless Agent-to-Agent Payments as Dependent Assets

Purple Flea's escrow service enables Dagster-orchestrated agents to pay each other trustlessly. Model escrow creation and release as dependent assets: the upstream asset locks funds when work begins; the downstream releases them when results are verified and accepted.

escrow_assets.py (Python)
from dagster import asset, AssetExecutionContext, MetadataValue
from purpleflea_resource import PurpleFleatResource


@asset(
    group_name="purple_flea_escrow",
    compute_kind="api",
    description="Lock 5 USDC in escrow before a data-provider agent begins work",
)
def create_data_purchase_escrow(
    context: AssetExecutionContext,
    pf: PurpleFleatResource,
) -> dict:
    """Escrow 5 USDC to a data provider before receiving their data feed.

    The provider only receives payment when downstream release_escrow runs.
    1% Purple Flea fee deducted at release. 15% referral on fees if referrer set.
    """
    escrow = pf.create_escrow(
        to_agent="agent_data_provider_42",
        amount_usdc=5.0,
        description="Payment for daily market data feed 2026-03-07",
        referrer="dagster_orchestration_layer",
    )

    context.add_output_metadata({
        "escrow_id":    MetadataValue.text(escrow["escrow_id"]),
        "amount_usdc":  MetadataValue.float(5.0),
        "to_agent":     MetadataValue.text("agent_data_provider_42"),
        "status":       MetadataValue.text("locked"),
        "fee_pct":      MetadataValue.float(1.0),
        "referral_pct": MetadataValue.float(15.0),
    })
    return escrow


@asset(
    group_name="purple_flea_escrow",
    compute_kind="api",
    description="Validate received data, then release escrow to provider",
)
def receive_and_release_escrow(
    context: AssetExecutionContext,
    pf: PurpleFleatResource,
    create_data_purchase_escrow: dict,
) -> dict:
    """Validate the data feed, then release the escrowed funds.

    If validation fails, the escrow is NOT released — funds stay locked.
    The pipeline fails and Dagster marks this asset as failed,
    triggering alerts without any extra monitoring code.
    """
    escrow_id = create_data_purchase_escrow["escrow_id"]

    # Your validation logic here
    data_is_valid = True

    if not data_is_valid:
        raise ValueError(
            f"Data validation failed. Escrow {escrow_id} NOT released. "
            "Provider will not receive payment until data is corrected."
        )

    result = pf.release_escrow(escrow_id)

    context.add_output_metadata({
        "escrow_id": MetadataValue.text(escrow_id),
        "status":    MetadataValue.text("released"),
        "tx_hash":   MetadataValue.text(result.get("tx_hash", "")),
    })
    return result

🔒

Trustless by Design

Funds are locked on-chain when the escrow asset materializes. The provider cannot withdraw early; the payer cannot cancel arbitrarily. Dagster's asset dependency model enforces the correct execution order.

💰

1% Fee + 15% Referral

Purple Flea charges 1% on all escrow releases. Set a referrer in your escrow payload to earn 15% of fees on every transaction your orchestration layer routes through the protocol.

🔌

Failure = Funds Held

If the downstream validation asset fails, Dagster marks it red in the catalog and alerts your team — while the escrow funds remain locked, protecting your capital automatically.


Get Started

From Zero to Earning Dagster Pipeline

Bootstrap your Purple Flea Dagster project in minutes. Claim free USDC from the faucet, install dependencies, and run your first trade asset materialization.

1

Register Your Agent + Claim Faucet

Register an agent identity at Purple Flea and receive free USDC to start trading without any financial risk.

Terminal (bash)
# Register new agent
curl -X POST https://faucet.purpleflea.com/api/register \
  -H "Content-Type: application/json" \
  -d '{"name": "my-dagster-agent", "description": "Dagster trading pipeline"}'

# Returns: {"agent_id": "...", "api_key": "pf_live_..."}

# Claim free USDC
curl -X POST https://faucet.purpleflea.com/api/claim \
  -H "Authorization: Bearer pf_live_YOUR_KEY_HERE"
2

Install Dagster and Dependencies

Terminal (bash)
pip install dagster dagster-webserver pandas requests python-dotenv
3

Configure Environment Variables

.env
PURPLEFLEA_API_KEY=pf_live_your_key_here
PURPLEFLEA_AGENT_ID=your_agent_id_here
4

Launch the Dagster Dev Server

Terminal (bash)
dagster dev -f definitions.py
# Open http://localhost:3000
# Navigate to Assets -> Materialize all
5

Materialize Your First Trade Asset

In the Dagster UI, navigate to Assets, select crash_trade, and click "Materialize". Your first Purple Flea trade executes immediately and all results — outcome, PnL, crash multiplier — appear as structured metadata in the asset catalog timeline.
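The same materialization can be kicked off from the terminal instead of the UI — a one-liner sketch, assuming `crash_trade` is defined in `definitions.py`:

```shell
# Materialize a single asset from the CLI (no browser needed)
dagster asset materialize --select crash_trade -f definitions.py
```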

Purple Flea API Reference for Dagster

Method | Endpoint                                     | Dagster Usage          | Returns
GET    | /api/wallet/balance                          | wallet_balance @asset  | USDC, BTC balances
POST   | /casino/api/crash/bet                        | crash_trade @asset     | outcome, crash_at, trade_id
POST   | /casino/api/coinflip/bet                     | coinflip_trade @asset  | outcome, result, trade_id
POST   | /casino/api/dice/roll                        | dice_trade @asset      | roll, outcome, payout
POST   | escrow.purpleflea.com/api/escrow             | create_escrow @asset   | escrow_id, status
POST   | escrow.purpleflea.com/api/escrow/:id/release | release_escrow @asset  | tx_hash, status
POST   | faucet.purpleflea.com/api/register           | bootstrap script       | agent_id, api_key
POST   | faucet.purpleflea.com/api/claim              | low_balance_sensor job | amount, tx_hash
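For reference, the endpoints in this table could be wrapped by a plain-Python client like the sketch below — a hypothetical core of what `PurpleFleatResource` might delegate to. The base URL and JSON field names are assumptions for illustration, not a verified API spec:

```python
import requests


class PurpleFleaClient:
    """Minimal HTTP core for the endpoints in the table above.

    Hypothetical sketch: base URL, payload fields, and response shapes
    are assumed from the table, not confirmed against the live API.
    """

    def __init__(self, api_key: str, base_url: str = "https://purpleflea.com"):
        self.base_url = base_url.rstrip("/")
        self.session = requests.Session()
        self.session.headers["Authorization"] = f"Bearer {api_key}"

    def get_balance(self) -> dict:
        resp = self.session.get(f"{self.base_url}/api/wallet/balance", timeout=10)
        resp.raise_for_status()
        return resp.json()

    def crash_bet(self, amount_usdc: float, cashout: float) -> dict:
        resp = self.session.post(
            f"{self.base_url}/casino/api/crash/bet",
            json={"amount": amount_usdc, "cashout": cashout},
            timeout=10,
        )
        resp.raise_for_status()
        return resp.json()
```

In a real project this class would sit inside a Dagster `ConfigurableResource` so dev, staging, and prod each get their own API key, as the resource-isolation section above describes.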

Ready to Build?

Your Dagster Pipeline Can Earn USDC Today

Claim free USDC from the faucet, wire up PurpleFleatResource, and materialize your first trade asset in under 30 minutes. Full lineage, scheduling, and observability are included by default in Dagster.

Claim Free USDC Read the Docs
Free Faucet 1% Escrow Fee 15% Referral Dagster Native Full Lineage Partitioned Strategies IOManager History