Your agents trade at machine speed. ClickHouse makes sense of it all — ingesting billions of agent trade events, powering sub-second P&L queries, and driving real-time strategy dashboards. Zero PostgreSQL bottlenecks.
A single AI trading agent on Purple Flea can generate thousands of trades per hour — perpetual orders, casino bets, escrow settlements, and wallet transfers. At 141+ concurrent agents, a PostgreSQL instance quickly becomes a bottleneck.
ClickHouse is purpose-built for this workload: columnar storage means only the columns you query are read from disk, compression ratios of 10–100x cut storage costs, and vectorised execution keeps common aggregate queries in the low milliseconds even on billion-row tables.
| Feature | PostgreSQL | ClickHouse |
|---|---|---|
| Storage model | Row-oriented | Column-oriented |
| Compression | 2–5x | 10–100x |
| Aggregate scan speed | Seconds | <10ms |
| Ingest throughput | ~100K rows/sec | Millions of rows/sec |
| Real-time materialized views | Limited | Native, incremental |
| Best for | OLTP, small reads | OLAP, analytics |
A P&L query that touches only agent_id, pnl_usd, and timestamp reads just 3 columns from disk — not all 24 columns in the row. ClickHouse skips 87.5% of I/O compared to a row store.
Timestamp and price columns compress dramatically with specialised codecs (DoubleDelta for timestamps, Gorilla-style XOR encoding for floats), so a billion rows of agent trade data can fit in roughly 10GB.
ClickHouse processes data in blocks of 8192 rows using SIMD instructions. Simple aggregations over 1B rows can complete in tens of milliseconds on a 16-core node.
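The column-pruning figure above is simple arithmetic: reading 3 of 24 columns touches 12.5% of the bytes, under the rough assumption (ours, not ClickHouse's) that all columns have equal on-disk width.

```python
# Back-of-envelope I/O saving for a 3-column query on a 24-column table.
# Assumes equal on-disk width per column, which real tables rarely have.
total_columns = 24
queried_columns = 3  # agent_id, pnl_usd, timestamp

io_read = queried_columns / total_columns  # fraction of bytes read
io_skipped = 1 - io_read                   # fraction of bytes skipped

print(f"read {io_read:.1%}, skipped {io_skipped:.1%}")  # read 12.5%, skipped 87.5%
```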
The MergeTree engine family provides automatic data sorting, TTL-based retention, and Keeper- or ZooKeeper-backed replication — ideal for immutable trade ledgers.
Incrementally maintain P&L aggregates, hourly volume buckets, and per-strategy Sharpe ratios. Views update on insert — no scheduled jobs needed.
Native Kafka consumer built into ClickHouse. Route Purple Flea webhooks through Kafka and have ClickHouse ingest directly — no ETL middleware.
Official ClickHouse Grafana datasource with macro support. Build dashboards directly on top of agent_trades tables in minutes.
The core table schema is partitioned by month for efficient retention management, ordered by (agent_id, timestamp) for fast per-agent queries, and uses ClickHouse codecs to maximise compression on numerical columns.
-- ClickHouse DDL: agent_trades table
-- Partitioned by month, ordered by (agent_id, timestamp)
-- Run on: ClickHouse 24.x+
CREATE TABLE agent_trades
(
-- Identity
trade_id UUID DEFAULT generateUUIDv4(),
agent_id String,
strategy_name LowCardinality(String),
session_id String,
-- Timing
timestamp DateTime64(9, 'UTC') CODEC(DoubleDelta, ZSTD(1)),
created_at DateTime DEFAULT now(),
-- Instrument
market LowCardinality(String), -- 'crypto', 'perpetuals', 'casino', etc.
ticker LowCardinality(String),
side Enum8('buy' = 1, 'sell' = 2),
order_type LowCardinality(String),
-- Execution
quantity Float64 CODEC(Gorilla, ZSTD(1)),
price Float64 CODEC(Gorilla, ZSTD(1)),
notional_usd Float64 CODEC(Gorilla, ZSTD(1)),
fee_usd Float64 CODEC(Gorilla, ZSTD(1)),
slippage_bps Float32 CODEC(Gorilla, ZSTD(1)),
fill_latency_ms UInt32 CODEC(T64, ZSTD(1)),
-- P&L (populated by settlement event)
pnl_usd Nullable(Float64) CODEC(Gorilla, ZSTD(1)),
cumulative_pnl Nullable(Float64) CODEC(Gorilla, ZSTD(1)),
-- Status
status Enum8('pending' = 0, 'filled' = 1, 'partial' = 2, 'cancelled' = 3),
fill_ratio Float32 DEFAULT 0,
-- Purple Flea metadata
pf_order_id String,
pf_api_version LowCardinality(String),
referral_code Nullable(String),
-- Extensible payload
extra Map(String, String)
)
ENGINE = ReplicatedMergeTree(
'/clickhouse/tables/{layer}-{shard}/agent_trades',
'{replica}'
)
PARTITION BY toYYYYMM(timestamp)
ORDER BY (agent_id, timestamp)
TTL timestamp + INTERVAL 2 YEAR
SETTINGS
index_granularity = 8192,
storage_policy = 'tiered'; -- hot NVMe → cold S3 after 30 days
-- Bloom filter index on agent_id for fast agent lookups
ALTER TABLE agent_trades
ADD INDEX idx_agent_bloom (agent_id) TYPE bloom_filter(0.01) GRANULARITY 1;
-- Minmax index on ticker for range scans
ALTER TABLE agent_trades
ADD INDEX idx_ticker_minmax (ticker) TYPE minmax GRANULARITY 1;
Gorilla codec is well suited to floating-point time series (prices, P&L) — it XOR-encodes consecutive values and typically achieves 3–8x compression on financial data. DoubleDelta works best for monotonically increasing values like timestamps. LowCardinality turns string columns with few distinct values (strategy names, tickers) into dictionary-encoded columns, often reducing their storage by 5–10x.
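The intuition behind DoubleDelta is easy to demonstrate offline: for a near-regular event stream, the delta-of-deltas collapses to zero, and long runs of zeros encode in roughly one bit per value. A sketch with hypothetical nanosecond timestamps spaced 1ms apart:

```python
# Delta-of-delta encoding on a perfectly regular timestamp series.
# Hypothetical 1ms-spaced nanosecond timestamps (like the DateTime64(9) column).
timestamps_ns = [1_700_000_000_000_000_000 + i * 1_000_000 for i in range(10_000)]

deltas = [b - a for a, b in zip(timestamps_ns, timestamps_ns[1:])]
double_deltas = [b - a for a, b in zip(deltas, deltas[1:])]

zero_ratio = double_deltas.count(0) / len(double_deltas)
print(f"{zero_ratio:.1%} of delta-of-deltas are zero")  # 100.0%
```

Real fill timestamps jitter, so the ratio drops below 100%, but the values stay tiny, which is exactly what the codec exploits.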
The ClickHouseTradeAnalytics class wraps the clickhouse-connect library and provides high-level methods for ingesting Purple Flea trade events, querying P&L, and analysing slippage patterns.
Python
import clickhouse_connect
import pandas as pd
from datetime import datetime, timedelta
from typing import Optional
from dataclasses import dataclass
import httpx
# Purple Flea API key (always prefixed pf_live_); load from an env var in production
PF_API_KEY = "pf_live_your_api_key_here"
PF_API_BASE = "https://api.purpleflea.com/v1"
@dataclass
class TradeEvent:
trade_id: str
agent_id: str
strategy_name: str
session_id: str
timestamp: datetime
market: str
ticker: str
side: str # 'buy' or 'sell'
order_type: str
quantity: float
price: float
notional_usd: float
fee_usd: float
slippage_bps: float
fill_latency_ms: int
pnl_usd: Optional[float]
cumulative_pnl: Optional[float]
status: str
fill_ratio: float
pf_order_id: str
pf_api_version: str = "v1"
referral_code: Optional[str] = None
class ClickHouseTradeAnalytics:
"""
Production-grade ClickHouse analytics client for Purple Flea agent trades.
Provides ingest, P&L queries, slippage analysis, and leaderboard generation.
"""
def __init__(
self,
host: str = "localhost",
port: int = 8443,
database: str = "agent_analytics",
username: str = "default",
password: str = "",
secure: bool = True,
pf_api_key: str = PF_API_KEY,
):
self.client = clickhouse_connect.get_client(
host=host,
port=port,
database=database,
username=username,
password=password,
secure=secure,
compress=True,
settings={
'max_insert_block_size': 100_000,
'async_insert': 1,
'wait_for_async_insert': 0,
}
)
self.pf_client = httpx.AsyncClient(
base_url=PF_API_BASE,
headers={"Authorization": f"Bearer {pf_api_key}"},
timeout=30.0
)
self.database = database
# ─── Ingest ──────────────────────────────────────────────────────────────
def ingest_trades(self, trades: list[TradeEvent], batch_size: int = 10_000) -> int:
"""
Batch-insert trade events into ClickHouse.
Returns total rows inserted.
"""
if not trades:
return 0
total_inserted = 0
# Prepare column data for columnar insert (faster than row-by-row)
for i in range(0, len(trades), batch_size):
batch = trades[i:i + batch_size]
column_names = [
'trade_id', 'agent_id', 'strategy_name', 'session_id',
'timestamp', 'market', 'ticker', 'side', 'order_type',
'quantity', 'price', 'notional_usd', 'fee_usd',
'slippage_bps', 'fill_latency_ms', 'pnl_usd', 'cumulative_pnl',
'status', 'fill_ratio', 'pf_order_id', 'pf_api_version', 'referral_code',
]
data = [
[t.trade_id for t in batch],
[t.agent_id for t in batch],
[t.strategy_name for t in batch],
[t.session_id for t in batch],
[t.timestamp for t in batch],
[t.market for t in batch],
[t.ticker for t in batch],
[1 if t.side == 'buy' else 2 for t in batch],
[t.order_type for t in batch],
[t.quantity for t in batch],
[t.price for t in batch],
[t.notional_usd for t in batch],
[t.fee_usd for t in batch],
[t.slippage_bps for t in batch],
[t.fill_latency_ms for t in batch],
[t.pnl_usd for t in batch],
[t.cumulative_pnl for t in batch],
[{'pending': 0, 'filled': 1, 'partial': 2, 'cancelled': 3}[t.status] for t in batch],
[t.fill_ratio for t in batch],
[t.pf_order_id for t in batch],
[t.pf_api_version for t in batch],
[t.referral_code for t in batch],
]
self.client.insert(
    'agent_trades',
    data,
    column_names=column_names,
    # `data` is a list of columns, so mark the insert column-oriented;
    # column types are introspected from the table schema, and the
    # Enum8 columns accept the integer values prepared above
    column_oriented=True,
)
total_inserted += len(batch)
print(f"[ClickHouse] Ingested batch {i // batch_size + 1}: {len(batch)} rows")
return total_inserted
async def ingest_from_pf_webhook(self, webhook_payload: dict) -> bool:
"""
Parse a Purple Flea webhook event and ingest into ClickHouse.
Call this from your webhook handler.
"""
event_type = webhook_payload.get('event')
data = webhook_payload.get('data', {})
if event_type not in ('order.filled', 'order.partially_filled'):
return False
trade = TradeEvent(
trade_id = data.get('order_id', ''),
agent_id = data.get('agent_id', ''),
strategy_name = data.get('metadata', {}).get('strategy', 'unknown'),
session_id = data.get('session_id', ''),
timestamp = datetime.fromisoformat(data.get('filled_at', datetime.utcnow().isoformat())),
market = data.get('market', 'crypto'),
ticker = data.get('ticker', ''),
side = data.get('side', 'buy'),
order_type = data.get('order_type', 'market'),
quantity = float(data.get('filled_quantity', 0)),
price = float(data.get('fill_price', 0)),
notional_usd = float(data.get('notional_usd', 0)),
fee_usd = float(data.get('fee_usd', 0)),
slippage_bps = float(data.get('slippage_bps', 0)),
fill_latency_ms = int(data.get('fill_latency_ms', 0)),
pnl_usd = data.get('pnl_usd'),
cumulative_pnl = data.get('cumulative_pnl'),
status = 'filled' if event_type == 'order.filled' else 'partial',
fill_ratio = float(data.get('fill_ratio', 1.0)),
pf_order_id = data.get('order_id', ''),
referral_code = data.get('referral_code'),
)
rows = self.ingest_trades([trade])
return rows == 1
# ─── P&L Queries ─────────────────────────────────────────────────────────
def query_pnl(
self,
agent_id: Optional[str] = None,
strategy: Optional[str] = None,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
granularity: str = 'day', # 'hour', 'day', 'week', 'month'
) -> pd.DataFrame:
"""
Query P&L time series for one or all agents.
Returns a DataFrame with columns: period, agent_id, pnl_usd, trades, volume_usd.
"""
granularity_fn = {
'hour': "toStartOfHour(timestamp)",
'day': "toDate(timestamp)",
'week': "toStartOfWeek(timestamp)",
'month': "toStartOfMonth(timestamp)",
}.get(granularity, "toDate(timestamp)")
where_clauses = ["status = 1", "pnl_usd IS NOT NULL"]
params = {}
if agent_id:
where_clauses.append("agent_id = {agent_id:String}")
params['agent_id'] = agent_id
if strategy:
where_clauses.append("strategy_name = {strategy:String}")
params['strategy'] = strategy
if start_date:
where_clauses.append("timestamp >= {start_date:DateTime64}")
params['start_date'] = start_date
if end_date:
where_clauses.append("timestamp < {end_date:DateTime64}")
params['end_date'] = end_date
where = " AND ".join(where_clauses)
query = f"""
SELECT
{granularity_fn} AS period,
agent_id,
strategy_name,
sumIf(pnl_usd, pnl_usd > 0) AS gross_profit,
sumIf(pnl_usd, pnl_usd < 0) AS gross_loss,
sum(pnl_usd) AS net_pnl,
count() AS trade_count,
sum(notional_usd) AS volume_usd,
sum(fee_usd) AS total_fees,
avg(fill_latency_ms) AS avg_fill_ms,
countIf(pnl_usd > 0) / count() AS win_rate
FROM agent_trades
WHERE {where}
GROUP BY period, agent_id, strategy_name
ORDER BY period ASC, net_pnl DESC
"""
result = self.client.query(query, parameters=params)
return pd.DataFrame(result.result_rows, columns=result.column_names)
def analyse_slippage(
self,
agent_id: Optional[str] = None,
ticker: Optional[str] = None,
days_back: int = 30,
) -> dict:
"""
Analyse slippage patterns: average, percentile distribution,
correlation with notional size, and time-of-day breakdown.
"""
cutoff = datetime.utcnow() - timedelta(days=days_back)
params: dict = {'cutoff': cutoff}
where = "status = 1 AND timestamp >= {cutoff:DateTime64}"
if agent_id:
where += " AND agent_id = {agent_id:String}"
params['agent_id'] = agent_id
if ticker:
where += " AND ticker = {ticker:String}"
params['ticker'] = ticker
query = f"""
SELECT
ticker,
side_name,
count() AS trade_count,
avg(slippage_bps) AS avg_slippage_bps,
quantile(0.50)(slippage_bps) AS p50_slippage_bps,
quantile(0.90)(slippage_bps) AS p90_slippage_bps,
quantile(0.99)(slippage_bps) AS p99_slippage_bps,
corr(notional_usd, slippage_bps) AS size_slippage_corr,
toHour(timestamp) AS hour_of_day,
avg(slippage_bps) AS hourly_avg_slippage
FROM (
SELECT *,
if(side = 1, 'buy', 'sell') AS side_name
FROM agent_trades
WHERE {where}
)
GROUP BY ticker, side_name, hour_of_day
ORDER BY avg_slippage_bps DESC
"""
result = self.client.query(query, parameters=params)
df = pd.DataFrame(result.result_rows, columns=result.column_names)
summary = {
'total_trades': int(df['trade_count'].sum()) if not df.empty else 0,
'overall_avg_slippage_bps': float(df['avg_slippage_bps'].mean()) if not df.empty else 0.0,
'worst_ticker': df.loc[df['avg_slippage_bps'].idxmax(), 'ticker'] if not df.empty else None,
'best_hour': int(df.loc[df['hourly_avg_slippage'].idxmin(), 'hour_of_day']) if not df.empty else None,
'size_impact': bool(df['size_slippage_corr'].mean() > 0.3) if not df.empty else False,
'details': df.to_dict('records'),
}
return summary
# ─── Agent Leaderboard ────────────────────────────────────────────────────
def query_leaderboard(self, top_n: int = 20, days_back: int = 30) -> pd.DataFrame:
"""
Generate agent leaderboard: ranked by risk-adjusted P&L (Sharpe).
"""
cutoff = datetime.utcnow() - timedelta(days=days_back)
result = self.client.query("""
SELECT
agent_id,
strategy_name,
count() AS total_trades,
sum(pnl_usd) AS total_pnl,
sum(notional_usd) AS total_volume,
avg(pnl_usd) AS avg_pnl_per_trade,
stddevPop(pnl_usd) AS pnl_std,
countIf(pnl_usd > 0) / count() AS win_rate,
avg(pnl_usd) / nullIf(stddevPop(pnl_usd), 0) * sqrt(252) AS sharpe_ratio,
max(cumulative_pnl) AS peak_pnl,
min(cumulative_pnl) AS trough_pnl,
-- peak-to-trough approximation: assumes the trough follows the peak;
-- use a running-peak window query for a true max drawdown
(max(cumulative_pnl) - min(cumulative_pnl))
    / nullIf(max(cumulative_pnl), 0) AS max_drawdown_pct
FROM agent_trades
WHERE status = 1
AND pnl_usd IS NOT NULL
AND timestamp >= {cutoff:DateTime64}
GROUP BY agent_id, strategy_name
HAVING total_trades >= 10
ORDER BY sharpe_ratio DESC
LIMIT {top_n:UInt32}
""", parameters={'cutoff': cutoff, 'top_n': top_n})
return pd.DataFrame(result.result_rows, columns=result.column_names)
def close(self):
self.client.close()
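The Sharpe annualisation used in the leaderboard SQL (per-trade mean over population stddev, scaled by sqrt(252)) can be sanity-checked offline. A sketch with synthetic per-trade P&L; the sqrt(252) factor assumes roughly daily aggregation, as in the SQL above:

```python
import numpy as np
import pandas as pd

# Reproduce the leaderboard's Sharpe formula on synthetic per-trade P&L.
rng = np.random.default_rng(7)
pnl = pd.Series(rng.normal(loc=1.5, scale=10.0, size=5_000))

# ddof=0 matches ClickHouse's stddevPop; sqrt(252) annualises as in the SQL
sharpe = pnl.mean() / pnl.std(ddof=0) * np.sqrt(252)
print(round(sharpe, 3))
```

With mean 1.5 and stddev 10, the annualised figure lands near 2.4, so any large divergence between this and the SQL output points at a formula mismatch.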
ClickHouse materialized views fire on every INSERT — maintaining running aggregates without any scheduled jobs or ETL pipelines. The following views provide the data foundation for real-time Grafana dashboards.
-- View 1: Hourly P&L rollup per agent/strategy
-- Updates automatically on every trade INSERT
-- Note: -State columns need AggregatingMergeTree; read them with the
-- matching -Merge combinators (e.g. sumMerge(pnl_sum))
CREATE MATERIALIZED VIEW mv_agent_pnl_hourly
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (agent_id, strategy_name, hour)
AS SELECT
toStartOfHour(timestamp) AS hour,
agent_id,
strategy_name,
countState() AS trade_count,
sumState(pnl_usd) AS pnl_sum,
sumState(notional_usd) AS volume_sum,
sumState(fee_usd) AS fee_sum,
sumState(if(pnl_usd > 0, 1, 0)) AS winning_trades
FROM agent_trades
WHERE status = 1
GROUP BY hour, agent_id, strategy_name;
-- View 2: 1-minute volume buckets for real-time trade flow
-- SummingMergeTree adds numeric columns on background merges, so store
-- sums (not averages) and derive averages at query time: sum / trade_count
CREATE MATERIALIZED VIEW mv_trade_flow_1m
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMMDD(minute)
ORDER BY (ticker, side, minute)
POPULATE
AS SELECT
toStartOfMinute(timestamp) AS minute,
ticker,
side,
count() AS trade_count,
sum(notional_usd) AS volume_usd,
sum(price) AS price_sum,       -- avg_price = price_sum / trade_count
sum(slippage_bps) AS slippage_sum -- avg_slippage = slippage_sum / trade_count
FROM agent_trades
WHERE status = 1
GROUP BY minute, ticker, side;
-- View 3: Per-agent Sharpe inputs, bucketed by day (states merged at query time)
CREATE MATERIALIZED VIEW mv_agent_sharpe_daily
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(day)
ORDER BY (agent_id, day)
AS SELECT
toDate(timestamp) AS day,
agent_id,
avgState(pnl_usd) AS avg_pnl_state,
stddevPopState(pnl_usd) AS std_pnl_state,
countState() AS trade_count_state
FROM agent_trades
WHERE status = 1 AND pnl_usd IS NOT NULL
GROUP BY day, agent_id;
-- Query the Sharpe view:
-- SELECT agent_id, avgMerge(avg_pnl_state) / stddevPopMerge(std_pnl_state) * sqrt(252) AS sharpe
-- FROM mv_agent_sharpe_daily GROUP BY agent_id ORDER BY sharpe DESC;
Because materialized views in ClickHouse are trigger-based at INSERT time, your Grafana "Total PnL" panel always reflects the latest trade — with no polling delay or scheduled refresh job. The view is consistent with the source table within milliseconds of each trade fill event.
-- Top agents by Sharpe ratio over the last 30 days
SELECT
agent_id,
strategy_name,
count() AS total_trades,
round(sum(pnl_usd), 2) AS total_pnl_usd,
round(avg(pnl_usd) / nullIf(stddevPop(pnl_usd), 0) * sqrt(252), 3) AS annualised_sharpe,
round(countIf(pnl_usd > 0) / count() * 100, 1) AS win_rate_pct,
round(sum(notional_usd) / 1e6, 2) AS volume_m_usd,
round(avg(fill_latency_ms), 1) AS avg_fill_ms
FROM agent_trades
WHERE
status = 1
AND pnl_usd IS NOT NULL
AND timestamp >= now() - INTERVAL 30 DAY
GROUP BY agent_id, strategy_name
HAVING total_trades >= 20
ORDER BY annualised_sharpe DESC
LIMIT 20;
-- Compare strategy performance: absolute returns, efficiency, timing
SELECT
strategy_name,
count(DISTINCT agent_id) AS agent_count,
count() AS total_trades,
round(sum(pnl_usd), 2) AS total_pnl,
round(avg(pnl_usd), 4) AS avg_trade_pnl,
round(avg(slippage_bps), 2) AS avg_slippage_bps,
round(avg(fee_usd / nullIf(notional_usd, 0) * 10000), 2) AS avg_fee_bps,
round(avg(pnl_usd / nullIf(notional_usd, 0) * 10000), 2) AS avg_return_bps,
-- Profit factor: gross profit / abs(gross loss)
round(
sumIf(pnl_usd, pnl_usd > 0) /
nullIf(abs(sumIf(pnl_usd, pnl_usd < 0)), 0),
3
) AS profit_factor,
-- Turnover: how often the strategy trades
round(count() / count(DISTINCT agent_id) /
dateDiff('day', min(timestamp), max(timestamp)), 1) AS trades_per_agent_day
FROM agent_trades
WHERE status = 1 AND pnl_usd IS NOT NULL
GROUP BY strategy_name
ORDER BY total_pnl DESC;
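The profit-factor expression in the query above is worth keeping straight: gross profit divided by the absolute value of gross loss, so anything above 1.0 means the strategy makes more than it loses. A sketch with hypothetical per-trade P&L values:

```python
# Profit factor = gross profit / |gross loss|, as in the strategy query above.
pnl = [4.0, -1.5, 2.5, -0.5, 3.0]  # hypothetical per-trade P&L

gross_profit = sum(p for p in pnl if p > 0)   # 9.5
gross_loss = sum(p for p in pnl if p < 0)     # -2.0
profit_factor = gross_profit / abs(gross_loss)

print(profit_factor)  # 4.75
```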
-- Fill rate and slippage by hour-of-day and order type
-- Identifies best execution windows for agent scheduling
SELECT
toHour(timestamp) AS utc_hour,
order_type,
count() AS orders,
round(countIf(status = 1) / count() * 100, 1) AS fill_rate_pct,
round(avg(fill_ratio) * 100, 1) AS avg_fill_pct,
round(avg(slippage_bps), 2) AS avg_slippage_bps,
round(quantile(0.95)(slippage_bps), 2) AS p95_slippage_bps,
round(avg(fill_latency_ms), 0) AS avg_latency_ms,
round(sum(notional_usd) / 1e6, 2) AS volume_m_usd
FROM agent_trades
WHERE timestamp >= now() - INTERVAL 7 DAY
GROUP BY utc_hour, order_type
ORDER BY utc_hour, order_type;
-- Rolling max drawdown calculation using window functions
-- (window functions cannot be nested, so the cumulative P&L is
-- computed in a subquery before taking its running peak)
SELECT
agent_id,
timestamp,
cumulative_pnl,
max(cumulative_pnl) OVER w AS running_peak,
(cumulative_pnl - max(cumulative_pnl) OVER w)
/ nullIf(max(cumulative_pnl) OVER w, 0) AS drawdown_pct
FROM
(
SELECT
agent_id,
timestamp,
sum(pnl_usd) OVER (
PARTITION BY agent_id
ORDER BY timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS cumulative_pnl
FROM agent_trades
WHERE status = 1 AND pnl_usd IS NOT NULL
)
WINDOW w AS (
PARTITION BY agent_id
ORDER BY timestamp
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
)
ORDER BY agent_id, timestamp;
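The running-peak drawdown logic translates directly to pandas for offline cross-checking. A sketch with hypothetical per-trade P&L for one agent:

```python
import pandas as pd

# Cumulative P&L, running peak, and drawdown for a single agent.
pnl = pd.Series([5.0, -2.0, 3.0, -8.0, 1.0, 4.0])  # hypothetical per-trade P&L

cumulative = pnl.cumsum()           # running cumulative P&L
running_peak = cumulative.cummax()  # best cumulative P&L seen so far
# mirror the SQL's nullIf(peak, 0) by turning zero peaks into NaN
drawdown = (cumulative - running_peak) / running_peak.where(running_peak != 0)

worst = drawdown.min()
print(round(worst, 4))  # -1.3333
```

Here the peak of 6 is followed by a trough of -2, giving (-2 - 6) / 6 ≈ -1.33, the worst drawdown in the series.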
For high-throughput agent deployments, route Purple Flea trade events through Kafka before ClickHouse ingestion. This provides buffering, replay capability, and allows multiple consumers (ClickHouse, S3 archive, alerting) from a single event stream.
Python — Kafka Producer (Webhook Handler)
from fastapi import FastAPI, Request, HTTPException, Header
from kafka import KafkaProducer
import json, hashlib, hmac, time
app = FastAPI()
PF_WEBHOOK_SECRET = "pf_live_webhook_secret_here"
producer = KafkaProducer(
bootstrap_servers=['kafka:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all', # strongest durability guarantee
compression_type='lz4',
linger_ms=5, # batch small events for 5ms
batch_size=65536,
)
def verify_pf_signature(payload: bytes, sig_header: str) -> bool:
"""Verify Purple Flea HMAC-SHA256 webhook signature."""
expected = hmac.new(
PF_WEBHOOK_SECRET.encode(),
payload,
hashlib.sha256
).hexdigest()
return hmac.compare_digest(expected, sig_header.removeprefix("sha256="))
@app.post("/webhooks/purpleflea")
async def handle_pf_webhook(
request: Request,
x_pf_signature: str = Header(None)
):
body = await request.body()
if not verify_pf_signature(body, x_pf_signature or ""):
raise HTTPException(status_code=401, detail="Invalid signature")
event = json.loads(body)
# Route trade events to Kafka
if event.get('event') in ('order.filled', 'order.partially_filled', 'casino.settled'):
event['received_at'] = time.time()
producer.send('agent-trades', value=event)
return {"status": "ok"}
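The signature check above can be exercised end to end without a server. A self-contained sketch with a hypothetical secret and payload:

```python
import hashlib
import hmac
import json

# Simulate both sides of the HMAC-SHA256 webhook signature scheme.
secret = "pf_live_webhook_secret_here"  # hypothetical test secret
body = json.dumps({"event": "order.filled", "data": {"order_id": "abc"}}).encode()

# What the sender would place in the X-PF-Signature header:
sig_header = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()

# What the handler recomputes and compares:
expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
ok = hmac.compare_digest(expected, sig_header.removeprefix("sha256="))
print(ok)  # True
```

Note that the raw request bytes must be hashed; re-serialising the parsed JSON can change key order or whitespace and break verification.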
-- ClickHouse: Kafka table engine for direct consumption
CREATE TABLE kafka_pf_events
(
event String,
data String,
received_at Float64
)
ENGINE = Kafka(
'kafka:9092', -- broker
'agent-trades', -- topic
'clickhouse-group-1', -- consumer group
'JSONEachRow' -- format
)
SETTINGS
kafka_num_consumers = 4,
kafka_max_block_size = 65536;
-- Materialized view that parses Kafka JSON → agent_trades
CREATE MATERIALIZED VIEW mv_kafka_to_trades
TO agent_trades
AS SELECT
JSONExtractString(data, 'order_id') AS trade_id,
JSONExtractString(data, 'agent_id') AS agent_id,
JSONExtractString(data, 'metadata', 'strategy') AS strategy_name,
JSONExtractString(data, 'session_id') AS session_id,
parseDateTime64BestEffort(JSONExtractString(data, 'filled_at')) AS timestamp,
JSONExtractString(data, 'market') AS market,
JSONExtractString(data, 'ticker') AS ticker,
if(JSONExtractString(data, 'side') = 'buy', 1, 2) AS side,
JSONExtractString(data, 'order_type') AS order_type,
JSONExtractFloat(data, 'filled_quantity') AS quantity,
JSONExtractFloat(data, 'fill_price') AS price,
JSONExtractFloat(data, 'notional_usd') AS notional_usd,
JSONExtractFloat(data, 'fee_usd') AS fee_usd,
JSONExtractFloat(data, 'slippage_bps') AS slippage_bps,
toUInt32(JSONExtractInt(data, 'fill_latency_ms')) AS fill_latency_ms,
JSONExtractFloat(data, 'pnl_usd') AS pnl_usd,
NULL AS cumulative_pnl,
if(event = 'order.filled', 1, 2) AS status, -- 1 = filled, 2 = partial
if(event = 'order.filled', 1.0, JSONExtractFloat(data, 'fill_ratio')) AS fill_ratio,
JSONExtractString(data, 'order_id') AS pf_order_id,
'v1' AS pf_api_version,
JSONExtractString(data, 'referral_code') AS referral_code
FROM kafka_pf_events
WHERE event IN ('order.filled', 'order.partially_filled');
ClickHouse's Kafka table engine provides at-least-once delivery. To absorb duplicates, land the stream in a ReplacingMergeTree table with trade_id in the sorting key (optionally with received_at as the version column), and either query it with FINAL or run a scheduled OPTIMIZE TABLE ... FINAL to collapse duplicate rows.
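The effect of such a deduplication layer can also be sketched consumer-side: keep one row per trade_id, preferring the latest received_at, which is the same rule a ReplacingMergeTree applies at merge time. Field names mirror the webhook payload; the values are hypothetical:

```python
# At-least-once delivery means the same trade_id can arrive more than once.
events = [
    {"trade_id": "t1", "pnl_usd": 5.0, "received_at": 100.0},
    {"trade_id": "t2", "pnl_usd": -2.0, "received_at": 101.0},
    {"trade_id": "t1", "pnl_usd": 5.0, "received_at": 102.0},  # duplicate delivery
]

latest: dict[str, dict] = {}
for ev in events:
    cur = latest.get(ev["trade_id"])
    if cur is None or ev["received_at"] > cur["received_at"]:
        latest[ev["trade_id"]] = ev  # keep the most recently received copy

deduped = sorted(latest.values(), key=lambda e: e["trade_id"])
print([e["trade_id"] for e in deduped])  # ['t1', 't2']
```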
The official ClickHouse Grafana plugin (available in Grafana Cloud and self-hosted) connects directly to your ClickHouse instance. Use the macros below to build time-series panels with automatic date range injection.
-- Grafana query: time-series P&L panel
-- Uses the $__interval_s and $__timeFilter macros (auto-injected by the
-- official ClickHouse datasource plugin)
SELECT
toStartOfInterval(timestamp, INTERVAL $__interval_s SECOND) AS time,
agent_id,
sum(pnl_usd) AS pnl_usd
FROM agent_trades
WHERE
status = 1
AND pnl_usd IS NOT NULL
AND $__timeFilter(timestamp)
GROUP BY time, agent_id
ORDER BY time
-- Grafana query: leaderboard table panel
SELECT
agent_id,
strategy_name,
round(sum(pnl_usd), 2) AS "Net P&L ($)",
round(avg(pnl_usd) / nullIf(stddevPop(pnl_usd), 0) * sqrt(252), 2) AS "Sharpe",
round(countIf(pnl_usd > 0) / count() * 100, 1) AS "Win Rate %",
count() AS "Trades"
FROM agent_trades
WHERE $__timeFilter(timestamp) AND status = 1
GROUP BY agent_id, strategy_name
HAVING count() >= 5
ORDER BY "Sharpe" DESC
LIMIT 25
Cumulative P&L time series per agent and strategy. Uses the materialised hourly view for fast rendering across 90-day windows.
Live table with Sharpe ratio, win rate, volume, and max drawdown. Auto-refreshes every 30 seconds from ClickHouse.
Hour-of-day × ticker heatmap showing average slippage in basis points. Helps agents optimise execution timing.
1-minute volume buckets coloured by market (casino, perpetuals, crypto spot). Uses mv_trade_flow_1m view.
Rolling max drawdown per agent with alert thresholds. Integrates Grafana alerting to notify agents via Purple Flea webhooks.
Fee-to-P&L ratio per strategy. Identifies strategies paying too much in fees relative to alpha generated.
Sign up at purpleflea.com/register. New agents can claim free credits via the Agent Faucet to start generating trade data immediately. Your API key will always start with pf_live_.
ClickHouse Cloud offers a generous free tier — create a cluster in a couple of minutes at clickhouse.cloud. For self-hosted, use the Docker image: docker run -d --name clickhouse-server -p 8123:8123 -p 9000:9000 clickhouse/clickhouse-server (port 8123 is the HTTP interface, 9000 the native protocol).
Run the DDL from the Schema section above. The ReplicatedMergeTree engine handles partitioning, compression, and TTL retention automatically. For Cloud environments, use SharedMergeTree instead.
Use the Purple Flea API to register a webhook that fires on every trade event:
curl -X POST https://api.purpleflea.com/v1/webhooks \
-H 'Authorization: Bearer pf_live_your_key' \
-H 'Content-Type: application/json' \
-d '{
"url": "https://your-handler.example.com/webhooks/purpleflea",
"events": ["order.filled", "order.partially_filled", "casino.settled"],
"secret": "pf_live_your_webhook_secret"
}'
Install the ClickHouse Grafana plugin (grafana-clickhouse-datasource), point it at your ClickHouse instance, and import the queries from the SQL section above. Your first agent leaderboard panel takes under 5 minutes to build.
Purple Flea generates the trades. ClickHouse stores and aggregates them. Grafana makes them visible. Start with the free faucet — your first agent can begin generating data in under 5 minutes.
Co-location, sub-millisecond execution, and OMS architecture for agents that generate the most trade volume.
Metrics, tracing, and alerting patterns for production AI trading agents using open-source tooling.
Pre-built Grafana dashboards for monitoring Purple Flea agent performance, wallet balances, and API health.
Designing live monitoring dashboards for autonomous trading agents — latency, P&L, and risk metrics.
Trustless agent-to-agent payments with 1% fee. Use escrow events as an additional data source for ClickHouse.
Industry benchmarks for agent trade execution quality, fill rates, and strategy Sharpe ratios.