Transform your Dagster data pipelines into financial infrastructure. Model Purple Flea trades as materializable assets, monitor wallet balances with sensors, and store trade history with custom IOManagers — full lineage, reproducibility, and scheduling built in.
Dagster was designed for observable, reproducible data pipelines. When your pipeline outputs are USDC balances and trade receipts, that observability becomes critical. Every trade is a materialization event — auditable, retryable, and wired into Dagster's asset catalog automatically.
Every Purple Flea trade traces back to the upstream market data, wallet state, and strategy configuration that caused it. Dagster's lineage graph makes that dependency chain explicit, queryable, and visualized automatically in the UI.
Dagster assets are idempotent by design. Re-materializing a trade asset for a past partition re-runs the strategy against the same market conditions — perfect for backtesting failed agents and validating strategy changes.
Combine schedules, sensors, and manual triggers. Run your trading strategy on a cron schedule, react to wallet deposit events via sensors, or trigger immediately from the Dagster UI asset catalog.
View trade history, success/failure rates, materialization times, and metadata like PnL and position sizes directly in Dagster's asset catalog — no separate monitoring dashboard required.
PurpleFleatResource wraps all API key management and HTTP client configuration. Different environments (dev/staging/prod) get different resource instances — never risk mixing testnet and mainnet keys.
Daily, weekly, or custom partitions map perfectly onto trading windows. Each partition materializes an independent strategy run with its own USDC outcome tracked as first-class asset metadata.
The Purple Flea Dagster integration models financial operations as standard Dagster primitives. Your orchestration logic lives in Dagster; Purple Flea handles on-chain settlement and custody.
All Purple Flea API calls go through PurpleFleatResource, a
ConfigurableResource subclass. It handles authentication, retries,
and environment separation automatically so you never hardcode credentials.
from dagster import ConfigurableResource, get_dagster_logger import requests from typing import Optional, Dict, Any import time class PurpleFleatResource(ConfigurableResource): """Dagster ConfigurableResource for the Purple Flea financial API. Handles authentication, retries, and request logging for all Purple Flea service calls: casino, escrow, faucet, wallet, domains. Configure via environment variables: PURPLEFLEA_API_KEY=pf_live_... PURPLEFLEA_AGENT_ID=agent_... """ api_key: str # pf_live_... never sk_live_ agent_id: str # registered agent identifier base_url: str = "https://purpleflea.com/api" casino_url: str = "https://purpleflea.com/casino/api" escrow_url: str = "https://escrow.purpleflea.com/api" faucet_url: str = "https://faucet.purpleflea.com/api" timeout: int = 30 max_retries: int = 3 def _headers(self) -> Dict[str, str]: return { "Authorization": f"Bearer {self.api_key}", "X-Agent-ID": self.agent_id, "Content-Type": "application/json", } def _request(self, method: str, url: str, **kwargs) -> Dict: log = get_dagster_logger() for attempt in range(self.max_retries): try: resp = requests.request( method, url, headers=self._headers(), timeout=self.timeout, **kwargs ) resp.raise_for_status() return resp.json() except requests.RequestException as exc: log.warning(f"Attempt {attempt+1}/{self.max_retries} failed: {exc}") if attempt < self.max_retries - 1: time.sleep(2 ** attempt) # exponential backoff else: raise # Wallet def get_balance(self) -> Dict: return self._request("GET", f"{self.base_url}/wallet/balance") def get_transactions(self, limit: int = 50) -> Dict: return self._request("GET", f"{self.base_url}/wallet/transactions", params={"limit": limit}) # Casino def place_crash_bet(self, amount_usdc: float, auto_cashout: float) -> Dict: return self._request("POST", f"{self.casino_url}/crash/bet", json={"amount": amount_usdc, "auto_cashout": auto_cashout}) def place_coinflip(self, amount_usdc: float, side: str) -> Dict: return self._request("POST", f"{self.casino_url}/coinflip/bet", json={"amount": amount_usdc, "side": side}) def place_dice(self, amount_usdc: float, target: int, over: bool = True) -> Dict: return self._request("POST", f"{self.casino_url}/dice/roll", json={"amount": amount_usdc, "target": target, "over": over}) # Escrow def create_escrow(self, to_agent: str, amount_usdc: float, description: str, referrer: Optional[str] = None) -> Dict: payload = {"to": to_agent, "amount": amount_usdc, "description": description} if referrer: payload["referrer"] = referrer return self._request("POST", f"{self.escrow_url}/escrow", json=payload) def release_escrow(self, escrow_id: str) -> Dict: return self._request("POST", f"{self.escrow_url}/escrow/{escrow_id}/release") # Faucet def claim_faucet(self) -> Dict: return self._request("POST", f"{self.faucet_url}/claim") def register_agent(self, name: str, description: str) -> Dict: return self._request("POST", f"{self.faucet_url}/register", json={"name": name, "description": description})
Configure PurpleFleatResource via environment variables in your
dagster.yaml or deployment config. Use EnvVar("PURPLEFLEA_API_KEY")
to reference secrets without hardcoding. Different deployment targets (dev, staging, prod)
get different resource configurations automatically — no code changes needed.
In Dagster's asset model, a Purple Flea trade is a materialization event. The asset represents the trade result — outcome, PnL, fees — and its upstream dependencies are the market data and strategy config assets. Every run is fully logged in Dagster's event store.
from dagster import ( asset, AssetExecutionContext, MetadataValue, FreshnessPolicy, Output ) from purpleflea_resource import PurpleFleatResource import pandas as pd from datetime import datetime @asset( group_name="purple_flea", compute_kind="api", freshness_policy=FreshnessPolicy(maximum_lag_minutes=60), description="Current wallet balance snapshot from Purple Flea", ) def wallet_balance( context: AssetExecutionContext, pf: PurpleFleatResource, ) -> pd.DataFrame: """Materialized wallet balance snapshot. Stale after 60 minutes.""" balance_data = pf.get_balance() context.add_output_metadata({ "usdc_balance": MetadataValue.float(balance_data["usdc"]), "btc_balance": MetadataValue.float(balance_data.get("btc", 0.0)), "agent_id": MetadataValue.text(balance_data["agent_id"]), "snapshot_at": MetadataValue.text(datetime.utcnow().isoformat()), }) return pd.DataFrame([balance_data]) @asset( group_name="purple_flea", compute_kind="python", deps=[wallet_balance], description="Kelly-optimal bet sizing computed from current wallet balance", ) def kelly_bet_size( context: AssetExecutionContext, wallet_balance: pd.DataFrame, ) -> float: """Compute Kelly criterion bet size. Uses conservative 1/4 Kelly fraction.""" usdc = wallet_balance.iloc[0]["usdc"] edge = 0.02 # estimated 2% edge on crash at 1.5x odds = 1.0 # even-money equivalent full_kelly = edge / odds quarter_kelly = full_kelly / 4 bet = round(usdc * quarter_kelly, 2) bet = max(0.10, min(bet, 50.0)) # clamp $0.10 to $50 context.add_output_metadata({ "wallet_usdc": MetadataValue.float(usdc), "edge_estimate": MetadataValue.float(edge), "kelly_fraction": MetadataValue.float(quarter_kelly), "bet_size_usdc": MetadataValue.float(bet), }) return bet @asset( group_name="purple_flea", compute_kind="api", deps=[kelly_bet_size], description="Crash game bet — each materialization executes one bet", ) def crash_trade( context: AssetExecutionContext, pf: PurpleFleatResource, kelly_bet_size: float, ) -> dict: """Execute one crash bet as a Dagster asset materialization. The asset value is the trade receipt: outcome, multiplier, pnl. Re-materializing always executes a fresh bet — each materialization is an independent event logged in the Dagster asset catalog. """ auto_cashout = 1.5 result = pf.place_crash_bet( amount_usdc=kelly_bet_size, auto_cashout=auto_cashout ) won = result.get("outcome") == "win" pnl = kelly_bet_size * (auto_cashout - 1.0) if won else -kelly_bet_size context.add_output_metadata({ "bet_usdc": MetadataValue.float(kelly_bet_size), "auto_cashout": MetadataValue.float(auto_cashout), "crash_at": MetadataValue.float(result.get("crash_at", 0.0)), "outcome": MetadataValue.text(result.get("outcome", "unknown")), "pnl_usdc": MetadataValue.float(pnl), "trade_id": MetadataValue.text(result.get("trade_id", "")), "timestamp": MetadataValue.text(datetime.utcnow().isoformat()), }) context.log.info( f"Crash trade completed: bet={kelly_bet_size} " f"outcome={result.get('outcome')} pnl={pnl:.4f} USDC" ) return result @asset( group_name="purple_flea", compute_kind="api", description="Coin flip trade — 50/50 heads or tails bet", ) def coinflip_trade( context: AssetExecutionContext, pf: PurpleFleatResource, kelly_bet_size: float, ) -> dict: """Execute a coin flip bet using half the Kelly position size.""" run_id_last = context.run.run_id[-1] side = "heads" if int(run_id_last, 16) % 2 == 0 else "tails" result = pf.place_coinflip(amount_usdc=kelly_bet_size / 2, side=side) won = result.get("outcome") == "win" pnl = kelly_bet_size / 2 if won else -(kelly_bet_size / 2) context.add_output_metadata({ "side_chosen": MetadataValue.text(side), "result": MetadataValue.text(result.get("result", "")), "outcome": MetadataValue.text(result.get("outcome", "")), "pnl_usdc": MetadataValue.float(pnl), }) return result @asset( group_name="purple_flea", compute_kind="api", description="Dice roll trade — predict over/under on a target number", ) def dice_trade( context: AssetExecutionContext, pf: PurpleFleatResource, kelly_bet_size: float, ) -> dict: """Execute a dice roll bet targeting over 50 (approximately 49% win rate).""" target = 50 over = True bet_size = round(kelly_bet_size / 3, 2) # smaller position for dice result = pf.place_dice(amount_usdc=bet_size, target=target, over=over) won = result.get("outcome") == "win" payout_mult = result.get("payout_multiplier", 1.96) pnl = bet_size * (payout_mult - 1.0) if won else -bet_size context.add_output_metadata({ "target": MetadataValue.int(target), "over": MetadataValue.bool(over), "roll_result": MetadataValue.float(result.get("roll", 0.0)), "outcome": MetadataValue.text(result.get("outcome", "")), "pnl_usdc": MetadataValue.float(pnl), }) return result
@asset stores its output metadata in Dagster's event log.
Navigate to the asset in the Dagster UI, click "View materializations", and see every
trade's PnL, outcome, and timestamp in a searchable timeline — your complete trade history
at a glance.
Dagster sensors poll external systems and trigger runs when conditions are met. A wallet sensor polls your Purple Flea balance and triggers a trading pipeline precisely when a deposit arrives — event-driven execution with zero cron waste.
from dagster import ( sensor, SensorEvaluationContext, RunRequest, SkipReason, DefaultSensorStatus, asset_sensor, AssetKey, EventLogEntry ) from purpleflea_resource import PurpleFleatResource @sensor( job_name="trading_pipeline_job", minimum_interval_seconds=60, default_status=DefaultSensorStatus.RUNNING, description="Triggers a trading run when a wallet deposit is detected", ) def wallet_deposit_sensor( context: SensorEvaluationContext, pf: PurpleFleatResource, ): """Poll Purple Flea wallet balance every 60 seconds. Trigger a full trading pipeline when balance increases by >= 1 USDC. Cursor persistence: last known balance survives restarts. """ MIN_BALANCE = 10.0 # minimum USDC required to trade MIN_DEPOSIT = 1.0 # minimum deposit size to react to last_balance = float(context.cursor or 0.0) balance_data = pf.get_balance() current_balance = float(balance_data.get("usdc", 0.0)) deposit_detected = (current_balance - last_balance) >= MIN_DEPOSIT context.update_cursor(str(current_balance)) if current_balance < MIN_BALANCE: yield SkipReason( f"Balance {current_balance:.2f} USDC below minimum {MIN_BALANCE}" ) return if not deposit_detected: yield SkipReason( f"No deposit detected. Current balance: {current_balance:.2f} USDC" ) return deposit_amount = current_balance - last_balance context.log.info( f"Deposit detected: +{deposit_amount:.2f} USDC. Triggering trading run." ) yield RunRequest( run_key=f"deposit_{int(current_balance * 100)}_{context.cursor}", tags={ "triggered_by": "wallet_deposit_sensor", "deposit_usdc": str(round(deposit_amount, 4)), "balance_usdc": str(round(current_balance, 4)), } ) @sensor( job_name="low_balance_faucet_job", minimum_interval_seconds=300, description="Claims faucet automatically when balance drops below 1 USDC", ) def low_balance_sensor( context: SensorEvaluationContext, pf: PurpleFleatResource, ): """Monitor for critically low balance. Auto-claims faucet to stay funded.""" CRITICAL = 1.0 balance_data = pf.get_balance() current = float(balance_data.get("usdc", 0.0)) if current >= CRITICAL: yield SkipReason(f"Balance OK: {current:.4f} USDC") return last_alert = float(context.cursor or 999.0) if last_alert < CRITICAL: yield SkipReason("Already triggered faucet claim this episode") return context.update_cursor(str(current)) context.log.warning(f"CRITICAL: Balance is {current:.4f} USDC — triggering faucet") yield RunRequest( run_key=f"faucet_{int(current * 10000)}", tags={"alert": "low_balance", "balance": str(current)}, ) @asset_sensor( asset_key=AssetKey("crash_trade"), job_name="post_trade_analytics_job", description="Triggers analytics after each crash trade materialization", ) def post_trade_analytics_sensor( context: SensorEvaluationContext, asset_event: EventLogEntry, ): """React to crash_trade materializations. Run rolling PnL analytics.""" mat = asset_event.asset_materialization pnl = mat.metadata.get("pnl_usdc") trade_id = mat.metadata.get("trade_id") context.log.info(f"Crash trade completed. trade_id={trade_id} pnl={pnl}") yield RunRequest( run_key=f"analytics_{trade_id}", run_config={ "ops": { "rolling_pnl_summary": { "config": {"trigger_trade_id": str(trade_id)} } } }, )
Dagster automatically persists sensor cursors across process restarts, deployments, and scheduler interruptions. Your wallet balance checkpoint survives without any extra infrastructure or custom state management.
Deposit sensors eliminate the polling waste of cron-based triggers. Your trading pipeline runs exactly when capital is available, triggered by a wallet event — not by a fixed schedule that fires regardless of balance.
Dagster IOManagers control how asset values are serialized and stored between
pipeline steps. A custom TradeHistoryIOManager persists every
Purple Flea trade to a database, giving you a complete auditable trail of all
financial activity orchestrated by Dagster.
from dagster import ( ConfigurableIOManager, InputContext, OutputContext ) import sqlite3, json, os from datetime import datetime from typing import Any import pandas as pd class SQLiteTradeHistoryIOManager(ConfigurableIOManager): """Persists Purple Flea trade results in a local SQLite database. Provides a complete audit trail of all Dagster-orchestrated trades. Every asset materialization that produces a dict or DataFrame is persisted with asset key, partition, run ID, and timestamp. For production: extend for Postgres, BigQuery, or Snowflake. IOManager interface is identical regardless of backing store. """ db_path: str = "/var/data/purple_flea_trades.db" def _get_conn(self) -> sqlite3.Connection: os.makedirs(os.path.dirname(self.db_path), exist_ok=True) conn = sqlite3.connect(self.db_path) conn.execute(""" CREATE TABLE IF NOT EXISTS trades ( id INTEGER PRIMARY KEY AUTOINCREMENT, asset_key TEXT NOT NULL, partition TEXT, run_id TEXT NOT NULL, timestamp TEXT NOT NULL, payload TEXT NOT NULL, pnl_usdc REAL, outcome TEXT ) """) conn.commit() return conn def handle_output(self, context: OutputContext, obj: Any) -> None: """Called after each asset materializes — persist the trade result.""" if obj is None: return asset_key = ( "__".join(context.asset_key.path) if context.asset_key else "unknown" ) partition = context.partition_key or "none" run_id = context.run_id timestamp = datetime.utcnow().isoformat() if isinstance(obj, dict): payload = json.dumps(obj) pnl = obj.get("pnl_usdc") or obj.get("pnl") outcome = obj.get("outcome") elif isinstance(obj, pd.DataFrame): payload = obj.to_json(orient="records") pnl = None outcome = None else: payload = json.dumps({"value": str(obj)}) pnl = None outcome = None conn = self._get_conn() conn.execute( """INSERT INTO trades (asset_key, partition, run_id, timestamp, payload, pnl_usdc, outcome) VALUES (?, ?, ?, ?, ?, ?, ?)""", (asset_key, partition, run_id, timestamp, payload, pnl, outcome) ) conn.commit() conn.close() context.log.info( f"Persisted {asset_key} [partition={partition}] " f"to SQLite (pnl={pnl})" ) def load_input(self, context: InputContext) -> Any: """Load the most recent result for the upstream asset.""" asset_key = "__".join(context.upstream_output.asset_key.path) partition = context.partition_key or "none" conn = self._get_conn() row = conn.execute( """SELECT payload FROM trades WHERE asset_key = ? AND partition = ? ORDER BY id DESC LIMIT 1""", (asset_key, partition) ).fetchone() conn.close() if row is None: raise Exception( f"No stored trade found for {asset_key} partition={partition}" ) return json.loads(row[0]) def query_pnl_summary(self, days: int = 7) -> pd.DataFrame: """Utility: query rolling PnL stats for the last N days.""" conn = self._get_conn() df = pd.read_sql_query( """SELECT timestamp, asset_key, pnl_usdc, outcome FROM trades WHERE timestamp > datetime('now', ? || ' days') AND asset_key LIKE '%trade%' ORDER BY timestamp ASC""", conn, params=(f"-{days}",) ) conn.close() return df
PostgresTradeHistoryIOManager
using psycopg2 or SQLAlchemy. The Dagster IOManager interface is identical —
only the backing store implementation changes. All upstream and downstream asset wiring
remains unchanged.
Wire everything together in a single definitions.py file.
Dagster's Definitions object collects all assets, sensors,
schedules, and resources into a deployable, version-controlled unit.
from dagster import ( Definitions, EnvVar, load_assets_from_modules, define_asset_job, ScheduleDefinition, AssetSelection ) from purpleflea_resource import PurpleFleatResource from trade_io_manager import SQLiteTradeHistoryIOManager import trading_assets import wallet_sensors import partitioned_assets # Resources pf_resource = PurpleFleatResource( api_key=EnvVar("PURPLEFLEA_API_KEY"), # pf_live_... in .env agent_id=EnvVar("PURPLEFLEA_AGENT_ID"), ) io_mgr = SQLiteTradeHistoryIOManager( db_path="/var/data/purple_flea_trades.db" ) # Assets all_assets = load_assets_from_modules([trading_assets, partitioned_assets]) # Jobs trading_pipeline_job = define_asset_job( name="trading_pipeline_job", selection=AssetSelection.groups("purple_flea"), description="Full pipeline: balance snapshot → kelly → crash + coinflip + dice", ) daily_strategy_job = define_asset_job( name="daily_strategy_job", selection=AssetSelection.assets("daily_strategy_summary"), description="Partitioned daily trading strategy", partitions_def=partitioned_assets.daily_partitions, ) low_balance_faucet_job = define_asset_job( name="low_balance_faucet_job", selection=AssetSelection.assets("faucet_claim"), description="Triggered by low_balance_sensor — auto-claims faucet", ) post_trade_analytics_job = define_asset_job( name="post_trade_analytics_job", selection=AssetSelection.assets("rolling_pnl_summary"), description="Rolling PnL and win rate analytics after each trade", ) # Schedules hourly_trading_schedule = ScheduleDefinition( job=trading_pipeline_job, cron_schedule="0 * * * *", name="hourly_trading_schedule", execution_timezone="UTC", ) daily_strategy_schedule = ScheduleDefinition( job=daily_strategy_job, cron_schedule="0 0 * * *", name="daily_strategy_schedule", execution_timezone="UTC", ) # Definitions (the deployable unit) defs = Definitions( assets=all_assets, jobs=[ trading_pipeline_job, daily_strategy_job, low_balance_faucet_job, post_trade_analytics_job, ], schedules=[ hourly_trading_schedule, daily_strategy_schedule, ], sensors=[ wallet_sensors.wallet_deposit_sensor, wallet_sensors.low_balance_sensor, wallet_sensors.post_trade_analytics_sensor, ], resources={ "pf": pf_resource, "io_manager": io_mgr, }, )
Dagster daily partitions map directly onto trading windows. Each partition runs an independent strategy instance, accumulating its own trade history and PnL summary. Backfill any date range to re-run historical strategies.
from dagster import ( asset, DailyPartitionsDefinition, WeeklyPartitionsDefinition, AssetExecutionContext, MetadataValue, StaticPartitionsDefinition ) from purpleflea_resource import PurpleFleatResource import statistics # Partition definitions daily_partitions = DailyPartitionsDefinition( start_date="2026-01-01", timezone="UTC", ) weekly_partitions = WeeklyPartitionsDefinition( start_date="2026-01-06", timezone="UTC", ) strategy_partitions = StaticPartitionsDefinition([ "conservative", # 1/8 Kelly, 1.2x auto-cashout "balanced", # 1/4 Kelly, 1.5x auto-cashout "aggressive", # 1/2 Kelly, 2.0x auto-cashout ]) STRATEGY_PARAMS = { "conservative": {"kelly_frac": 0.125, "cashout": 1.2, "bets": 5}, "balanced": {"kelly_frac": 0.25, "cashout": 1.5, "bets": 10}, "aggressive": {"kelly_frac": 0.5, "cashout": 2.0, "bets": 20}, } @asset( group_name="purple_flea_partitioned", partitions_def=daily_partitions, compute_kind="api", description="Daily crash strategy: 10 bets with balanced Kelly params", ) def daily_crash_strategy( context: AssetExecutionContext, pf: PurpleFleatResource, ) -> dict: """Execute the daily crash trading strategy for a given date partition. Each date partition runs 10 crash bets with 1/4 Kelly at 1.5x auto-cashout. Results accumulate in the IOManager for cross-partition analytics. Re-materialize any past partition to replay that day's strategy. """ partition_date = context.partition_key NUM_BETS = 10 BET_USDC = 1.0 AUTO_CASHOUT = 1.5 context.log.info(f"Running daily crash strategy for {partition_date} ({NUM_BETS} bets)") results = [] total_pnl = 0.0 wins = 0 for i in range(NUM_BETS): result = pf.place_crash_bet(amount_usdc=BET_USDC, auto_cashout=AUTO_CASHOUT) won = result.get("outcome") == "win" pnl = BET_USDC * (AUTO_CASHOUT - 1.0) if won else -BET_USDC total_pnl += pnl wins += int(won) result["pnl_usdc"] = pnl results.append(result) win_rate = wins / NUM_BETS context.add_output_metadata({ "partition_date": MetadataValue.text(partition_date), "num_bets": MetadataValue.int(NUM_BETS), "wins": MetadataValue.int(wins), "losses": MetadataValue.int(NUM_BETS - wins), "win_rate": MetadataValue.float(win_rate), "total_pnl_usdc": MetadataValue.float(total_pnl), "auto_cashout": MetadataValue.float(AUTO_CASHOUT), }) return { "partition_date": partition_date, "trades": results, "summary": { "wins": wins, "losses": NUM_BETS - wins, "win_rate": win_rate, "total_pnl": total_pnl, }, } @asset( group_name="purple_flea_partitioned", partitions_def=daily_partitions, deps=[daily_crash_strategy], compute_kind="python", description="Statistical summary over a single day's crash trades", ) def daily_strategy_summary( context: AssetExecutionContext, daily_crash_strategy: dict, ) -> dict: """Compute per-day statistics. Stored for weekly aggregation assets.""" trades = daily_crash_strategy["trades"] pnls = [t["pnl_usdc"] for t in trades] std = statistics.stdev(pnls) if len(pnls) > 1 else 0.0 mean = statistics.mean(pnls) sharpe = (mean / std) if std > 0 else 0.0 summary = { "date": context.partition_key, "total_pnl": sum(pnls), "mean_pnl": mean, "std_pnl": std, "win_rate": daily_crash_strategy["summary"]["win_rate"], "sharpe": sharpe, } context.add_output_metadata({ "sharpe_ratio": MetadataValue.float(round(sharpe, 4)), "total_pnl_usdc": MetadataValue.float(round(sum(pnls), 4)), "win_rate_pct": MetadataValue.float( round(daily_crash_strategy["summary"]["win_rate"] * 100, 2) ), }) return summary @asset( group_name="purple_flea_partitioned", partitions_def=weekly_partitions, compute_kind="python", description="Weekly aggregation of daily strategy summaries", ) def weekly_pnl_report( context: AssetExecutionContext, ) -> dict: """Aggregate all daily summaries for the current week partition. Reads from SQLite trade history via the IOManager load path. Produces a weekly report with total PnL, average win rate, best/worst days. """ import sqlite3, json conn = sqlite3.connect("/var/data/purple_flea_trades.db") rows = conn.execute(""" SELECT partition, payload FROM trades WHERE asset_key = 'daily_strategy_summary' AND partition >= date('now', '-7 days') ORDER BY partition ASC """).fetchall() conn.close() daily_data = [json.loads(r[1]) for r in rows] total_pnl = sum(d.get("total_pnl", 0) for d in daily_data) avg_win_rate = ( sum(d.get("win_rate", 0) for d in daily_data) / len(daily_data) if daily_data else 0.0 ) context.add_output_metadata({ "days_in_report": MetadataValue.int(len(daily_data)), "weekly_pnl": MetadataValue.float(round(total_pnl, 4)), "avg_win_rate": MetadataValue.float(round(avg_win_rate * 100, 2)), "week": MetadataValue.text(context.partition_key), }) return {"week": context.partition_key, "pnl": total_pnl, "days": daily_data}
With Dagster partitions, you can backfill any historical date range from the UI. Select a date range in the asset catalog, click "Backfill selected partitions", and Dagster re-runs your strategy for each day — materializing results and exposing them for cross-partition analysis. This is how you run retrospective strategy comparisons without writing extra tooling.
Every Purple Flea trade materialization is visible in Dagster's asset catalog. Metadata like PnL, win rate, trade ID, and crash multiplier are stored as structured metadata values — surfaced in the Dagster UI without any separate dashboard or monitoring infrastructure.
The asset catalog maintains a complete history of every trade materialization — timestamp, run ID, partition key, success/failure, and all custom metadata. Fully searchable and filterable by date range or partition.
Dagster's lineage graph shows exactly how market data flows through strategy assets into trade results. When a trade fails, the dependency graph immediately shows which upstream asset produced the problematic input.
Set freshness policies on wallet_balance to alert when data is stale. If your balance snapshot hasn't been refreshed in 60 minutes, Dagster flags it as stale in the catalog and can trigger a re-materialization.
Configure Dagster notification rules to send Slack, PagerDuty, or email alerts when trading assets fail materialization or when critical sensors trigger. No separate monitoring stack required.
from dagster import asset, AssetExecutionContext, MetadataValue import sqlite3 import pandas as pd @asset( group_name="purple_flea_analytics", compute_kind="python", description="Rolling 7-day PnL, win rate, and Sharpe from stored trade history", ) def rolling_pnl_summary(context: AssetExecutionContext) -> pd.DataFrame: """Read last 7 days of trade history from SQLite. Compute rolling stats. Produces a markdown metadata table visible in the Dagster asset catalog — no separate dashboard needed to monitor trading performance. """ conn = sqlite3.connect("/var/data/purple_flea_trades.db") df = pd.read_sql_query(""" SELECT timestamp, asset_key, pnl_usdc, outcome FROM trades WHERE timestamp > datetime('now', '-7 days') AND asset_key LIKE '%trade%' ORDER BY timestamp ASC """, conn) conn.close() if df.empty: context.log.warning("No trades in last 7 days") return df df["timestamp"] = pd.to_datetime(df["timestamp"]) df["cumulative_pnl"] = df["pnl_usdc"].cumsum() df["rolling_wr"] = (df["outcome"] == "win").rolling(20).mean() total_pnl = df["pnl_usdc"].sum() win_rate = (df["outcome"] == "win").mean() std_pnl = df["pnl_usdc"].std() mean_pnl = df["pnl_usdc"].mean() sharpe = (mean_pnl / std_pnl) if std_pnl > 0 else 0.0 total_bets = len(df) context.add_output_metadata({ "total_trades": MetadataValue.int(total_bets), "total_pnl": MetadataValue.float(round(total_pnl, 4)), "win_rate_pct": MetadataValue.float(round(win_rate * 100, 2)), "sharpe_ratio": MetadataValue.float(round(sharpe, 4)), "period": MetadataValue.text("last_7_days"), "dashboard": MetadataValue.md( f"| Metric | Value |\\n|--------|-------|\\n" f"| Total PnL | {total_pnl:+.4f} USDC |\\n" f"| Win Rate | {win_rate*100:.1f}% |\\n" f"| Sharpe Ratio | {sharpe:.4f} |\\n" f"| Total Bets | {total_bets} |" ), }) return df
Purple Flea's escrow service enables Dagster-orchestrated agents to pay each other trustlessly. Model escrow creation and release as dependent assets: the upstream asset locks funds when work begins; the downstream releases them when results are verified and accepted.
from dagster import asset, AssetExecutionContext, MetadataValue from purpleflea_resource import PurpleFleatResource @asset( group_name="purple_flea_escrow", compute_kind="api", description="Lock 5 USDC in escrow before a data-provider agent begins work", ) def create_data_purchase_escrow( context: AssetExecutionContext, pf: PurpleFleatResource, ) -> dict: """Escrow 5 USDC to a data provider before receiving their data feed. The provider only receives payment when downstream release_escrow runs. 1% Purple Flea fee deducted at release. 15% referral on fees if referrer set. """ escrow = pf.create_escrow( to_agent="agent_data_provider_42", amount_usdc=5.0, description="Payment for daily market data feed 2026-03-07", referrer="dagster_orchestration_layer", ) context.add_output_metadata({ "escrow_id": MetadataValue.text(escrow["escrow_id"]), "amount_usdc": MetadataValue.float(5.0), "to_agent": MetadataValue.text("agent_data_provider_42"), "status": MetadataValue.text("locked"), "fee_pct": MetadataValue.float(1.0), "referral_pct": MetadataValue.float(15.0), }) return escrow @asset( group_name="purple_flea_escrow", compute_kind="api", deps=[create_data_purchase_escrow], description="Validate received data, then release escrow to provider", ) def receive_and_release_escrow( context: AssetExecutionContext, pf: PurpleFleatResource, create_data_purchase_escrow: dict, ) -> dict: """Validate the data feed, then release the escrowed funds. If validation fails, the escrow is NOT released — funds stay locked. The pipeline fails and Dagster marks this asset as failed, triggering alerts without any extra monitoring code. """ escrow_id = create_data_purchase_escrow["escrow_id"] # Your validation logic here data_is_valid = True if not data_is_valid: raise Exception( f"Data validation failed. Escrow {escrow_id} NOT released. " "Provider will not receive payment until data is corrected." ) result = pf.release_escrow(escrow_id) context.add_output_metadata({ "escrow_id": MetadataValue.text(escrow_id), "status": MetadataValue.text("released"), "tx_hash": MetadataValue.text(result.get("tx_hash", "")), }) return result
Funds are locked on-chain when the escrow asset materializes. The provider cannot withdraw early; the payer cannot cancel arbitrarily. Dagster's asset dependency model enforces the correct execution order.
Purple Flea charges 1% on all escrow releases. Set a referrer in your escrow payload to earn 15% of fees on every transaction your orchestration layer routes through the protocol.
If the downstream validation asset fails, Dagster marks it red in the catalog and alerts your team — while the escrow funds remain locked, protecting your capital automatically.
Bootstrap your Purple Flea Dagster project in minutes. Claim free USDC from the faucet, install dependencies, and run your first trade asset materialization.
Register an agent identity at Purple Flea and receive free USDC to start trading without any financial risk.
# Register new agent curl -X POST https://faucet.purpleflea.com/api/register \ -H "Content-Type: application/json" \ -d '{"name": "my-dagster-agent", "description": "Dagster trading pipeline"}' # Returns: {"agent_id": "...", "api_key": "pf_live_..."} # Claim free USDC curl -X POST https://faucet.purpleflea.com/api/claim \ -H "Authorization: Bearer pf_live_YOUR_KEY_HERE"
pip install dagster dagster-webserver pandas requests python-dotenv
PURPLEFLEA_API_KEY=pf_live_your_key_here PURPLEFLEA_AGENT_ID=your_agent_id_here
dagster dev -f definitions.py # Open http://localhost:3000 # Navigate to Assets -> Materialize all
In the Dagster UI, navigate to Assets, select crash_trade, and click "Materialize". Your first Purple Flea trade executes immediately and all results — outcome, PnL, crash multiplier — appear as structured metadata in the asset catalog timeline.
| Method | Endpoint | Dagster Usage | Returns |
|---|---|---|---|
| GET | /api/wallet/balance | wallet_balance @asset | USDC, BTC balances |
| POST | /casino/api/crash/bet | crash_trade @asset | outcome, crash_at, trade_id |
| POST | /casino/api/coinflip/bet | coinflip_trade @asset | outcome, result, trade_id |
| POST | /casino/api/dice/roll | dice_trade @asset | roll, outcome, payout |
| POST | escrow.purpleflea.com/api/escrow | create_escrow @asset | escrow_id, status |
| POST | escrow.purpleflea.com/api/escrow/:id/release | release_escrow @asset | tx_hash, status |
| POST | faucet.purpleflea.com/api/register | bootstrap script | agent_id, api_key |
| POST | faucet.purpleflea.com/api/claim | low_balance_sensor job | amount, tx_hash |
Claim free USDC from the faucet, wire up PurpleFleatResource,
and materialize your first trade asset in under 30 minutes. Full lineage,
scheduling, and observability are included by default in Dagster.