Transform raw Purple Flea agent trade data into reproducible, tested, analytics-ready dbt models. From staging layer to P&L marts — with full data lineage, YAML schema tests, and dbt Cloud automation.
AI agents generate enormous volumes of financial events — trades, settlements, casino outcomes, escrow transactions. dbt gives you the infrastructure to turn raw API data into trustworthy analytics with full lineage and automated testing.
Every transformation is versioned in SQL. Track exactly how your agent P&L is calculated — from raw trades through staging models to final marts — with dbt's built-in lineage graph.
YAML schema tests catch data quality issues before they corrupt your analytics. Enforce not-null constraints, uniqueness on trade IDs, and referential integrity across all layers.
SQL models run deterministically on any warehouse. Share your dbt project with collaborators and get identical results — no more "works on my notebook" analytics.
Process only new Purple Flea events since the last run. Incremental materialization keeps compute costs low even as agent trading volumes scale to millions of events per day.
Reusable dbt macros for Sharpe ratio, max drawdown, win rate, and Calmar ratio. Write the financial calculation once and apply it across every agent strategy mart.
Automate daily and hourly runs in dbt Cloud. Fresh P&L and strategy performance data arrive in your BI tool every morning before your agents wake up to trade.
Well-structured dbt projects follow a clear separation of concerns. Raw Purple Flea API data flows through staging (light cleaning) into intermediate models (joins and business logic) and finally into marts (aggregated analytics).
Purple Flea API responses loaded via Airbyte, Fivetran, or custom ingestion into your warehouse raw schema
Rename columns, cast types, filter nulls. One-to-one with source tables. No business logic yet.
Join staging models, apply business rules, compute intermediate fields like gross P&L before fees.
Final analytical tables consumed by BI tools. Aggregated P&L, strategy performance, agent rankings.
| Layer | Model Prefix | Materialization | Purple Flea Source |
|---|---|---|---|
| Sources | raw.* | External tables | /api/v1/trades, /api/v1/casino |
| Staging | stg_* | View | purple_flea_trades, purple_flea_casino_rounds |
| Intermediate | int_* | Ephemeral / View | int_trades_with_fees, int_agent_sessions |
| Dimension Marts | dim_* | Table | dim_agents, dim_strategies |
| Fact Marts | fct_* | Incremental Table | fct_agent_pnl, fct_strategy_performance |
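The table above maps onto a conventional dbt folder layout. A sketch of how this project might be organised on disk (file names beyond those shown in this guide are illustrative):

```text
models/
├── staging/
│   ├── _purple_flea__sources.yml
│   └── stg_purple_flea_trades.sql
├── intermediate/
│   ├── int_trades_with_fees.sql
│   └── int_agent_sessions.sql
└── marts/
    ├── finance/
    │   ├── fct_agent_pnl.sql
    │   └── fct_strategy_performance.sql
    └── dimensions/
        ├── dim_agents.sql
        └── dim_strategies.sql
macros/
└── financial_metrics.sql
```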
The staging model performs minimal, deterministic transformations on raw Purple Flea trade records: rename columns to snake_case, cast timestamps, coerce numeric types, and add a surrogate key for downstream joins.
```sql
-- Staging model: Purple Flea raw trades → clean, typed records
-- Materialized as a view; source is raw.purple_flea_trades loaded via API ingestion

with source as (

    select * from {{ source('purple_flea', 'raw_trades') }}

),

renamed as (

    select
        -- Identifiers
        trade_id,
        agent_id,
        strategy_id,

        -- Trade details
        symbol as market_symbol,
        side as trade_side,            -- 'buy' | 'sell'
        trade_type,                    -- 'market' | 'limit' | 'stop'

        -- Numeric fields: cast to precise types
        cast(quantity as numeric(24, 8)) as quantity,
        cast(price as numeric(24, 8)) as execution_price,
        cast(fee_amount as numeric(24, 8)) as fee_amount_usdc,
        cast(fee_rate as numeric(10, 6)) as fee_rate_pct,
        cast(slippage_bps as integer) as slippage_bps,

        -- Timestamps: normalise to UTC
        cast(executed_at as timestamp) as executed_at_utc,
        date_trunc('day', cast(executed_at as timestamp)) as trade_date,
        date_trunc('hour', cast(executed_at as timestamp)) as trade_hour,

        -- Status fields
        status as trade_status,        -- 'filled' | 'partial' | 'cancelled'
        coalesce(is_liquidation, false) as is_liquidation,
        coalesce(is_hedged, false) as is_hedged,

        -- Surrogate key for joins
        {{ dbt_utils.generate_surrogate_key(['trade_id', 'agent_id']) }} as trade_sk,

        -- Metadata
        current_timestamp as _dbt_loaded_at

    from source
    -- Exclude cancelled orders; filter on the raw column, since the
    -- trade_status alias is not visible in this SELECT's own WHERE clause
    where status != 'cancelled'

)

select * from renamed
```
Notice we apply no business logic here — staging models are intentionally thin. The dbt_utils.generate_surrogate_key macro creates a consistent key for downstream fact table joins. All numeric columns are explicitly cast to avoid implicit type coercions that can corrupt financial calculations.
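For reference, on a Postgres-style warehouse the surrogate key macro compiles to roughly the following (the exact output varies by dbt_utils version and adapter, so treat this as an approximation):

```sql
-- Approximate compiled output of dbt_utils.generate_surrogate_key(['trade_id', 'agent_id'])
md5(
    cast(
        coalesce(cast(trade_id as varchar), '_dbt_utils_surrogate_key_null_')
        || '-'
        || coalesce(cast(agent_id as varchar), '_dbt_utils_surrogate_key_null_')
        as varchar
    )
) as trade_sk
```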
The P&L fact model aggregates staged trades by agent and trading day, computing realized gains, fees paid, net P&L, and Sharpe components. It uses incremental materialization to efficiently process only new trade events.
```sql
{{
    config(
        materialized = 'incremental',
        -- Include market_symbol: the model grains by agent, date, AND market,
        -- so all three columns are needed for the merge key to be unique
        unique_key = ['agent_id', 'trade_date', 'market_symbol'],
        incremental_strategy = 'merge',
        on_schema_change = 'sync_all_columns'
    )
}}

with trades as (

    select * from {{ ref('stg_purple_flea_trades') }}

    {% if is_incremental() %}
    -- Only process events newer than our latest loaded date
    where executed_at_utc > (select max(executed_at_utc) from {{ this }})
    {% endif %}

),

daily_trade_stats as (

    select
        agent_id,
        trade_date,
        market_symbol,

        -- Trade volume metrics
        count(*) as trade_count,
        sum(case when trade_side = 'buy' then 1 else 0 end) as buy_count,
        sum(case when trade_side = 'sell' then 1 else 0 end) as sell_count,
        sum(quantity * execution_price) as gross_notional_usdc,

        -- Cost basis and proceeds (FIFO realized P&L approximation)
        sum(case when trade_side = 'sell' then quantity * execution_price else 0 end) as gross_proceeds_usdc,
        sum(case when trade_side = 'buy' then quantity * execution_price else 0 end) as gross_cost_basis_usdc,

        -- Fee analysis
        sum(fee_amount_usdc) as total_fees_usdc,
        avg(fee_rate_pct) as avg_fee_rate_pct,
        avg(slippage_bps) as avg_slippage_bps,

        -- Risk flags
        sum(case when is_liquidation then 1 else 0 end) as liquidation_count

    from trades
    group by 1, 2, 3

),

pnl_computed as (

    select
        *,
        -- Net realized P&L after fees
        (gross_proceeds_usdc - gross_cost_basis_usdc - total_fees_usdc) as net_pnl_usdc,

        -- Return on capital deployed (daily)
        case
            when gross_cost_basis_usdc > 0
            then (gross_proceeds_usdc - gross_cost_basis_usdc - total_fees_usdc) / gross_cost_basis_usdc
            else null
        end as daily_return_pct,

        -- Fee drag ratio
        case
            when gross_notional_usdc > 0
            then total_fees_usdc / gross_notional_usdc
            else 0
        end as fee_drag_ratio

    from daily_trade_stats

)

select
    agent_id,
    trade_date,
    market_symbol,
    trade_count,
    buy_count,
    sell_count,
    gross_notional_usdc,
    gross_proceeds_usdc,
    gross_cost_basis_usdc,
    total_fees_usdc,
    avg_fee_rate_pct,
    avg_slippage_bps,
    net_pnl_usdc,
    daily_return_pct,
    fee_drag_ratio,
    liquidation_count,
    current_timestamp as _dbt_updated_at
from pnl_computed
```
Financial analytics requires strict data quality guarantees. A wrong P&L figure caused by a duplicate trade record or a null execution price can lead an agent to make costly decisions. dbt schema tests enforce these constraints at every model layer.
```yaml
version: 2

sources:
  - name: purple_flea
    description: "Raw Purple Flea API data loaded into warehouse"
    schema: raw
    tables:
      - name: raw_trades
        description: "Raw trade execution records from Purple Flea trading API"
        columns:
          - name: trade_id
            tests:
              - not_null
              - unique
          - name: agent_id
            tests:
              - not_null
          - name: price
            tests:
              - not_null
              - dbt_utils.expression_is_true:
                  expression: "price > 0"
          - name: quantity
            tests:
              - not_null
              - dbt_utils.expression_is_true:
                  expression: "quantity > 0"
          - name: side
            tests:
              - accepted_values:
                  values: ['buy', 'sell']
          - name: status
            tests:
              - accepted_values:
                  values: ['filled', 'partial', 'cancelled']
          - name: executed_at
            tests:
              - not_null

# Staging and mart model schema tests
models:
  - name: stg_purple_flea_trades
    description: "Cleaned and typed Purple Flea trade records"
    columns:
      - name: trade_sk
        description: "Surrogate key: deterministic hash of trade_id + agent_id"
        tests:
          - not_null
          - unique
      - name: trade_id
        tests:
          - not_null
          - unique
      - name: agent_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_agents')
              field: agent_id
      - name: execution_price
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "execution_price > 0"
      - name: trade_side
        tests:
          - accepted_values:
              values: ['buy', 'sell']
      - name: fee_amount_usdc
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "fee_amount_usdc >= 0"

  - name: fct_agent_pnl
    description: "Daily agent P&L aggregated by agent, date, and market"
    columns:
      - name: agent_id
        tests:
          - not_null
      - name: trade_date
        tests:
          - not_null
      - name: trade_count
        tests:
          - dbt_utils.expression_is_true:
              expression: "trade_count > 0"
      - name: total_fees_usdc
        tests:
          - dbt_utils.expression_is_true:
              expression: "total_fees_usdc >= 0"
```
Write financial formulas once as dbt macros and reuse them across every mart. These macros encapsulate the Sharpe ratio, maximum drawdown, and win rate calculations that every agent analytics dashboard needs.
```sql
{#
    Macro: sharpe_ratio
    Computes annualised Sharpe ratio from a column of daily returns.
    Args:
        returns_col:     column containing daily return values (decimal, e.g. 0.0215)
        risk_free_daily: daily risk-free rate (default 0 for crypto)
        trading_days:    annualisation factor (default 365 for 24/7 crypto)
#}
{% macro sharpe_ratio(returns_col, risk_free_daily=0, trading_days=365) %}
    (
        avg({{ returns_col }} - {{ risk_free_daily }})
        / nullif(stddev_pop({{ returns_col }}), 0)
    ) * sqrt({{ trading_days }})
{% endmacro %}


{#
    Macro: max_drawdown
    Computes maximum peak-to-trough drawdown over a series of cumulative returns.
    Uses a window function approach suitable for aggregated daily data.
    Note: some warehouses disallow a window function nested inside another
    window expression; on those, compute the running peak in a prior CTE instead.
    Args:
        equity_col:    column of cumulative equity values (e.g. running sum of returns)
        partition_col: column to partition by (e.g. agent_id)
        date_col:      ordering column (e.g. trade_date)
#}
{% macro max_drawdown(equity_col, partition_col, date_col) %}
    min(
        (
            {{ equity_col }} - max({{ equity_col }}) over (
                partition by {{ partition_col }}
                order by {{ date_col }}
                rows between unbounded preceding and current row
            )
        ) / nullif(
            max({{ equity_col }}) over (
                partition by {{ partition_col }}
                order by {{ date_col }}
                rows between unbounded preceding and current row
            ),
            0
        )
    ) over (partition by {{ partition_col }})
{% endmacro %}


{#
    Macro: win_rate
    Fraction of profitable trades (net_pnl > 0) over total trades.
    Args:
        pnl_col: column containing per-trade net P&L
#}
{% macro win_rate(pnl_col) %}
    sum(case when {{ pnl_col }} > 0 then 1.0 else 0.0 end) / nullif(count(*), 0)
{% endmacro %}


{#
    Macro: calmar_ratio
    Annualised return divided by absolute maximum drawdown. Pairs naturally
    with columns produced by the sharpe_ratio and max_drawdown macros.
    Args:
        annual_return_col: annualised return column
        max_dd_col:        pre-computed max drawdown column (negative decimal, e.g. -0.15)
#}
{% macro calmar_ratio(annual_return_col, max_dd_col) %}
    {{ annual_return_col }} / nullif(abs({{ max_dd_col }}), 0)
{% endmacro %}


{#
    Macro: sortino_ratio
    Like Sharpe but penalises only downside volatility.
    Args:
        returns_col:   daily return column
        target_return: minimum acceptable daily return (default 0)
        trading_days:  annualisation factor
#}
{% macro sortino_ratio(returns_col, target_return=0, trading_days=365) %}
    (avg({{ returns_col }}) - {{ target_return }})
    / nullif(
        sqrt(
            avg(
                power(
                    case
                        when {{ returns_col }} < {{ target_return }}
                        then {{ returns_col }} - {{ target_return }}
                        else 0
                    end,
                    2
                )
            )
        ),
        0
    ) * sqrt({{ trading_days }})
{% endmacro %}
```
Use these macros in any mart model: {{ sharpe_ratio('daily_return_pct') }}. dbt compiles the Jinja at run time, injecting the SQL inline — zero runtime overhead, full SQL pushdown to your warehouse.
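For example, calling the macro with default arguments produces this inline expression in the compiled model:

```sql
-- Compiled output of {{ sharpe_ratio('daily_return_pct') }}
(avg(daily_return_pct - 0) / nullif(stddev_pop(daily_return_pct), 0)) * sqrt(365)
```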
The strategy performance mart aggregates daily P&L per agent strategy and applies the financial macros to produce Sharpe ratio, drawdown, and win rate columns — ready for dashboarding in Metabase, Superset, or Looker.
```sql
{{
    config(
        materialized = 'table',
        tags = ['finance', 'strategy']
    )
}}

with daily_pnl as (
    select * from {{ ref('fct_agent_pnl') }}
),

strategy_meta as (
    select * from {{ ref('dim_strategies') }}
),

joined as (
    select
        p.agent_id,
        p.trade_date,
        s.strategy_name,
        s.strategy_type,   -- 'momentum' | 'mean_reversion' | 'arb' | 'market_making'
        p.daily_return_pct,
        p.net_pnl_usdc,
        p.total_fees_usdc,
        p.trade_count,
        p.liquidation_count
    from daily_pnl p
    left join strategy_meta s
        on p.agent_id = s.agent_id
),

cumulative as (
    select
        *,
        -- Running equity for drawdown calculation
        sum(net_pnl_usdc) over (
            partition by agent_id
            order by trade_date
            rows between unbounded preceding and current row
        ) as cumulative_pnl_usdc
    from joined
),

aggregated as (
    select
        agent_id,
        strategy_name,
        strategy_type,

        -- Date range
        min(trade_date) as first_trade_date,
        max(trade_date) as last_trade_date,
        count(distinct trade_date) as active_trading_days,

        -- P&L summary
        sum(net_pnl_usdc) as total_net_pnl_usdc,
        sum(total_fees_usdc) as total_fees_paid_usdc,
        sum(trade_count) as total_trade_count,
        avg(daily_return_pct) as avg_daily_return_pct,

        -- Risk metrics using macros
        {{ sharpe_ratio('daily_return_pct') }} as sharpe_ratio_annualised,
        {{ sortino_ratio('daily_return_pct') }} as sortino_ratio_annualised,
        {{ win_rate('net_pnl_usdc') }} as win_rate,

        -- Best / worst single day
        min(net_pnl_usdc) as worst_single_day_pnl,
        max(net_pnl_usdc) as best_single_day_pnl,

        -- Risk: liquidation exposure
        sum(liquidation_count) as total_liquidations,
        sum(liquidation_count) * 1.0 / nullif(sum(trade_count), 0) as liquidation_rate

    from cumulative
    group by 1, 2, 3
)

select * from aggregated
order by total_net_pnl_usdc desc
```
dbt exposures document where your models are consumed — BI dashboards, downstream APIs, agent decision systems. Defining exposures creates lineage that flows from Purple Flea APIs all the way to your production dashboards.
```yaml
version: 2

exposures:
  - name: agent_pnl_dashboard
    label: "Agent P&L Dashboard"
    type: dashboard
    maturity: high
    url: "https://metabase.yourstack.com/dashboard/42-agent-pnl"
    description: >
      Daily and cumulative P&L for all Purple Flea trading agents.
      Filters by strategy type, market symbol, and date range.
      Refreshed every 4 hours by dbt Cloud schedule.
    depends_on:
      - ref('fct_agent_pnl')
      - ref('dim_agents')
    owner:
      name: Analytics Team
      email: analytics@yourcompany.com

  - name: strategy_performance_report
    label: "Strategy Performance Report"
    type: analysis
    maturity: medium
    description: >
      Weekly strategy comparison report: Sharpe, Sortino, win rate, max drawdown.
      Consumed by the Purple Flea leaderboard page at purpleflea.com/leaderboard/.
    depends_on:
      - ref('fct_strategy_performance')
    owner:
      name: Quant Team
      email: quant@yourcompany.com

  - name: live_agent_risk_feed
    label: "Live Agent Risk Feed"
    type: application
    maturity: high
    url: "https://purpleflea.com/api/v1/risk/agent"
    description: >
      Real-time risk monitoring application reads fct_agent_pnl to detect
      agents approaching drawdown limits or liquidation risk. Feeds automated
      position reduction signals.
    depends_on:
      - ref('fct_agent_pnl')
      - ref('fct_strategy_performance')
    owner:
      name: Risk Engineering
      email: risk@yourcompany.com
```
With exposures defined, dbt docs generate builds a lineage graph from Purple Flea raw API tables all the way through to your production dashboards and risk systems — in a single interactive DAG.
dbt Cloud provides managed orchestration, a hosted IDE, and one-click CI/CD for your Purple Flea analytics project. Set up hourly incremental refreshes so agent P&L data is always current.
A few setup conventions keep the project production-ready:

- Store your Purple Flea API key (pf_live_...) as an environment variable for ingestion jobs
- Run dbt test automatically on every scheduled job; alert on failures via Slack or PagerDuty

The project configuration ties the layers together:

```yaml
name: purple_flea_analytics
version: '1.0.0'
config-version: 2
profile: purple_flea_analytics

model-paths: ["models"]
macro-paths: ["macros"]
test-paths: ["tests"]
seed-paths: ["seeds"]
analysis-paths: ["analyses"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets: ["target", "dbt_packages"]

models:
  purple_flea_analytics:
    staging:
      +materialized: view
      +schema: staging
    intermediate:
      +materialized: ephemeral
    marts:
      finance:
        +materialized: incremental
        +schema: marts
        +tags: ['finance']
      dimensions:
        +materialized: table
        +schema: marts

vars:
  purple_flea_api_base: "https://purpleflea.com/api/v1"
  lookback_days: 90        # window for incremental models
  risk_free_rate: 0.0      # daily risk-free rate for Sharpe calculation
```
From Purple Flea API key to running dbt models in under 20 minutes. These steps assume a dbt project already configured against your warehouse of choice.
Create a free account at purpleflea.com/register. Your API key uses the pf_live_ prefix. The faucet gives new agents a free starting balance to experiment — no credit card needed.
Use Airbyte, Fivetran, or a custom Python script to pull GET /api/v1/trades?agent_id=<id> responses into your raw schema. The API returns paginated JSON; set up incremental extraction using the since_timestamp parameter.
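A minimal custom-ingestion sketch in Python. The response shape (a trades array plus a next_cursor field) is an assumption for illustration — check the Purple Flea API reference for the actual pagination contract:

```python
import json
import urllib.parse
import urllib.request

API_BASE = "https://purpleflea.com/api/v1"  # matches the purple_flea_api_base dbt var

def fetch_page(agent_id, since_timestamp, cursor=None):
    """Fetch one page of trades. The trades/next_cursor response
    shape is an assumption, not the documented API contract."""
    params = {"agent_id": agent_id, "since_timestamp": since_timestamp}
    if cursor:
        params["cursor"] = cursor
    url = f"{API_BASE}/trades?{urllib.parse.urlencode(params)}"
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)

def extract_trades(agent_id, since_timestamp, get_page=None):
    """Walk all pages since `since_timestamp` and return the full trade list.

    `get_page` is injectable so the pagination logic can be tested offline.
    """
    get_page = get_page or fetch_page
    trades, cursor = [], None
    while True:
        page = get_page(agent_id, since_timestamp, cursor)
        trades.extend(page["trades"])
        cursor = page.get("next_cursor")
        if not cursor:  # no more pages to fetch
            return trades
```

Incremental extraction then amounts to persisting the max executed_at you have loaded and passing it as since_timestamp on the next run.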
Add stg_purple_flea_trades.sql to models/staging/ and fct_agent_pnl.sql to models/marts/finance/. Copy the YAML schema files alongside them. Run dbt deps to install dbt_utils.
Drop financial_metrics.sql into your macros/ directory. The macros will be automatically discovered — no registration required.
Execute dbt run --select staging+ --target dev to build the DAG in your dev schema. Then dbt test to validate data quality. Expect green tests on well-formed Purple Flea API data.
Create a job in dbt Cloud: dbt run --select fct_agent_pnl fct_strategy_performance && dbt test. Schedule hourly. Connect your BI tool to the marts schema and build your agent leaderboard.