ClickHouse + Purple Flea

ClickHouse Analytics for
High-Frequency AI Agents

Your agents trade at machine speed. ClickHouse makes sense of it all — ingesting billions of agent trade events, powering sub-second P&L queries, and driving real-time strategy dashboards. Zero PostgreSQL bottlenecks.

100B+
rows/sec ingested
<10ms
P&L query latency
10–100x
Columnar compression
141+
Active Purple Flea agents
6
Purple Flea services

Traditional Databases Break Under Agent Trade Volume

A single AI trading agent on Purple Flea can generate thousands of trades per hour — perpetual orders, casino bets, escrow settlements, and wallet transfers. At 141+ concurrent agents, a PostgreSQL instance quickly becomes a bottleneck:

  • Row-level storage causes full table scans for aggregate queries
  • P&L queries across 30-day windows take seconds, not milliseconds
  • Strategy comparison queries lock tables, block new ingestion
  • JSON event payloads balloon storage costs
  • Real-time dashboards lag 30–60 seconds behind actual trades

ClickHouse is purpose-built for this workload: columnar storage means only the columns you query are read, compression ratios of 10–100x reduce storage costs, and vectorised query execution keeps aggregate queries under 10ms even on tables with hundreds of billions of rows.

Columnar Compression + Vectorised Execution

FeaturePostgreSQLClickHouse
Storage model Row-oriented Column-oriented
Compression 2–5x 10–100x
Aggregate scan speed Seconds <10ms
Ingest throughput ~100K rows/sec 100B+ rows/sec
Real-time materialized views Limited Native, incremental
Best for OLTP, small reads OLAP, analytics
Columnar Advantage

A P&L query that touches only agent_id, pnl_usd, and timestamp reads just 3 columns from disk — not all 24 columns in the row. ClickHouse skips 87.5% of I/O compared to a row store.

🗜️

LZ4 + ZSTD Compression

Trade price columns compress 50–100x with delta encoding. A billion rows of agent trade data fits in under 10GB with ClickHouse codecs.

Vectorised SIMD Execution

ClickHouse processes data in 8192-row blocks using CPU SIMD instructions. Aggregation over 1B rows completes in milliseconds on a 16-core node.

📈

ReplicatedMergeTree

The MergeTree engine family provides automatic data sorting, TTL-based retention, and Zookeeper-backed replication — ideal for immutable trade ledgers.

🔄

Materialised Views

Incrementally maintain P&L aggregates, hourly volume buckets, and per-strategy Sharpe ratios. Views update on insert — no scheduled jobs needed.

📡

Kafka Table Engine

Native Kafka consumer built into ClickHouse. Route Purple Flea webhooks through Kafka and have ClickHouse ingest directly — no ETL middleware.

📊

Grafana Native Plugin

Official ClickHouse Grafana datasource with macro support. Build dashboards directly on top of agent_trades tables in minutes.

agent_trades Table: ReplicatedMergeTree Schema

The core table schema is partitioned by month for efficient retention management, ordered by (agent_id, timestamp) for fast per-agent queries, and uses ClickHouse codecs to maximise compression on numerical columns.

-- ClickHouse DDL: agent_trades table
-- Partitioned by month, ordered by (agent_id, timestamp)
-- Run on: ClickHouse 24.x+

CREATE TABLE agent_trades
(
    -- Identity
    trade_id         UUID             DEFAULT generateUUIDv4(),
    agent_id         String,
    strategy_name    LowCardinality(String),
    session_id       String,

    -- Timing
    timestamp        DateTime64(9, 'UTC') CODEC(DoubleDelta, ZSTD(1)),
    created_at       DateTime          DEFAULT now(),

    -- Instrument
    market           LowCardinality(String),  -- 'crypto', 'perpetuals', 'casino', etc.
    ticker           LowCardinality(String),
    side             Enum8('buy' = 1, 'sell' = 2),
    order_type       LowCardinality(String),

    -- Execution
    quantity         Float64           CODEC(Gorilla, ZSTD(1)),
    price            Float64           CODEC(Gorilla, ZSTD(1)),
    notional_usd     Float64           CODEC(Gorilla, ZSTD(1)),
    fee_usd          Float64           CODEC(Gorilla, ZSTD(1)),
    slippage_bps     Float32           CODEC(Gorilla, ZSTD(1)),
    fill_latency_ms  UInt32            CODEC(T64, ZSTD(1)),

    -- P&L (populated by settlement event)
    pnl_usd          Nullable(Float64) CODEC(Gorilla, ZSTD(1)),
    cumulative_pnl   Nullable(Float64) CODEC(Gorilla, ZSTD(1)),

    -- Status
    status           Enum8('pending' = 0, 'filled' = 1, 'partial' = 2, 'cancelled' = 3),
    fill_ratio       Float32           DEFAULT 0,

    -- Purple Flea metadata
    pf_order_id      String,
    pf_api_version   LowCardinality(String),
    referral_code    Nullable(String),

    -- Extensible payload
    extra            Map(String, String)
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{layer}-{shard}/agent_trades',
    '{replica}'
)
PARTITION BY toYYYYMM(timestamp)
ORDER BY (agent_id, timestamp)
TTL timestamp + INTERVAL 2 YEAR
SETTINGS
    index_granularity = 8192,
    storage_policy = 'tiered';  -- hot NVMe → cold S3 after 30 days


-- Bloom filter index on agent_id for fast agent lookups
ALTER TABLE agent_trades
ADD INDEX idx_agent_bloom (agent_id) TYPE bloom_filter(0.01) GRANULARITY 1;

-- Minmax index on ticker for range scans
ALTER TABLE agent_trades
ADD INDEX idx_ticker_minmax (ticker) TYPE minmax GRANULARITY 1;
Codec Selection

Gorilla codec is optimal for floating-point time series (prices, P&L) — it delta-encodes consecutive values and achieves 3–8x compression on financial data. DoubleDelta works best for monotonically increasing integers like timestamps. LowCardinality turns string columns with few distinct values (strategy names, tickers) into dictionary-encoded columns, reducing storage by 5–10x.

ClickHouseTradeAnalytics: Production Python Client

The ClickHouseTradeAnalytics class wraps the clickhouse-connect library and provides high-level methods for ingesting Purple Flea trade events, querying P&L, and analysing slippage patterns.

Python
import clickhouse_connect
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Optional
from dataclasses import dataclass, asdict
import asyncio
import httpx

# Purple Flea API key — never use sk_live_ prefix
PF_API_KEY = "pf_live_your_api_key_here"
PF_API_BASE = "https://api.purpleflea.com/v1"

@dataclass
class TradeEvent:
    trade_id: str
    agent_id: str
    strategy_name: str
    session_id: str
    timestamp: datetime
    market: str
    ticker: str
    side: str              # 'buy' or 'sell'
    order_type: str
    quantity: float
    price: float
    notional_usd: float
    fee_usd: float
    slippage_bps: float
    fill_latency_ms: int
    pnl_usd: Optional[float]
    cumulative_pnl: Optional[float]
    status: str
    fill_ratio: float
    pf_order_id: str
    pf_api_version: str = "v1"
    referral_code: Optional[str] = None


class ClickHouseTradeAnalytics:
    """
    Production-grade ClickHouse analytics client for Purple Flea agent trades.
    Provides ingest, P&L queries, slippage analysis, and leaderboard generation.
    """

    def __init__(
        self,
        host: str = "localhost",
        port: int = 8443,
        database: str = "agent_analytics",
        username: str = "default",
        password: str = "",
        secure: bool = True,
        pf_api_key: str = PF_API_KEY,
    ):
        self.client = clickhouse_connect.get_client(
            host=host,
            port=port,
            database=database,
            username=username,
            password=password,
            secure=secure,
            compress=True,
            settings={
                'max_insert_block_size': 100_000,
                'async_insert': 1,
                'wait_for_async_insert': 0,
            }
        )
        self.pf_client = httpx.AsyncClient(
            base_url=PF_API_BASE,
            headers={"Authorization": f"Bearer {pf_api_key}"},
            timeout=30.0
        )
        self.database = database

    # ─── Ingest ──────────────────────────────────────────────────────────────

    def ingest_trades(self, trades: list[TradeEvent], batch_size: int = 10_000) -> int:
        """
        Batch-insert trade events into ClickHouse.
        Returns total rows inserted.
        """
        if not trades:
            return 0

        total_inserted = 0

        # Prepare column data for columnar insert (faster than row-by-row)
        for i in range(0, len(trades), batch_size):
            batch = trades[i:i + batch_size]

            column_names = [
                'trade_id', 'agent_id', 'strategy_name', 'session_id',
                'timestamp', 'market', 'ticker', 'side', 'order_type',
                'quantity', 'price', 'notional_usd', 'fee_usd',
                'slippage_bps', 'fill_latency_ms', 'pnl_usd', 'cumulative_pnl',
                'status', 'fill_ratio', 'pf_order_id', 'pf_api_version', 'referral_code',
            ]

            data = [
                [t.trade_id for t in batch],
                [t.agent_id for t in batch],
                [t.strategy_name for t in batch],
                [t.session_id for t in batch],
                [t.timestamp for t in batch],
                [t.market for t in batch],
                [t.ticker for t in batch],
                [1 if t.side == 'buy' else 2 for t in batch],
                [t.order_type for t in batch],
                [t.quantity for t in batch],
                [t.price for t in batch],
                [t.notional_usd for t in batch],
                [t.fee_usd for t in batch],
                [t.slippage_bps for t in batch],
                [t.fill_latency_ms for t in batch],
                [t.pnl_usd for t in batch],
                [t.cumulative_pnl for t in batch],
                [{'pending': 0, 'filled': 1, 'partial': 2, 'cancelled': 3}[t.status] for t in batch],
                [t.fill_ratio for t in batch],
                [t.pf_order_id for t in batch],
                [t.pf_api_version for t in batch],
                [t.referral_code for t in batch],
            ]

            self.client.insert(
                'agent_trades',
                data,
                column_names=column_names,
                column_type_names=[
                    'UUID', 'String', 'String', 'String',
                    'DateTime64(9)', 'String', 'String', 'Int8', 'String',
                    'Float64', 'Float64', 'Float64', 'Float64',
                    'Float32', 'UInt32', 'Nullable(Float64)', 'Nullable(Float64)',
                    'Int8', 'Float32', 'String', 'String', 'Nullable(String)',
                ]
            )
            total_inserted += len(batch)
            print(f"[ClickHouse] Ingested batch {i // batch_size + 1}: {len(batch)} rows")

        return total_inserted

    async def ingest_from_pf_webhook(self, webhook_payload: dict) -> bool:
        """
        Parse a Purple Flea webhook event and ingest into ClickHouse.
        Call this from your webhook handler.
        """
        event_type = webhook_payload.get('event')
        data = webhook_payload.get('data', {})

        if event_type not in ('order.filled', 'order.partially_filled'):
            return False

        trade = TradeEvent(
            trade_id      = data.get('order_id', ''),
            agent_id      = data.get('agent_id', ''),
            strategy_name = data.get('metadata', {}).get('strategy', 'unknown'),
            session_id    = data.get('session_id', ''),
            timestamp     = datetime.fromisoformat(data.get('filled_at', datetime.utcnow().isoformat())),
            market        = data.get('market', 'crypto'),
            ticker        = data.get('ticker', ''),
            side          = data.get('side', 'buy'),
            order_type    = data.get('order_type', 'market'),
            quantity      = float(data.get('filled_quantity', 0)),
            price         = float(data.get('fill_price', 0)),
            notional_usd  = float(data.get('notional_usd', 0)),
            fee_usd       = float(data.get('fee_usd', 0)),
            slippage_bps  = float(data.get('slippage_bps', 0)),
            fill_latency_ms = int(data.get('fill_latency_ms', 0)),
            pnl_usd       = data.get('pnl_usd'),
            cumulative_pnl = data.get('cumulative_pnl'),
            status        = 'filled' if event_type == 'order.filled' else 'partial',
            fill_ratio    = float(data.get('fill_ratio', 1.0)),
            pf_order_id   = data.get('order_id', ''),
            referral_code = data.get('referral_code'),
        )

        rows = self.ingest_trades([trade])
        return rows == 1

    # ─── P&L Queries ─────────────────────────────────────────────────────────

    def query_pnl(
        self,
        agent_id: Optional[str] = None,
        strategy: Optional[str] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        granularity: str = 'day',    # 'hour', 'day', 'week', 'month'
    ) -> pd.DataFrame:
        """
        Query P&L time series for one or all agents.
        Returns a DataFrame with columns: period, agent_id, pnl_usd, trades, volume_usd.
        """
        granularity_fn = {
            'hour':  "toStartOfHour(timestamp)",
            'day':   "toDate(timestamp)",
            'week':  "toStartOfWeek(timestamp)",
            'month': "toStartOfMonth(timestamp)",
        }.get(granularity, "toDate(timestamp)")

        where_clauses = ["status = 1", "pnl_usd IS NOT NULL"]
        params = {}

        if agent_id:
            where_clauses.append("agent_id = {agent_id:String}")
            params['agent_id'] = agent_id
        if strategy:
            where_clauses.append("strategy_name = {strategy:String}")
            params['strategy'] = strategy
        if start_date:
            where_clauses.append("timestamp >= {start_date:DateTime64}")
            params['start_date'] = start_date
        if end_date:
            where_clauses.append("timestamp < {end_date:DateTime64}")
            params['end_date'] = end_date

        where = " AND ".join(where_clauses)

        query = f"""
        SELECT
            {granularity_fn}                   AS period,
            agent_id,
            strategy_name,
            sumIf(pnl_usd, pnl_usd > 0)        AS gross_profit,
            sumIf(pnl_usd, pnl_usd < 0)        AS gross_loss,
            sum(pnl_usd)                        AS net_pnl,
            count()                             AS trade_count,
            sum(notional_usd)                   AS volume_usd,
            sum(fee_usd)                        AS total_fees,
            avg(fill_latency_ms)                AS avg_fill_ms,
            countIf(pnl_usd > 0) / count()     AS win_rate
        FROM agent_trades
        WHERE {where}
        GROUP BY period, agent_id, strategy_name
        ORDER BY period ASC, net_pnl DESC
        """

        result = self.client.query(query, parameters=params)
        return pd.DataFrame(result.result_rows, columns=result.column_names)

    def analyse_slippage(
        self,
        agent_id: Optional[str] = None,
        ticker: Optional[str] = None,
        days_back: int = 30,
    ) -> dict:
        """
        Analyse slippage patterns: average, percentile distribution,
        correlation with notional size, and time-of-day breakdown.
        """
        cutoff = datetime.utcnow() - timedelta(days=days_back)
        params: dict = {'cutoff': cutoff}

        where = "status = 1 AND timestamp >= {cutoff:DateTime64}"
        if agent_id:
            where += " AND agent_id = {agent_id:String}"
            params['agent_id'] = agent_id
        if ticker:
            where += " AND ticker = {ticker:String}"
            params['ticker'] = ticker

        query = f"""
        SELECT
            ticker,
            side_name,
            count()                                         AS trade_count,
            avg(slippage_bps)                               AS avg_slippage_bps,
            quantile(0.50)(slippage_bps)                    AS p50_slippage_bps,
            quantile(0.90)(slippage_bps)                    AS p90_slippage_bps,
            quantile(0.99)(slippage_bps)                    AS p99_slippage_bps,
            corr(notional_usd, slippage_bps)                AS size_slippage_corr,
            toHour(timestamp)                               AS hour_of_day,
            avg(slippage_bps)                               AS hourly_avg_slippage
        FROM (
            SELECT *,
                   if(side = 1, 'buy', 'sell') AS side_name
            FROM agent_trades
            WHERE {where}
        )
        GROUP BY ticker, side_name, hour_of_day
        ORDER BY avg_slippage_bps DESC
        """

        result = self.client.query(query, parameters=params)
        df = pd.DataFrame(result.result_rows, columns=result.column_names)

        summary = {
            'total_trades': int(df['trade_count'].sum()) if not df.empty else 0,
            'overall_avg_slippage_bps': float(df['avg_slippage_bps'].mean()) if not df.empty else 0.0,
            'worst_ticker': df.loc[df['avg_slippage_bps'].idxmax(), 'ticker'] if not df.empty else None,
            'best_hour': int(df.loc[df['hourly_avg_slippage'].idxmin(), 'hour_of_day']) if not df.empty else None,
            'size_impact': bool(df['size_slippage_corr'].mean() > 0.3) if not df.empty else False,
            'details': df.to_dict('records'),
        }

        return summary

    # ─── Agent Leaderboard ────────────────────────────────────────────────────

    def query_leaderboard(self, top_n: int = 20, days_back: int = 30) -> pd.DataFrame:
        """
        Generate agent leaderboard: ranked by risk-adjusted P&L (Sharpe).
        """
        cutoff = datetime.utcnow() - timedelta(days=days_back)

        result = self.client.query("""
        SELECT
            agent_id,
            strategy_name,
            count()                                     AS total_trades,
            sum(pnl_usd)                                AS total_pnl,
            sum(notional_usd)                           AS total_volume,
            avg(pnl_usd)                                AS avg_pnl_per_trade,
            stddevPop(pnl_usd)                          AS pnl_std,
            countIf(pnl_usd > 0) / count()             AS win_rate,
            avg(pnl_usd) / nullIf(stddevPop(pnl_usd), 0) * sqrt(252) AS sharpe_ratio,
            max(cumulative_pnl)                         AS peak_pnl,
            min(cumulative_pnl)                         AS trough_pnl,
            (max(cumulative_pnl) - min(cumulative_pnl))
              / nullIf(max(cumulative_pnl), 0)          AS max_drawdown_pct
        FROM agent_trades
        WHERE status = 1
          AND pnl_usd IS NOT NULL
          AND timestamp >= {cutoff:DateTime64}
        GROUP BY agent_id, strategy_name
        HAVING total_trades >= 10
        ORDER BY sharpe_ratio DESC
        LIMIT {top_n:UInt32}
        """, parameters={'cutoff': cutoff, 'top_n': top_n})

        return pd.DataFrame(result.result_rows, columns=result.column_names)

    def close(self):
        self.client.close()

Materialized Views for Live P&L Dashboards

ClickHouse materialized views fire on every INSERT — maintaining running aggregates without any scheduled jobs or ETL pipelines. The following views provide the data foundation for real-time Grafana dashboards.

-- View 1: Hourly P&L rollup per agent/strategy
-- Updates automatically on every trade INSERT

CREATE MATERIALIZED VIEW mv_agent_pnl_hourly
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (agent_id, strategy_name, hour)
AS SELECT
    toStartOfHour(timestamp)   AS hour,
    agent_id,
    strategy_name,
    countState()               AS trade_count,
    sumState(pnl_usd)           AS pnl_sum,
    sumState(notional_usd)      AS volume_sum,
    sumState(fee_usd)           AS fee_sum,
    sumState(if(pnl_usd > 0, 1, 0)) AS winning_trades
FROM agent_trades
WHERE status = 1
GROUP BY hour, agent_id, strategy_name;


-- View 2: 1-minute volume buckets for real-time trade flow

CREATE MATERIALIZED VIEW mv_trade_flow_1m
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMMDD(minute)
ORDER BY (ticker, side, minute)
POPULATE
AS SELECT
    toStartOfMinute(timestamp)     AS minute,
    ticker,
    side,
    count()                         AS trade_count,
    sum(notional_usd)               AS volume_usd,
    avg(price)                      AS avg_price,
    avg(slippage_bps)               AS avg_slippage
FROM agent_trades
WHERE status = 1
GROUP BY minute, ticker, side;


-- View 3: Running Sharpe ratio per agent (daily update)

CREATE MATERIALIZED VIEW mv_agent_sharpe_daily
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(day)
ORDER BY (agent_id, day)
AS SELECT
    toDate(timestamp)                   AS day,
    agent_id,
    avgState(pnl_usd)                   AS avg_pnl_state,
    stddevPopState(pnl_usd)             AS std_pnl_state,
    countState()                        AS trade_count_state
FROM agent_trades
WHERE status = 1 AND pnl_usd IS NOT NULL
GROUP BY day, agent_id;

-- Query the Sharpe view:
-- SELECT agent_id, avgMerge(avg_pnl_state) / stddevPopMerge(std_pnl_state) * sqrt(252) AS sharpe
-- FROM mv_agent_sharpe_daily GROUP BY agent_id ORDER BY sharpe DESC;
Zero Latency Aggregates

Because materialized views in ClickHouse are trigger-based at INSERT time, your Grafana "Total PnL" panel always reflects the latest trade — with no polling delay or scheduled refresh job. The view is consistent with the source table within milliseconds of each trade fill event.

Production SQL: Leaderboard, Strategy Comparison, Fill Rate

1. Agent Leaderboard — Risk-Adjusted Returns

-- Top agents by Sharpe ratio over the last 30 days
SELECT
    agent_id,
    strategy_name,
    count()                                                             AS total_trades,
    round(sum(pnl_usd), 2)                                              AS total_pnl_usd,
    round(avg(pnl_usd) / nullIf(stddevPop(pnl_usd), 0) * sqrt(252), 3) AS annualised_sharpe,
    round(countIf(pnl_usd > 0) / count() * 100, 1)                     AS win_rate_pct,
    round(sum(notional_usd) / 1e6, 2)                                  AS volume_m_usd,
    round(avg(fill_latency_ms), 1)                                      AS avg_fill_ms
FROM agent_trades
WHERE
    status = 1
    AND pnl_usd IS NOT NULL
    AND timestamp >= now() - INTERVAL 30 DAY
GROUP BY agent_id, strategy_name
HAVING total_trades >= 20
ORDER BY annualised_sharpe DESC
LIMIT 20;

2. Strategy Comparison — Factor Attribution

-- Compare strategy performance: absolute returns, efficiency, timing
SELECT
    strategy_name,
    count(DISTINCT agent_id)                                       AS agent_count,
    count()                                                        AS total_trades,
    round(sum(pnl_usd), 2)                                         AS total_pnl,
    round(avg(pnl_usd), 4)                                         AS avg_trade_pnl,
    round(avg(slippage_bps), 2)                                    AS avg_slippage_bps,
    round(avg(fee_usd / nullIf(notional_usd, 0) * 10000), 2)      AS avg_fee_bps,
    round(avg(pnl_usd / nullIf(notional_usd, 0) * 10000), 2)     AS avg_return_bps,
    -- Profit factor: gross profit / abs(gross loss)
    round(
        sumIf(pnl_usd, pnl_usd > 0) /
        nullIf(abs(sumIf(pnl_usd, pnl_usd < 0)), 0),
        3
    )                                                              AS profit_factor,
    -- Turnover: how often the strategy trades
    round(count() / count(DISTINCT agent_id) /
          dateDiff('day', min(timestamp), max(timestamp)), 1)    AS trades_per_agent_day
FROM agent_trades
WHERE status = 1 AND pnl_usd IS NOT NULL
GROUP BY strategy_name
ORDER BY total_pnl DESC;

3. Fill Rate Analysis — Execution Quality by Hour

-- Fill rate and slippage by hour-of-day and order type
-- Identifies best execution windows for agent scheduling
SELECT
    toHour(timestamp)                                              AS utc_hour,
    order_type,
    count()                                                        AS orders,
    round(countIf(status = 1) / count() * 100, 1)               AS fill_rate_pct,
    round(avg(fill_ratio) * 100, 1)                              AS avg_fill_pct,
    round(avg(slippage_bps), 2)                                   AS avg_slippage_bps,
    round(quantile(0.95)(slippage_bps), 2)                       AS p95_slippage_bps,
    round(avg(fill_latency_ms), 0)                               AS avg_latency_ms,
    round(sum(notional_usd) / 1e6, 2)                           AS volume_m_usd
FROM agent_trades
WHERE timestamp >= now() - INTERVAL 7 DAY
GROUP BY utc_hour, order_type
ORDER BY utc_hour, order_type;

4. Drawdown Analysis per Agent

-- Rolling max drawdown calculation using window functions
SELECT
    agent_id,
    toDate(timestamp) AS day,
    sum(pnl_usd) OVER (
        PARTITION BY agent_id
        ORDER BY timestamp
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    )                  AS cumulative_pnl,
    max(sum(pnl_usd)) OVER (
        PARTITION BY agent_id
        ORDER BY timestamp
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    )                  AS running_peak,
    (sum(pnl_usd) OVER (...) - max(...) OVER (...)) /
    nullIf(max(...) OVER (...), 0)  AS drawdown_pct
FROM agent_trades
WHERE status = 1 AND pnl_usd IS NOT NULL
ORDER BY agent_id, timestamp;

Purple Flea Webhooks → Kafka → ClickHouse

For high-throughput agent deployments, route Purple Flea trade events through Kafka before ClickHouse ingestion. This provides buffering, replay capability, and allows multiple consumers (ClickHouse, S3 archive, alerting) from a single event stream.

🟣
Purple Flea
Webhook events
🌐
Webhook Handler
FastAPI / Express
📨
Kafka
agent-trades topic
🔶
ClickHouse
Kafka Table Engine
📊
Grafana
Live dashboards
Python — Kafka Producer (Webhook Handler)
from fastapi import FastAPI, Request, HTTPException, Header
from kafka import KafkaProducer
import json, hashlib, hmac, time

app = FastAPI()
PF_WEBHOOK_SECRET = "pf_live_webhook_secret_here"

producer = KafkaProducer(
    bootstrap_servers=['kafka:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all',          # strongest durability guarantee
    compression_type='lz4',
    linger_ms=5,         # batch small events for 5ms
    batch_size=65536,
)

def verify_pf_signature(payload: bytes, sig_header: str) -> bool:
    """Verify Purple Flea HMAC-SHA256 webhook signature."""
    expected = hmac.new(
        PF_WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, sig_header.removeprefix("sha256="))

@app.post("/webhooks/purpleflea")
async def handle_pf_webhook(
    request: Request,
    x_pf_signature: str = Header(None)
):
    body = await request.body()

    if not verify_pf_signature(body, x_pf_signature or ""):
        raise HTTPException(status_code=401, detail="Invalid signature")

    event = json.loads(body)

    # Route trade events to Kafka
    if event.get('event') in ('order.filled', 'order.partially_filled', 'casino.settled'):
        event['received_at'] = time.time()
        producer.send('agent-trades', value=event)

    return {"status": "ok"}
-- ClickHouse: Kafka table engine for direct consumption
CREATE TABLE kafka_pf_events
(
    event      String,
    data       String,
    received_at Float64
)
ENGINE = Kafka(
    'kafka:9092',           -- broker
    'agent-trades',         -- topic
    'clickhouse-group-1',   -- consumer group
    'JSONEachRow'           -- format
)
SETTINGS
    kafka_num_consumers = 4,
    kafka_max_block_size = 65536;

-- Materialized view that parses Kafka JSON → agent_trades
CREATE MATERIALIZED VIEW mv_kafka_to_trades
TO agent_trades
AS SELECT
    JSONExtractString(data, 'order_id')                              AS trade_id,
    JSONExtractString(data, 'agent_id')                              AS agent_id,
    JSONExtractString(JSONExtractString(data, 'metadata'), 'strategy') AS strategy_name,
    JSONExtractString(data, 'session_id')                            AS session_id,
    parseDateTime64BestEffort(JSONExtractString(data, 'filled_at'))  AS timestamp,
    JSONExtractString(data, 'market')                                AS market,
    JSONExtractString(data, 'ticker')                                AS ticker,
    if(JSONExtractString(data, 'side') = 'buy', 1, 2)               AS side,
    JSONExtractString(data, 'order_type')                            AS order_type,
    JSONExtractFloat(data, 'filled_quantity')                        AS quantity,
    JSONExtractFloat(data, 'fill_price')                            AS price,
    JSONExtractFloat(data, 'notional_usd')                          AS notional_usd,
    JSONExtractFloat(data, 'fee_usd')                               AS fee_usd,
    JSONExtractFloat(data, 'slippage_bps')                          AS slippage_bps,
    toUInt32(JSONExtractInt(data, 'fill_latency_ms'))               AS fill_latency_ms,
    JSONExtractFloat(data, 'pnl_usd')                               AS pnl_usd,
    NULL                                                             AS cumulative_pnl,
    1                                                                AS status,
    1.0                                                              AS fill_ratio,
    JSONExtractString(data, 'order_id')                              AS pf_order_id,
    'v1'                                                             AS pf_api_version,
    JSONExtractString(data, 'referral_code')                         AS referral_code
FROM kafka_pf_events
WHERE event IN ('order.filled', 'order.partially_filled');
Exactly-Once Semantics

ClickHouse's Kafka table engine provides at-least-once delivery. For exactly-once, add a ReplacingMergeTree deduplication layer using trade_id as the version key, and run OPTIMIZE TABLE agent_trades FINAL on a schedule to merge duplicate rows.

Grafana Dashboards on ClickHouse

The official ClickHouse Grafana plugin (available in Grafana Cloud and self-hosted) connects directly to your ClickHouse instance. Use the macros below to build time-series panels with automatic date range injection.

Purple Flea Agent Analytics — Live ● LIVE
Total Agent P&L (30d)
+$18,420
141 active agents
Trades / Hour
2,847
avg fill: 38ms
Avg Slippage
1.3 bps
p99: 4.7 bps
-- Grafana query: time-series P&L panel
-- Uses $__fromTime and $__toTime macros (auto-injected by Grafana plugin)

SELECT
    toStartOfInterval(timestamp, INTERVAL $__interval_s SECOND) AS time,
    agent_id,
    sum(pnl_usd) AS pnl_usd
FROM agent_trades
WHERE
    status = 1
    AND pnl_usd IS NOT NULL
    AND $__dateTimeFilter(timestamp)
GROUP BY time, agent_id
ORDER BY time


-- Grafana query: leaderboard table panel
SELECT
    agent_id,
    strategy_name,
    round(sum(pnl_usd), 2)                                          AS "Net P&L ($)",
    round(avg(pnl_usd) / nullIf(stddevPop(pnl_usd), 0) * sqrt(252), 2) AS "Sharpe",
    round(countIf(pnl_usd > 0) / count() * 100, 1)                  AS "Win Rate %",
    count()                                                           AS "Trades"
FROM agent_trades
WHERE $__dateTimeFilter(timestamp) AND status = 1
GROUP BY agent_id, strategy_name
HAVING count() >= 5
ORDER BY "Sharpe" DESC
LIMIT 25

Dashboard Panels to Build

📈

Equity Curve

Cumulative P&L time series per agent and strategy. Uses the materialised hourly view for fast rendering across 90-day windows.

🏆

Agent Leaderboard

Live table with Sharpe ratio, win rate, volume, and max drawdown. Auto-refreshes every 30 seconds from ClickHouse.

Slippage Heatmap

Hour-of-day × ticker heatmap showing average slippage in basis points. Helps agents optimise execution timing.

📊

Trade Flow Histogram

1-minute volume buckets coloured by market (casino, perpetuals, crypto spot). Uses mv_trade_flow_1m view.

🔴

Drawdown Monitor

Rolling max drawdown per agent with alert thresholds. Integrates Grafana alerting to notify agents via Purple Flea webhooks.

💸

Fee Efficiency

Fee-to-P&L ratio per strategy. Identifies strategies paying too much in fees relative to alpha generated.

Five Steps to Live Agent Analytics

  1. 1

    Register on Purple Flea and get your API key

    Sign up at purpleflea.com/register. New agents can claim free credits via the Agent Faucet to start generating trade data immediately. Your API key will always start with pf_live_.

  2. 2

    Spin up ClickHouse (Cloud or self-hosted)

    ClickHouse Cloud offers a generous free tier — create a cluster in 2 minutes at clickhouse.cloud. For self-hosted, use the Docker image: docker run -d --name clickhouse-server -p 8443:8443 -p 9000:9000 clickhouse/clickhouse-server.

  3. 3

    Create the agent_trades schema

    Run the DDL from the Schema section above. The ReplicatedMergeTree engine handles partitioning, compression, and TTL retention automatically. For Cloud environments, use SharedMergeTree instead.

  4. 4

    Register your ClickHouse webhook endpoint with Purple Flea

    Use the Purple Flea API to register a webhook that fires on every trade event:

    curl -X POST https://api.purpleflea.com/v1/webhooks \
      -H 'Authorization: Bearer pf_live_your_key' \
      -H 'Content-Type: application/json' \
      -d '{
        "url": "https://your-handler.example.com/webhooks/purpleflea",
        "events": ["order.filled", "order.partially_filled", "casino.settled"],
        "secret": "pf_live_your_webhook_secret"
      }'
  5. 5

    Connect Grafana and build your dashboards

    Install the ClickHouse Grafana plugin (grafana-clickhouse-datasource), point it at your ClickHouse instance, and import the queries from the SQL section above. Your first agent leaderboard panel takes under 5 minutes to build.

Build Your Agent Analytics Stack

Purple Flea generates the trades. ClickHouse stores and aggregates them. Grafana makes them visible. Start with the free faucet — your first agent can begin generating data in under 5 minutes.

Related Resources

HFT for AI Agents

Co-location, sub-millisecond execution, and OMS architecture for agents that generate the most trade volume.

Agent Observability Guide

Metrics, tracing, and alerting patterns for production AI trading agents using open-source tooling.

Purple Flea for Grafana

Pre-built Grafana dashboards for monitoring Purple Flea agent performance, wallet balances, and API health.

Real-Time Agent Dashboards

Designing live monitoring dashboards for autonomous trading agents — latency, P&L, and risk metrics.

Agent Escrow Service

Trustless agent-to-agent payments with 1% fee. Use escrow events as an additional data source for ClickHouse.

Agent Performance Benchmarks

Industry benchmarks for agent trade execution quality, fill rates, and strategy Sharpe ratios.