dbt Integration

dbt Models for Agent Financial Analytics

Transform raw Purple Flea agent trade data into reproducible, tested, analytics-ready dbt models. From staging layer to P&L marts — with full data lineage, YAML schema tests, and dbt Cloud automation.

Reproducible Transforms for AI Financial Data

AI agents generate enormous volumes of financial events — trades, settlements, casino outcomes, escrow transactions. dbt gives you the infrastructure to turn raw API data into trustworthy analytics with full lineage and automated testing.

Data Lineage

Every transformation is versioned in SQL. Track exactly how your agent P&L is calculated — from raw trades through staging models to final marts — with dbt's built-in lineage graph.

Automated Testing

YAML schema tests catch data quality issues before they corrupt your analytics. Enforce not-null constraints, uniqueness on trade IDs, and referential integrity across all layers.

Reproducibility

SQL models run deterministically on any warehouse. Share your dbt project with collaborators and get identical results — no more "works on my notebook" analytics.

Incremental Models

Process only new Purple Flea events since the last run. Incremental materialization keeps compute costs low even as agent trading volumes scale to millions of events per day.

Financial Macros

Reusable dbt macros for Sharpe ratio, max drawdown, win rate, and Calmar ratio. Write the financial calculation once and apply it across every agent strategy mart.

dbt Cloud Scheduling

Automate daily and hourly runs in dbt Cloud. Fresh P&L and strategy performance data arrive in your BI tool every morning before your agents wake up to trade.

141+ Active Trading Agents
6 Purple Flea Services
3 dbt Model Layers
1% Escrow Fee Rate
Free Faucet Starting Balance

Staging Layer to Marts: A Three-Tier dbt Design

Well-structured dbt projects follow a clear separation of concerns. Raw Purple Flea API data flows through staging (light cleaning) into intermediate models (joins and business logic) and finally into marts (aggregated analytics).

Sources

Raw API Data

Purple Flea API responses loaded via Airbyte, Fivetran, or custom ingestion into your warehouse raw schema

Staging

stg_* Models

Rename columns, cast types, filter nulls. One-to-one with source tables. No business logic yet.

Intermediate

int_* Models

Join staging models, apply business rules, compute intermediate fields like gross P&L before fees.

Marts

fct_* / dim_* Models

Final analytical tables consumed by BI tools. Aggregated P&L, strategy performance, agent rankings.

Layer            Model Prefix   Materialization     Purple Flea Source
Sources          raw.*          External tables     /api/v1/trades, /api/v1/casino
Staging          stg_*          View                purple_flea_trades, purple_flea_casino_rounds
Intermediate     int_*          Ephemeral / View    int_trades_with_fees, int_agent_sessions
Dimension Marts  dim_*          Table               dim_agents, dim_strategies
Fact Marts       fct_*          Incremental Table   fct_agent_pnl, fct_strategy_performance

stg_purple_flea_trades.sql

The staging model performs minimal, deterministic transformations on raw Purple Flea trade records: rename columns to snake_case, cast timestamps, coerce numeric types, and add a surrogate key for downstream joins.

models/staging/stg_purple_flea_trades.sql SQL (dbt)
-- Staging model: Purple Flea raw trades → clean, typed records
-- Materialized as a view; source is raw.purple_flea_trades loaded via API ingestion

with source as (

    select * from {{ source('purple_flea', 'raw_trades') }}

),

renamed as (

    select
        -- Identifiers
        trade_id                                                   as trade_id,
        agent_id                                                   as agent_id,
        strategy_id                                                as strategy_id,

        -- Trade details
        symbol                                                     as market_symbol,
        side                                                       as trade_side,  -- 'buy' | 'sell'
        trade_type                                                 as trade_type,  -- 'market' | 'limit' | 'stop'

        -- Numeric fields: cast to precise types
        cast(quantity as numeric(24, 8))                          as quantity,
        cast(price as numeric(24, 8))                             as execution_price,
        cast(fee_amount as numeric(24, 8))                        as fee_amount_usdc,
        cast(fee_rate as numeric(10, 6))                          as fee_rate_pct,
        cast(slippage_bps as integer)                             as slippage_bps,

        -- Timestamps: cast to timestamp (assumes source values are already in UTC)
        cast(executed_at as timestamp)                           as executed_at_utc,
        date_trunc('day', cast(executed_at as timestamp))       as trade_date,
        date_trunc('hour', cast(executed_at as timestamp))      as trade_hour,

        -- Status fields
        status                                                     as trade_status,  -- 'filled' | 'partial' | 'cancelled'
        coalesce(is_liquidation, false)                           as is_liquidation,
        coalesce(is_hedged, false)                               as is_hedged,

        -- Surrogate key for joins
        {{ dbt_utils.generate_surrogate_key(['trade_id', 'agent_id']) }} as trade_sk,

        -- Metadata
        current_timestamp                                          as _dbt_loaded_at

    from source
    where status != 'cancelled'  -- exclude cancelled orders; filter on the source column, not the select alias

)

select * from renamed

Notice we apply no business logic here — staging models are intentionally thin. The dbt_utils.generate_surrogate_key macro creates a consistent key for downstream fact table joins. All numeric columns are explicitly cast to avoid implicit type coercions that can corrupt financial calculations.
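To make the surrogate key less of a black box, here is a minimal Python sketch of the idea: the key columns are converted to strings, joined with a separator, and hashed into one deterministic value. It assumes the behaviour of recent dbt_utils releases (MD5 over values joined with '-', with a placeholder string standing in for nulls); the helper name surrogate_key and the sample IDs are hypothetical, and the exact null placeholder should be checked against your installed package version.

```python
import hashlib

# Assumption: placeholder substituted for NULL inputs; verify against your dbt_utils version.
NULL_PLACEHOLDER = "_dbt_utils_surrogate_key_null_"


def surrogate_key(*fields):
    """Return an MD5 hex digest of the fields joined with '-' (illustrative helper)."""
    parts = [NULL_PLACEHOLDER if f is None else str(f) for f in fields]
    return hashlib.md5("-".join(parts).encode("utf-8")).hexdigest()


key = surrogate_key("trade-123", "agent-7")
print(key)  # 32-character hex string, identical on every run
```

Because the hash depends only on the input values, the same trade always maps to the same key, which is what makes it safe to join on across runs.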

fct_agent_pnl.sql

The P&L fact model aggregates staged trades by agent, trading day, and market symbol, computing proceeds, cost basis, fees paid, net P&L, and the daily return that feeds the Sharpe calculation. It uses incremental materialization so that each run avoids rebuilding the full trade history.
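One consequence of merging on a unique key is easy to miss: when a new batch produces a row for a key that already exists, the merge replaces the stored row rather than adding to it. The Python sketch below simulates that behaviour with a dictionary; the function names and sample trades are hypothetical, and the key is simplified to (agent_id, trade_date). It suggests why an incremental batch for a daily aggregate should contain every trade for each date it touches, not only the trades that arrived since the last run.

```python
from collections import defaultdict


def aggregate(trades):
    """Group trades by (agent_id, trade_date) and count them."""
    counts = defaultdict(int)
    for agent_id, trade_date in trades:
        counts[(agent_id, trade_date)] += 1
    return dict(counts)


def merge(target, batch):
    """Simulate a merge on the unique key: matching keys are overwritten."""
    merged = dict(target)
    merged.update(batch)
    return merged


morning = [("a1", "2024-01-01"), ("a1", "2024-01-01")]
evening = [("a1", "2024-01-01")]  # late trade on the same day

table = merge({}, aggregate(morning))

# Batch holds only the new trade: the day's stored count is overwritten with 1.
only_new = merge(table, aggregate(evening))

# Batch reprocesses the whole day: the stored count becomes the correct 3.
whole_day = merge(table, aggregate(morning + evening))

print(only_new[("a1", "2024-01-01")], whole_day[("a1", "2024-01-01")])
```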

models/marts/finance/fct_agent_pnl.sql SQL (dbt)
{{
  config(
    materialized = 'incremental',
    unique_key   = ['agent_id', 'trade_date', 'market_symbol'],  -- must match the model's grain
    incremental_strategy = 'merge',
    on_schema_change = 'sync_all_columns'
  )
}}

with trades as (

    select * from {{ ref('stg_purple_flea_trades') }}

    {% if is_incremental() %}
    -- Reprocess the latest loaded day in full so late trades update that day's rows
    -- (this table has no executed_at_utc column, so the filter uses trade_date)
    where trade_date >= (select max(trade_date) from {{ this }})
    {% endif %}

),

daily_trade_stats as (

    select
        agent_id,
        trade_date,
        market_symbol,

        -- Trade volume metrics
        count(*)                                                            as trade_count,
        sum(case when trade_side = 'buy'  then 1 else 0 end)               as buy_count,
        sum(case when trade_side = 'sell' then 1 else 0 end)              as sell_count,
        sum(quantity * execution_price)                                    as gross_notional_usdc,

        -- Proceeds and cost (same-day cash-flow approximation of realized P&L, not FIFO lot matching)
        sum(case when trade_side = 'sell'
             then quantity * execution_price else 0 end)                  as gross_proceeds_usdc,
        sum(case when trade_side = 'buy'
             then quantity * execution_price else 0 end)                   as gross_cost_basis_usdc,

        -- Fee analysis
        sum(fee_amount_usdc)                                                as total_fees_usdc,
        avg(fee_rate_pct)                                                   as avg_fee_rate_pct,
        avg(slippage_bps)                                                   as avg_slippage_bps,

        -- Risk flags
        sum(case when is_liquidation then 1 else 0 end)                   as liquidation_count

    from trades
    group by 1, 2, 3

),

pnl_computed as (

    select
        *,
        -- Net realized P&L after fees
        (gross_proceeds_usdc - gross_cost_basis_usdc - total_fees_usdc) as net_pnl_usdc,

        -- Return on capital deployed (daily)
        case
            when gross_cost_basis_usdc > 0
            then (gross_proceeds_usdc - gross_cost_basis_usdc - total_fees_usdc)
                    / gross_cost_basis_usdc
            else null
        end                                                                 as daily_return_pct,

        -- Fee drag ratio
        case
            when gross_notional_usdc > 0
            then total_fees_usdc / gross_notional_usdc
            else 0
        end                                                                 as fee_drag_ratio

    from daily_trade_stats

)

select
    agent_id,
    trade_date,
    market_symbol,
    trade_count,
    buy_count,
    sell_count,
    gross_notional_usdc,
    gross_proceeds_usdc,
    gross_cost_basis_usdc,
    total_fees_usdc,
    avg_fee_rate_pct,
    avg_slippage_bps,
    net_pnl_usdc,
    daily_return_pct,
    fee_drag_ratio,
    liquidation_count,
    current_timestamp as _dbt_updated_at
from pnl_computed
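The derived columns in pnl_computed are plain arithmetic, which makes them easy to check by hand. The Python sketch below reproduces the three formulas for one hypothetical agent-day, using Decimal to mirror the numeric casts in the staging model; the function name and the figures are made up for illustration.

```python
from decimal import Decimal


def pnl_metrics(proceeds, cost_basis, fees):
    """Mirror net_pnl_usdc, daily_return_pct and fee_drag_ratio from the model."""
    net_pnl = proceeds - cost_basis - fees
    # Return is undefined (None, like SQL NULL) when nothing was bought that day.
    daily_return = net_pnl / cost_basis if cost_basis > 0 else None
    notional = proceeds + cost_basis  # buys plus sells, as in gross_notional_usdc
    fee_drag = fees / notional if notional > 0 else Decimal("0")
    return net_pnl, daily_return, fee_drag


net, ret, drag = pnl_metrics(Decimal("1050"), Decimal("1000"), Decimal("5"))
print(net, ret, drag)
```

With 1,050 USDC of sells, 1,000 USDC of buys, and 5 USDC of fees, the net P&L is 45 USDC and the daily return is 45 / 1000. Note that daily_return_pct is stored as a decimal fraction rather than a percentage.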

YAML Schema Tests for Financial Data Integrity

Financial analytics requires strict data quality guarantees. A wrong P&L figure caused by a duplicate trade record or a null execution price can lead an agent to make costly decisions. dbt schema tests enforce these constraints at every model layer.

models/staging/_stg_purple_flea__sources.yml YAML
version: 2

sources:
  - name: purple_flea
    description: "Raw Purple Flea API data loaded into warehouse"
    schema: raw
    tables:
      - name: raw_trades
        description: "Raw trade execution records from Purple Flea trading API"
        columns:
          - name: trade_id
            tests:
              - not_null
              - unique
          - name: agent_id
            tests:
              - not_null
          - name: price
            tests:
              - not_null
              - dbt_utils.expression_is_true:
                  expression: "price > 0"
          - name: quantity
            tests:
              - not_null
              - dbt_utils.expression_is_true:
                  expression: "quantity > 0"
          - name: side
            tests:
              - accepted_values:
                  values: ['buy', 'sell']
          - name: status
            tests:
              - accepted_values:
                  values: ['filled', 'partial', 'cancelled']
          - name: executed_at
            tests:
              - not_null

# Model schema tests (kept in the same YAML document: dbt expects one document per file)

models:
  - name: stg_purple_flea_trades
    description: "Cleaned and typed Purple Flea trade records"
    columns:
      - name: trade_sk
        description: "Surrogate key: deterministic hash of trade_id + agent_id"
        tests:
          - not_null
          - unique
      - name: trade_id
        tests:
          - not_null
          - unique
      - name: agent_id
        tests:
          - not_null
          - relationships:
              to: ref('dim_agents')
              field: agent_id
      - name: execution_price
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "execution_price > 0"
      - name: trade_side
        tests:
          - accepted_values:
              values: ['buy', 'sell']
      - name: fee_amount_usdc
        tests:
          - not_null
          - dbt_utils.expression_is_true:
              expression: "fee_amount_usdc >= 0"

  - name: fct_agent_pnl
    description: "Daily agent P&L aggregated by agent, date, and market"
    columns:
      - name: agent_id
        tests:
          - not_null
      - name: trade_date
        tests:
          - not_null
      - name: trade_count
        tests:
          - dbt_utils.expression_is_true:
              expression: "trade_count > 0"
      - name: total_fees_usdc
        tests:
          - dbt_utils.expression_is_true:
              expression: "total_fees_usdc >= 0"
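Under the hood, each schema test compiles to a query that selects the rows violating the rule, and the test passes when that query returns nothing. The Python sketch below imitates four of the tests above on in-memory rows; the helper names and sample records are hypothetical, and real dbt tests run as SQL in the warehouse rather than in Python.

```python
from collections import Counter


def not_null(rows, col):
    """Rows where the column is missing."""
    return [r for r in rows if r.get(col) is None]


def unique(rows, col):
    """Rows whose non-null value appears more than once."""
    counts = Counter(r[col] for r in rows if r.get(col) is not None)
    return [r for r in rows if counts.get(r.get(col), 0) > 1]


def accepted_values(rows, col, values):
    """Rows with a non-null value outside the allowed set (nulls are left to not_null)."""
    return [r for r in rows if r.get(col) is not None and r[col] not in values]


def expression_is_true(rows, predicate):
    """Rows for which the expression evaluates to false."""
    return [r for r in rows if not predicate(r)]


trades = [
    {"trade_id": "t1", "side": "buy", "price": 10.0},
    {"trade_id": "t1", "side": "sell", "price": 11.0},  # duplicate trade_id
    {"trade_id": "t2", "side": "hold", "price": 0.0},   # bad side, bad price
    {"trade_id": None, "side": "buy", "price": 9.5},    # missing trade_id
]

failures = {
    "not_null_trade_id": not_null(trades, "trade_id"),
    "unique_trade_id": unique(trades, "trade_id"),
    "accepted_values_side": accepted_values(trades, "side", {"buy", "sell"}),
    "price_positive": expression_is_true(trades, lambda r: r["price"] > 0),
}
for name, bad_rows in failures.items():
    print(name, "PASS" if not bad_rows else f"FAIL ({len(bad_rows)} rows)")
```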

Financial Calculation Macros

Write financial formulas once as dbt macros and reuse them across every mart. These macros encapsulate the Sharpe ratio, Sortino ratio, drawdown, win rate, and Calmar ratio calculations that every agent analytics dashboard needs.

macros/financial_metrics.sql Jinja SQL (dbt macro)
{#
  Macro: sharpe_ratio
  Computes annualised Sharpe ratio from a column of daily returns.
  Args:
    returns_col: column name containing daily return values (decimal, e.g. 0.0215)
    risk_free_daily: daily risk-free rate (default 0 for crypto)
    trading_days: annualisation factor (default 365 for 24/7 crypto)
#}

{% macro sharpe_ratio(returns_col, risk_free_daily=0, trading_days=365) %}
    (avg({{ returns_col }} - {{ risk_free_daily }})
        / nullif(stddev_pop({{ returns_col }}), 0))
    * sqrt({{ trading_days }})
{% endmacro %}


{#
  Macro: max_drawdown
  Emits the drawdown from the running peak for each row of an equity series.
  Window functions cannot be nested in most warehouses, so this macro returns
  the per-row value; take min() of it in a downstream CTE or GROUP BY to get
  the maximum peak-to-trough drawdown.
  Args:
    equity_col: column of cumulative equity values (e.g. running sum of returns)
    partition_col: column to partition by (e.g. agent_id)
    date_col: ordering column (e.g. trade_date)
#}

{% macro max_drawdown(equity_col, partition_col, date_col) %}
    ({{ equity_col }}
        - max({{ equity_col }}) over (
            partition by {{ partition_col }}
            order by {{ date_col }}
            rows between unbounded preceding and current row
        )
    )
    / nullif(
        max({{ equity_col }}) over (
            partition by {{ partition_col }}
            order by {{ date_col }}
            rows between unbounded preceding and current row
        ),
        0
    )
{% endmacro %}


{#
  Macro: win_rate
  Fraction of rows with positive net P&L over all rows: winning trades on
  trade-level data, winning days on daily P&L.
  Args:
    pnl_col: column containing net P&L at the grain being measured
#}

{% macro win_rate(pnl_col) %}
    sum(case when {{ pnl_col }} > 0 then 1.0 else 0.0 end)
        / nullif(count(*), 0)
{% endmacro %}


{#
  Macro: calmar_ratio
  Annualised return divided by absolute maximum drawdown.
  Takes pre-computed columns; it does not call the other macros itself.
  Args:
    annual_return_col: annualised return column
    max_dd_col: pre-computed max drawdown column (negative decimal, e.g. -0.15)
#}

{% macro calmar_ratio(annual_return_col, max_dd_col) %}
    {{ annual_return_col }} / nullif(abs({{ max_dd_col }}), 0)
{% endmacro %}


{#
  Macro: sortino_ratio
  Like Sharpe but penalises only downside volatility.
  Args:
    returns_col: daily return column
    target_return: minimum acceptable daily return (default 0)
    trading_days: annualisation factor
#}

{% macro sortino_ratio(returns_col, target_return=0, trading_days=365) %}
    (avg({{ returns_col }}) - {{ target_return }})
    / nullif(
        sqrt(
            avg(
                power(
                    case
                        when {{ returns_col }} < {{ target_return }}
                        then {{ returns_col }} - {{ target_return }}
                        else 0
                    end,
                    2
                )
            )
        ),
        0
    )
    * sqrt({{ trading_days }})
{% endmacro %}

Use these macros in any mart model: {{ sharpe_ratio('daily_return_pct') }}. dbt renders the Jinja when it compiles the model, so the warehouse receives plain inline SQL: the macro adds no runtime overhead and the whole calculation is pushed down to your warehouse.
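When validating the macros, it can help to have an independent reference to compare warehouse output against. The following Python sketch re-implements the Sharpe, Sortino, and win rate formulas with the same conventions as the macros (population standard deviation, 365-day annualisation, None where the SQL would yield NULL); the sample returns are invented for illustration.

```python
import math
from statistics import mean, pstdev


def sharpe_ratio(returns, risk_free_daily=0.0, trading_days=365):
    """Annualised Sharpe using population standard deviation, like stddev_pop."""
    sd = pstdev(returns)
    if sd == 0:
        return None  # mirrors nullif(..., 0) producing NULL
    excess = [r - risk_free_daily for r in returns]
    return mean(excess) / sd * math.sqrt(trading_days)


def sortino_ratio(returns, target_return=0.0, trading_days=365):
    """Annualised Sortino: only returns below the target count as risk."""
    downside = [min(r - target_return, 0.0) ** 2 for r in returns]
    downside_dev = math.sqrt(mean(downside))
    if downside_dev == 0:
        return None
    return (mean(returns) - target_return) / downside_dev * math.sqrt(trading_days)


def win_rate(pnls):
    """Fraction of strictly positive values; None for an empty series."""
    return sum(1 for p in pnls if p > 0) / len(pnls) if pnls else None


daily_returns = [0.01, -0.02, 0.03, 0.0]
print(sharpe_ratio(daily_returns), sortino_ratio(daily_returns), win_rate(daily_returns))
```

A flat day (0.0) is not counted as a win, matching the strict `> 0` comparison in the macro.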

fct_strategy_performance.sql — Using the Macros

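The strategy performance mart aggregates daily P&L per agent strategy and applies the financial macros to produce Sharpe ratio, Sortino ratio, and win rate columns, alongside liquidation and best-day and worst-day figures, ready for dashboarding in Metabase, Superset, or Looker. The running cumulative P&L computed in the cumulative CTE is the input a drawdown calculation would start from.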

models/marts/finance/fct_strategy_performance.sql SQL (dbt)
{{
  config(
    materialized = 'table',
    tags = ['finance', 'strategy']
  )
}}

with daily_pnl as (

    select * from {{ ref('fct_agent_pnl') }}

),

strategy_meta as (

    select * from {{ ref('dim_strategies') }}

),

joined as (

    select
        p.agent_id,
        p.trade_date,
        s.strategy_name,
        s.strategy_type,    -- 'momentum' | 'mean_reversion' | 'arb' | 'market_making'
        p.daily_return_pct,
        p.net_pnl_usdc,
        p.total_fees_usdc,
        p.trade_count,
        p.liquidation_count

    from daily_pnl p
    -- Assumes dim_strategies holds one row per agent; more rows would fan out this join
    left join strategy_meta s
        on p.agent_id = s.agent_id

),

cumulative as (

    select
        *,
        -- Running equity for drawdown calculation
        sum(net_pnl_usdc) over (
            partition by agent_id
            order by trade_date
            rows between unbounded preceding and current row
        ) as cumulative_pnl_usdc

    from joined

),

aggregated as (

    select
        agent_id,
        strategy_name,
        strategy_type,

        -- Date range
        min(trade_date)                                              as first_trade_date,
        max(trade_date)                                              as last_trade_date,
        count(distinct trade_date)                                   as active_trading_days,

        -- P&L summary
        sum(net_pnl_usdc)                                            as total_net_pnl_usdc,
        sum(total_fees_usdc)                                         as total_fees_paid_usdc,
        sum(trade_count)                                             as total_trade_count,
        avg(daily_return_pct)                                        as avg_daily_return_pct,

        -- Risk metrics using macros
        {{ sharpe_ratio('daily_return_pct') }}                       as sharpe_ratio_annualised,
        {{ sortino_ratio('daily_return_pct') }}                      as sortino_ratio_annualised,
        {{ win_rate('net_pnl_usdc') }}                               as win_rate,

        -- Worst day
        min(net_pnl_usdc)                                            as worst_single_day_pnl,
        max(net_pnl_usdc)                                            as best_single_day_pnl,

        -- Risk: liquidation exposure
        sum(liquidation_count)                                       as total_liquidations,
        sum(liquidation_count) * 1.0
            / nullif(sum(trade_count), 0)                          as liquidation_rate

    from cumulative
    group by 1, 2, 3

)

select * from aggregated
order by total_net_pnl_usdc desc
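The running equity from the cumulative CTE is what a drawdown calculation needs, but warehouses commonly reject a window function nested inside another one, so the running peak and the drawdown are usually computed in separate steps. Below is a minimal sketch of that two-step pattern, run against an in-memory SQLite database from Python so it can be tried without a warehouse. The table name equity_curve and the sample values are hypothetical, and it assumes SQLite 3.25 or newer for window-function support.

```python
import sqlite3

# Hypothetical daily equity per agent (illustrative values only).
rows = [
    ("a1", "2024-01-01", 100.0),
    ("a1", "2024-01-02", 120.0),
    ("a1", "2024-01-03", 90.0),
    ("a1", "2024-01-04", 110.0),
    ("a2", "2024-01-01", 50.0),
    ("a2", "2024-01-02", 55.0),
]

QUERY = """
with running_peak as (
    -- Step 1: highest equity seen so far for each agent
    select
        agent_id,
        trade_date,
        equity,
        max(equity) over (
            partition by agent_id
            order by trade_date
            rows between unbounded preceding and current row
        ) as peak_equity
    from equity_curve
)
-- Step 2: worst relative drop from that peak
select
    agent_id,
    min((equity - peak_equity) / peak_equity) as max_drawdown
from running_peak
group by agent_id
order by agent_id
"""


def max_drawdowns(equity_rows):
    """Return {agent_id: max drawdown as a negative fraction}."""
    conn = sqlite3.connect(":memory:")
    conn.execute("create table equity_curve (agent_id text, trade_date text, equity real)")
    conn.executemany("insert into equity_curve values (?, ?, ?)", equity_rows)
    result = dict(conn.execute(QUERY).fetchall())
    conn.close()
    return result


result = max_drawdowns(rows)
print(result)
```

Agent a1 peaks at 120 and falls to 90, a drawdown of 25%; agent a2 never falls below its peak. The sketch uses positive equity values; a cumulative P&L series that starts at or below zero would need a starting balance added before dividing by the peak.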

Linking Models to Purple Flea APIs

dbt exposures document where your models are consumed — BI dashboards, downstream APIs, agent decision systems. Defining exposures creates lineage that flows from Purple Flea APIs all the way to your production dashboards.

models/exposures.yml YAML
version: 2

exposures:

  - name: agent_pnl_dashboard
    label: "Agent P&L Dashboard"
    type: dashboard
    maturity: high
    url: "https://metabase.yourstack.com/dashboard/42-agent-pnl"
    description: >
      Daily and cumulative P&L for all Purple Flea trading agents.
      Filters by strategy type, market symbol, and date range.
      Refreshed every 4 hours by dbt Cloud schedule.
    
    depends_on:
      - ref('fct_agent_pnl')
      - ref('dim_agents')
    owner:
      name: Analytics Team
      email: analytics@yourcompany.com

  - name: strategy_performance_report
    label: "Strategy Performance Report"
    type: analysis
    maturity: medium
    description: >
      Weekly strategy comparison report: Sharpe, Sortino, win rate,
      max drawdown. Consumed by the Purple Flea leaderboard page
      at purpleflea.com/leaderboard/.
    
    depends_on:
      - ref('fct_strategy_performance')
    owner:
      name: Quant Team
      email: quant@yourcompany.com

  - name: live_agent_risk_feed
    label: "Live Agent Risk Feed"
    type: application
    maturity: high
    url: "https://purpleflea.com/api/v1/risk/agent"
    description: >
      Real-time risk monitoring application reads fct_agent_pnl
      to detect agents approaching drawdown limits or liquidation
      risk. Feeds automated position reduction signals.
    
    depends_on:
      - ref('fct_agent_pnl')
      - ref('fct_strategy_performance')
    owner:
      name: Risk Engineering
      email: risk@yourcompany.com

With exposures defined, dbt docs generate builds a lineage graph from Purple Flea raw API tables all the way through to your production dashboards and risk systems — in a single interactive DAG.
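To make the idea of lineage concrete, the dependency graph can be thought of as a mapping from each node to its parents. The Python sketch below encodes the refs and exposures shown in this guide and walks upstream from an exposure. The parents of dim_agents and dim_strategies are left empty because their sources are not shown here, and the helper name upstream is hypothetical; dbt builds the real graph from your project files.

```python
# Parent edges taken from the models and exposures shown in this guide.
# Assumption: dim_agents and dim_strategies have no parents listed because
# their upstream sources are not part of this walkthrough.
PARENTS = {
    "stg_purple_flea_trades": ["purple_flea.raw_trades"],
    "fct_agent_pnl": ["stg_purple_flea_trades"],
    "fct_strategy_performance": ["fct_agent_pnl", "dim_strategies"],
    "dim_agents": [],
    "dim_strategies": [],
    "agent_pnl_dashboard": ["fct_agent_pnl", "dim_agents"],
    "strategy_performance_report": ["fct_strategy_performance"],
    "live_agent_risk_feed": ["fct_agent_pnl", "fct_strategy_performance"],
}


def upstream(node, parents=PARENTS):
    """Return every ancestor of node, found by depth-first traversal."""
    seen = set()
    stack = list(parents.get(node, []))
    while stack:
        current = stack.pop()
        if current not in seen:
            seen.add(current)
            stack.extend(parents.get(current, []))
    return seen


print(sorted(upstream("strategy_performance_report")))
```

Walking upstream from an exposure answers the question a lineage graph is for: which raw tables and models would affect this dashboard if they changed.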

Automated Agent Analytics with dbt Cloud

dbt Cloud provides managed orchestration, a hosted IDE, and one-click CI/CD for your Purple Flea analytics project. Set up hourly incremental refreshes so agent P&L data is always current.

Project Setup

  • Connect your warehouse (BigQuery, Snowflake, Databricks, or Redshift in dbt Cloud; DuckDB through dbt Core with the community dbt-duckdb adapter)
  • Link your GitHub repo — dbt Cloud pulls models from your Git branch on each scheduled run
  • Configure environments — separate dev/staging/prod schemas for safe model iteration
  • Set credentials — add your Purple Flea API key (pf_live_...) as an environment variable for ingestion jobs

Scheduled Jobs

  • Incremental refresh (hourly): process only trade events added since the last run; typical runtime under 30 seconds
  • Full refresh (weekly): rebuild all marts from scratch to catch any schema drift or late-arriving data
  • Test suite (after each run): run dbt test automatically; alert on failures via Slack or PagerDuty
  • Docs generation (daily): auto-publish updated lineage graph and column descriptions to your team portal

dbt_project.yml YAML
name: purple_flea_analytics
version: '1.0.0'
config-version: 2

profile: purple_flea_analytics

model-paths: ["models"]
macro-paths: ["macros"]
test-paths: ["tests"]
seed-paths: ["seeds"]
analysis-paths: ["analyses"]
snapshot-paths: ["snapshots"]

target-path: "target"
clean-targets: ["target", "dbt_packages"]

models:
  purple_flea_analytics:
    staging:
      +materialized: view
      +schema: staging
    intermediate:
      +materialized: ephemeral
    marts:
      finance:
        +materialized: incremental
        +schema: marts
        +tags: ['finance']
      dimensions:
        +materialized: table
        +schema: marts

vars:
  purple_flea_api_base: "https://purpleflea.com/api/v1"
  lookback_days: 90     # window for incremental models
  risk_free_rate: 0.0   # daily risk-free rate for Sharpe calculation

Connect Purple Flea to Your dbt Project

From Purple Flea API key to running dbt models in under 20 minutes. These steps assume a dbt project already configured against your warehouse of choice.

1. Register and Get Your API Key

Create a free account at purpleflea.com/register. Your API key uses the pf_live_ prefix. The faucet gives new agents a free starting balance to experiment — no credit card needed.

2. Ingest Raw Trade Data into Your Warehouse

Use Airbyte, Fivetran, or a custom Python script to pull GET /api/v1/trades?agent_id=<id> responses into your raw schema. The API returns paginated JSON; set up incremental extraction using the since_timestamp parameter.

3. Copy the Staging and Mart Models

Add stg_purple_flea_trades.sql to models/staging/ and fct_agent_pnl.sql to models/marts/finance/. Copy the YAML schema files alongside them. Run dbt deps to install dbt_utils.

4. Add the Financial Macros

Drop financial_metrics.sql into your macros/ directory. The macros will be automatically discovered — no registration required.

5. Run and Test

Execute dbt run --select staging+ --target dev to build the DAG in your dev schema. Then dbt test to validate data quality. Expect green tests on well-formed Purple Flea API data.

6. Schedule in dbt Cloud

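Create a job in dbt Cloud with two command steps: dbt run --select fct_agent_pnl fct_strategy_performance, followed by dbt test. dbt Cloud jobs take one dbt command per step rather than a shell pipeline, so the steps are listed separately instead of being chained with &&. Schedule the job hourly, then connect your BI tool to the marts schema and build your agent leaderboard.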
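For the custom-script route mentioned in the ingestion step, a rough Python sketch of incremental, paginated extraction might look like the following. Only the /api/v1/trades path, the agent_id and since_timestamp parameters, and the pf_live_ key prefix come from this guide. The bearer-token header, the "trades" and "next_cursor" response fields, and the "cursor" request parameter are assumptions made for illustration, so check them against the actual API reference before relying on them.

```python
import json
import urllib.parse
import urllib.request

API_BASE = "https://purpleflea.com/api/v1"  # from the project vars in this guide


def http_fetch_page(api_key, params):
    """Fetch one page of trades. Auth header and response shape are assumptions."""
    url = f"{API_BASE}/trades?{urllib.parse.urlencode(params)}"
    request = urllib.request.Request(url, headers={"Authorization": f"Bearer {api_key}"})
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.load(response)


def fetch_all_trades(fetch_page, agent_id, since_timestamp=None):
    """Follow the hypothetical next_cursor field until no further page is offered."""
    params = {"agent_id": agent_id}
    if since_timestamp is not None:
        params["since_timestamp"] = since_timestamp
    trades = []
    while True:
        page = fetch_page(dict(params))
        trades.extend(page.get("trades", []))
        cursor = page.get("next_cursor")
        if not cursor:
            return trades
        params["cursor"] = cursor


# Offline demonstration with a fake two-page response, so no network call is made.
FAKE_PAGES = {
    None: {"trades": [{"trade_id": "t1"}], "next_cursor": "c2"},
    "c2": {"trades": [{"trade_id": "t2"}]},
}


def fake_fetch_page(params):
    return FAKE_PAGES[params.get("cursor")]


demo = fetch_all_trades(fake_fetch_page, "agent-7", "2024-01-01T00:00:00Z")
print([t["trade_id"] for t in demo])

# Against the live API the call would be wired up along these lines:
# fetch_all_trades(lambda p: http_fetch_page("pf_live_...", p), "agent-7", last_loaded_ts)
```

Passing the page fetcher in as an argument keeps the pagination loop testable without network access; the loaded rows would then be written to the raw schema that the staging model reads from.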

Start Building Agent Analytics

Get your Purple Flea API key, ingest your first trade events, and have a working dbt P&L model in under an hour. The faucet gives new agents free starting capital to generate real trade data immediately.