Statistical Pairs Trading for AI Agents
Pairs trading is the original statistical arbitrage strategy, born on the trading desks of Morgan Stanley in the 1980s. The core idea is elegant: identify two assets whose prices tend to move together (are cointegrated), monitor the spread between them, and trade when that spread deviates far enough from its historical mean. The market-neutral structure — simultaneously long one asset, short the other — means the strategy profits from the relative price move rather than the absolute direction of either asset.
For AI agents, pairs trading is particularly attractive. The signal generation is algorithmic, the execution is mechanical, and the strategy can run continuously without human oversight. This guide covers the full implementation: finding cointegrated pairs with the Engle-Granger test, generating z-score signals, estimating a dynamic hedge ratio with the Kalman filter, and placing simultaneous long/short orders via the Purple Flea Trading API.
1. Pairs Trading Fundamentals
What is Cointegration?
Two price series P_A and P_B are cointegrated if, despite each individually being a random walk (non-stationary), a linear combination of them is stationary. In plain English: both prices drift over time, but their difference (or weighted difference) is mean-reverting.
spread_t = P_A(t) - β * P_B(t) - μ
Where:
β = hedge ratio (how many units of B to hold per unit of A)
μ = long-run mean of the spread
spread_t is stationary (mean-reverting) if A and B are cointegrated
Correlation is not the same as cointegration. Correlation measures how closely short-term price changes move together over a given window. Cointegration measures whether the long-run relationship between the two price levels is stable. Two assets can be highly correlated but not cointegrated (they drift apart permanently), or cointegrated but not always correlated in the short run.
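To make the definition concrete, here is a minimal simulation sketch using only the standard library. The function name `simulate_cointegrated_pair` and all parameter values (beta = 2, mu = 5, AR coefficient 0.9, the seed) are illustrative assumptions, not values from any real market. Asset B follows a random walk, asset A is tied to it through a stationary AR(1) error, and the spread stays in a narrow band even though both prices wander.

```python
import random
import statistics


def simulate_cointegrated_pair(n=2000, beta=2.0, mu=5.0, phi=0.9, seed=7):
    """Return (prices_a, prices_b): B is a random walk, A = mu + beta*B + AR(1) noise."""
    rng = random.Random(seed)
    b, noise = 100.0, 0.0
    prices_a, prices_b = [], []
    for _ in range(n):
        b += rng.gauss(0.0, 1.0)                   # random-walk step: non-stationary
        noise = phi * noise + rng.gauss(0.0, 1.0)  # AR(1) with |phi| < 1: stationary
        prices_b.append(b)
        prices_a.append(mu + beta * b + noise)
    return prices_a, prices_b


prices_a, prices_b = simulate_cointegrated_pair()
# Spread built with the true (assumed known) beta and mu
spread = [a - 2.0 * b - 5.0 for a, b in zip(prices_a, prices_b)]

half = len(spread) // 2
print("std of A:     ", round(statistics.pstdev(prices_a), 2))
print("std of spread:", round(statistics.pstdev(spread), 2))
print("spread mean, first half vs second half:",
      round(statistics.fmean(spread[:half]), 2),
      round(statistics.fmean(spread[half:]), 2))
```

The price series has a large, sample-dependent dispersion, while the spread's dispersion stays small and its mean is similar in both halves of the sample. That stability is what the cointegration test later checks for formally.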
The Spread and Mean Reversion
Once you've identified a cointegrated pair, the trading signal comes from the spread's deviation from its long-run mean. The z-score normalizes this deviation:
z = (spread - mean(spread)) / std(spread)
Entry long spread: z < -2.0 (spread too low, expect recovery)
Entry short spread: z > 2.0 (spread too high, expect decline)
Exit position: |z| < 0.5 (spread returned to mean)
Why AI Agents Excel at This Strategy
- Always on: Agents never miss an entry signal due to sleep or distraction
- Simultaneous execution: Long/short legs placed in milliseconds, minimizing leg risk
- Continuous monitoring: Cointegration can break down — agents check this continuously
- Portfolio of pairs: Agents can monitor dozens of pairs simultaneously, diversifying specific breakdown risk
- Parameter adaptation: Kalman filter updates hedge ratio in real time as the relationship evolves
2. Finding Cointegrated Pairs: Engle-Granger Test
The Engle-Granger (1987) two-step procedure is the most commonly used test for pairwise cointegration. It is simple to implement and interpret, making it ideal for automated pair screening.
Step 1: Regress One Series on the Other
For each candidate pair (A, B), run an OLS regression to find the hedge ratio:
P_A(t) = β₀ + β₁ * P_B(t) + ε_t
Where:
β₀ = intercept (the long-run mean offset of the spread)
β₁ = hedge ratio (use as beta in spread calculation)
ε = residuals (the spread series)
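For intuition, the regression in Step 1 has a closed form when there is a single regressor: the slope is the covariance of the two series divided by the variance of B. The sketch below is a dependency-free illustration of that formula; `ols_hedge_ratio` is a hypothetical helper name and the prices are made up so that the true relationship is exactly A = 3 + 2B.

```python
def ols_hedge_ratio(price_a, price_b):
    """Closed-form simple OLS of price_a on price_b: returns (intercept, slope)."""
    n = len(price_a)
    if n != len(price_b) or n < 2:
        raise ValueError("need two equal-length series with at least 2 points")
    mean_a = sum(price_a) / n
    mean_b = sum(price_b) / n
    cov_ab = sum((a - mean_a) * (b - mean_b) for a, b in zip(price_a, price_b))
    var_b = sum((b - mean_b) ** 2 for b in price_b)
    if var_b == 0:
        raise ValueError("price_b is constant; slope is undefined")
    slope = cov_ab / var_b               # beta_1, the hedge ratio
    intercept = mean_a - slope * mean_b  # beta_0
    return intercept, slope


# Made-up prices with an exact linear relationship A = 3 + 2 * B
price_b = [10.0, 11.0, 12.0, 13.0, 14.0]
price_a = [3.0 + 2.0 * b for b in price_b]

beta_0, beta_1 = ols_hedge_ratio(price_a, price_b)
residuals = [a - beta_0 - beta_1 * b for a, b in zip(price_a, price_b)]
print(beta_0, beta_1, max(abs(r) for r in residuals))
```

One design point worth keeping in mind: regressing A on B and regressing B on A generally give hedge ratios that are not reciprocals of each other, so the choice of dependent variable is part of the pair definition.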
Step 2: Test Residuals for Stationarity
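Apply the Augmented Dickey-Fuller (ADF) test to the residuals. If the p-value is below 0.05, reject the null of a unit root: the residuals are stationary, meaning the pair is cointegrated. Because the residuals come from an estimated regression rather than an observed series, the standard ADF critical values are somewhat too lenient; the proper Engle-Granger test uses adjusted (MacKinnon) critical values, so treat the plain ADF p-value below as a screening approximation.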
import itertools

import numpy as np
from statsmodels.tsa.stattools import coint, adfuller
from statsmodels.regression.linear_model import OLS
from statsmodels.tools.tools import add_constant


def engle_granger_test(
    price_a: np.ndarray,
    price_b: np.ndarray
) -> dict:
    """
    Run Engle-Granger cointegration test on two price series.
    Returns dict with hedge_ratio, p_value, adf_stat, is_cointegrated.
    """
    # Step 1: OLS regression to get hedge ratio
    X = add_constant(price_b)
    model = OLS(price_a, X).fit()
    hedge_ratio = model.params[1]
    intercept = model.params[0]
    # Step 2: ADF test on residuals
    residuals = price_a - hedge_ratio * price_b - intercept
    adf_result = adfuller(residuals, autolag='AIC')
    adf_stat = adf_result[0]
    p_value = adf_result[1]
    return {
        "hedge_ratio": hedge_ratio,
        "intercept": intercept,
        "adf_stat": adf_stat,
        "p_value": p_value,
        "is_cointegrated": p_value < 0.05,
        "spread_mean": float(np.mean(residuals)),
        "spread_std": float(np.std(residuals))
    }


def find_pairs(
    symbols: list[str],
    price_data: dict[str, np.ndarray],
    max_p_value: float = 0.05
) -> list[dict]:
    """
    Screen all pairs from a universe of symbols.
    Returns list of pairs with p-value below max_p_value, sorted by p-value.
    """
    candidates = []
    for sym_a, sym_b in itertools.combinations(symbols, 2):
        prices_a = price_data[sym_a]
        prices_b = price_data[sym_b]
        # Ensure equal length (keep the most recent overlapping window)
        min_len = min(len(prices_a), len(prices_b))
        prices_a = prices_a[-min_len:]
        prices_b = prices_b[-min_len:]
        try:
            result = engle_granger_test(prices_a, prices_b)
        except Exception:
            # Skip pairs the test cannot handle (too short, constant series, ...)
            continue
        # Apply the caller's threshold rather than the hard-coded 0.05 flag
        if result["p_value"] < max_p_value:
            result["sym_a"] = sym_a
            result["sym_b"] = sym_b
            candidates.append(result)
    # Sort by p-value (strongest cointegration first)
    candidates.sort(key=lambda x: x["p_value"])
    return candidates


# Example: screen crypto universe for pairs
universe = ["BTC-USDC", "ETH-USDC", "SOL-USDC", "BNB-USDC", "AVAX-USDC"]
# price_data: {symbol: np.array of closing prices, last 90 days}
pairs = find_pairs(universe, price_data, max_p_value=0.05)
for p in pairs:
    print(f"{p['sym_a']}/{p['sym_b']}: p={p['p_value']:.4f} "
          f"beta={p['hedge_ratio']:.4f}")
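The statsmodels.tsa.stattools.coint() function runs the full Engle-Granger test in one call and returns the test statistic, a p-value based on cointegration-specific (MacKinnon) critical values, and the critical values themselves. The manual implementation above is shown for educational clarity. In production, prefer coint(price_a, price_b): the code is cleaner and its p-value is the statistically appropriate one.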
Pair Selection Criteria
A low p-value alone is not sufficient for trading. Apply these additional filters:
- Economic rationale: The pair should make intuitive sense (e.g., two layer-1 blockchains, two DeFi tokens, BTC and ETH)
- Tradeable half-life: The spread's mean-reversion half-life should be 1-30 days. Shorter is mostly noise; longer ties up capital inefficiently
- Reasonable hedge ratio: Ratios outside 0.1-10x suggest the pair relationship may be spurious
- Out-of-sample validation: Test found pairs on a held-out data period before trading
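def half_life(spread: np.ndarray) -> float:
    """
    Estimate mean-reversion half-life via AR(1) regression.
    Regressing delta_spread(t) on spread(t-1) gives a slope lam = phi - 1,
    where phi is the AR(1) coefficient, so
    half_life = -ln(2) / ln(phi) = -ln(2) / ln(1 + lam).
    The result is in sampling periods (days for daily data).
    """
    lagged = spread[:-1]
    delta = np.diff(spread)
    X = add_constant(lagged)
    lam = OLS(delta, X).fit().params[1]
    if lam >= 0:
        return float('inf')  # non-mean-reverting
    if lam <= -1:
        return 0.0  # phi <= 0: reverts (or overshoots) within one step
    return -np.log(2) / np.log(1 + lam)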
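As a worked illustration of the half-life formula and of the two numeric filters listed above, the sketch below computes the half-life directly from an AR(1) coefficient and combines the half-life and hedge-ratio ranges into one predicate. Both function names are hypothetical, and the default bounds simply restate the 1-30 day and 0.1-10x ranges from the list.

```python
import math


def half_life_from_phi(phi: float) -> float:
    """Half-life, in sampling periods, of an AR(1) process with coefficient phi."""
    if phi >= 1.0:
        return math.inf   # unit root or explosive: no mean reversion
    if phi <= 0.0:
        return 0.0        # reverts (or overshoots) within a single step
    return -math.log(2) / math.log(phi)


def passes_numeric_filters(half_life_days: float, hedge_ratio: float,
                           min_hl: float = 1.0, max_hl: float = 30.0,
                           min_ratio: float = 0.1, max_ratio: float = 10.0) -> bool:
    """True if the half-life and |hedge ratio| both fall inside the screening ranges."""
    return (min_hl <= half_life_days <= max_hl
            and min_ratio <= abs(hedge_ratio) <= max_ratio)


for phi in (0.5, 0.9, 0.99):
    print(f"phi={phi}: half-life = {half_life_from_phi(phi):.2f} periods")

print(passes_numeric_filters(half_life_from_phi(0.9), hedge_ratio=2.0))
print(passes_numeric_filters(half_life_from_phi(0.9), hedge_ratio=25.0))
```

A coefficient of 0.5 halves the deviation every period, so its half-life is exactly one period; as phi approaches 1 the half-life grows without bound, which is why near-unit-root spreads fail the upper limit.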
3. Z-Score Signal Generation and Entry/Exit Rules
Computing the Spread
The spread at each time step uses the estimated hedge ratio and intercept from the Engle-Granger regression, and the z-score normalizes it over a rolling window:
spread(t) = P_A(t) - β * P_B(t) - intercept
z(t) = (spread(t) - rolling_mean(spread, window)) / rolling_std(spread, window)
Entry and Exit Signal Logic
The signal is simple: the further the z-score from zero, the more stretched the pair's relationship. You enter when the spread is stretched enough to expect reversion, and exit when it returns close to the mean.
| Z-Score Condition | Signal | Action | Expectation |
|---|---|---|---|
| z < -2.0 | Long spread | Buy A, Sell B | Spread will rise back to mean |
| z > +2.0 | Short spread | Sell A, Buy B | Spread will fall back to mean |
| |z| < 0.5 | Exit | Close both legs | Mean reversion complete |
| |z| > 3.5 | Stop loss | Close both legs | Possible breakdown — cut loss |
def calculate_spread(
    price_a: float,
    price_b: float,
    hedge_ratio: float,
    intercept: float
) -> float:
    return price_a - hedge_ratio * price_b - intercept


def calculate_zscore(
    spread_history: list[float],
    window: int = 60
) -> float:
    """
    Compute rolling z-score of the spread.
    window: number of data points in the rolling window.
    """
    if len(spread_history) < window:
        return 0.0
    recent = spread_history[-window:]
    mean = sum(recent) / len(recent)
    variance = sum((x - mean)**2 for x in recent) / len(recent)
    std = variance ** 0.5
    if std == 0:
        return 0.0
    return (spread_history[-1] - mean) / std


def generate_signal(
    z: float,
    current_position: int,  # -1 = short spread, 0 = flat, +1 = long spread
    entry_threshold: float = 2.0,
    exit_threshold: float = 0.5,
    stop_threshold: float = 3.5
) -> str:
    """
    Returns: 'long_spread' | 'short_spread' | 'exit' | 'hold' | 'stop'
    """
    # Stop loss (also blocks new entries while |z| is beyond the stop)
    if abs(z) > stop_threshold:
        return "stop"
    # Exit existing position
    if current_position != 0 and abs(z) < exit_threshold:
        return "exit"
    # New entry signals (only when flat)
    if current_position == 0:
        if z < -entry_threshold:
            return "long_spread"  # buy A, sell B
        if z > entry_threshold:
            return "short_spread"  # sell A, buy B
    return "hold"
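To see how the thresholds interact over time, the following sketch replays a made-up z-score path through a condensed restatement of the signal rules. `next_signal` and `replay` are hypothetical names, and the path values are chosen only to touch every branch of the table.

```python
def next_signal(z, position, entry=2.0, exit_=0.5, stop=3.5):
    """Condensed restatement of the entry/exit table; position is -1, 0 or +1."""
    if abs(z) > stop:
        return "stop"
    if position != 0 and abs(z) < exit_:
        return "exit"
    if position == 0 and z < -entry:
        return "long_spread"
    if position == 0 and z > entry:
        return "short_spread"
    return "hold"


def replay(z_path):
    """Walk a z-score path and return a list of (z, signal, position_after)."""
    position, log = 0, []
    for z in z_path:
        signal = next_signal(z, position)
        if signal == "long_spread":
            position = 1
        elif signal == "short_spread":
            position = -1
        elif signal in ("exit", "stop"):
            position = 0
        log.append((z, signal, position))
    return log


# Made-up z-score path for illustration
z_path = [0.3, -1.4, -2.3, -1.1, -0.2, 1.0, 2.6, 3.8, 3.0]
for z, signal, position in replay(z_path):
    print(f"z={z:+.1f}  signal={signal:<12}  position={position:+d}")
```

The last two steps are worth noticing: the position is stopped out at z = 3.8, and on the next tick z = 3.0 is back inside the stop but still beyond the entry threshold, so these rules re-enter immediately. If that is not the intended behavior, a cooldown after a stop is one possible addition.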
4. Python: PairsTradingAgent Class
The following PairsTradingAgent class integrates pair selection, spread monitoring, signal generation, and order execution into a complete, running agent.
import asyncio
import time
from dataclasses import dataclass, field
from typing import Optional

import httpx
import numpy as np

PF_API = "https://purpleflea.com/api/trading"


@dataclass
class PairConfig:
    sym_a: str
    sym_b: str
    hedge_ratio: float   # beta from cointegration regression
    intercept: float     # long-run mean offset
    spread_mean: float
    spread_std: float
    trade_size_a: float  # position size in units of asset A
    entry_z: float = 2.0
    exit_z: float = 0.5
    stop_z: float = 3.5
    lookback: int = 60   # rolling window for z-score


class PairsTradingAgent:
    """
    Live pairs trading agent for Purple Flea.
    Monitors a single pair, generates z-score signals,
    and executes simultaneous long/short orders.
    """

    def __init__(self, config: PairConfig, api_key: str):
        self.cfg = config
        self.api_key = api_key
        self.spread_history: list[float] = []
        self.position: int = 0  # -1, 0, +1
        self.entry_spread: Optional[float] = None
        self.entry_time: Optional[float] = None
        self.pnl_history: list[dict] = []
        self.active_orders: dict = {}
        self.client = httpx.AsyncClient(
            base_url=PF_API,
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=8.0
        )

    def find_pairs(
        self,
        symbols: list[str],
        price_data: dict[str, np.ndarray]
    ) -> list[dict]:
        """
        Screen all symbol combinations for cointegrated pairs.
        Returns sorted list of cointegrated pairs with metadata.
        Delegates to engle_granger_test() for each combination.
        """
        return find_pairs(symbols, price_data)  # uses module-level function

    def calculate_spread(self, price_a: float, price_b: float) -> float:
        """Compute current spread using stored hedge ratio and intercept."""
        return price_a - self.cfg.hedge_ratio * price_b - self.cfg.intercept

    def generate_signal(
        self, price_a: float, price_b: float
    ) -> tuple[str, float, float]:
        """
        Update spread history and generate trading signal.
        Returns (signal, z, spread), where signal is one of
        'long_spread' | 'short_spread' | 'exit' | 'hold' | 'stop'.
        """
        spread = self.calculate_spread(price_a, price_b)
        self.spread_history.append(spread)
        # Cap memory use: keep only the most recent 500 spread values
        if len(self.spread_history) > 500:
            self.spread_history = self.spread_history[-500:]
        z = calculate_zscore(self.spread_history, window=self.cfg.lookback)
        signal = generate_signal(
            z, self.position,
            self.cfg.entry_z, self.cfg.exit_z, self.cfg.stop_z
        )
        return signal, z, spread
    async def execute_pair(self, signal: str, price_a: float, price_b: float):
        """
        Execute a pairs trade: place simultaneous orders for both legs.
        Long spread = buy A + sell B.
        Short spread = sell A + buy B.
        Exit = close both legs at market.
        """
        size_a = self.cfg.trade_size_a
        # Spread = A - beta * B, so hold beta units of B per unit of A
        size_b = size_a * self.cfg.hedge_ratio
        if signal == "long_spread":
            orders = [
                {"symbol": self.cfg.sym_a, "side": "buy", "type": "market", "size": size_a},
                {"symbol": self.cfg.sym_b, "side": "sell", "type": "market", "size": size_b},
            ]
            new_position = 1
        elif signal == "short_spread":
            orders = [
                {"symbol": self.cfg.sym_a, "side": "sell", "type": "market", "size": size_a},
                {"symbol": self.cfg.sym_b, "side": "buy", "type": "market", "size": size_b},
            ]
            new_position = -1
        elif signal in ("exit", "stop"):
            await self.close_position(price_a, price_b)
            return
        else:
            return
        # Submit both legs concurrently
        results = await asyncio.gather(*[
            self.client.post("/orders", json=order)
            for order in orders
        ])
        # Surface HTTP errors instead of assuming a fill; if only one leg was
        # accepted, the caller must unwind it to avoid directional exposure
        for r in results:
            r.raise_for_status()
        # Record the position only after both legs were accepted
        self.position = new_position
        self.entry_spread = self.calculate_spread(price_a, price_b)
        self.entry_time = time.time()
        print(f"[{signal.upper()}] {self.cfg.sym_a}/{self.cfg.sym_b} "
              f"spread={self.entry_spread:.4f}")
        return [r.json() for r in results]

    async def close_position(self, price_a: float, price_b: float):
        """Close both legs of an open pairs trade."""
        if self.position == 0:
            return
        # Same sizes as at entry: beta units of B per unit of A
        size_a = self.cfg.trade_size_a
        size_b = size_a * self.cfg.hedge_ratio
        # Reverse the legs to close
        close_side_a = "sell" if self.position == 1 else "buy"
        close_side_b = "buy" if self.position == 1 else "sell"
        await asyncio.gather(
            self.client.post("/orders", json={
                "symbol": self.cfg.sym_a, "side": close_side_a,
                "type": "market", "size": size_a
            }),
            self.client.post("/orders", json={
                "symbol": self.cfg.sym_b, "side": close_side_b,
                "type": "market", "size": size_b
            })
        )
        exit_spread = self.calculate_spread(price_a, price_b)
        # PnL in quote currency (before fees): spread change times units of A
        trade_pnl = self.position * (exit_spread - self.entry_spread) * size_a
        self.pnl_history.append({
            "entry": self.entry_spread, "exit": exit_spread,
            "pnl": trade_pnl, "position": self.position,
            "duration": time.time() - (self.entry_time or 0)
        })
        self.position = 0
        self.entry_spread = None
        print(f"[CLOSE] {self.cfg.sym_a}/{self.cfg.sym_b} PnL={trade_pnl:.4f}")
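The leg sizing follows directly from the spread definition: if the spread is A minus beta times B, then holding beta units of B against each unit of A makes the position's PnL equal to the position size times the change in the spread. The sketch below checks that identity on made-up prices; `leg_sizes` and `spread_pnl` are hypothetical helpers, and fees and slippage are ignored.

```python
def leg_sizes(size_a: float, hedge_ratio: float) -> tuple[float, float]:
    """Units of A and B for one spread position: beta units of B per unit of A."""
    return size_a, size_a * hedge_ratio


def spread_pnl(position, size_a, hedge_ratio, entry_prices, exit_prices):
    """PnL in quote currency of a long (+1) or short (-1) spread, ignoring fees."""
    size_a, size_b = leg_sizes(size_a, hedge_ratio)
    (a0, b0), (a1, b1) = entry_prices, exit_prices
    pnl_a = position * size_a * (a1 - a0)    # leg A: long when position = +1
    pnl_b = -position * size_b * (b1 - b0)   # leg B: always the opposite side
    return pnl_a + pnl_b


# Made-up prices: (price of A, price of B) at entry and at exit
beta = 17.5
entry = (35_000.0, 2_010.0)
exit_ = (35_400.0, 2_020.0)

pnl = spread_pnl(+1, 0.01, beta, entry, exit_)
spread_change = (exit_[0] - beta * exit_[1]) - (entry[0] - beta * entry[1])
print("PnL from the two legs:     ", round(pnl, 6))
print("size_a * change in spread: ", round(0.01 * spread_change, 6))
```

The intercept cancels out of the spread change, so it does not affect PnL; it only shifts the level around which the z-score is measured.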
5. Kalman Filter for Dynamic Hedge Ratio Estimation
The Engle-Granger regression gives you a static hedge ratio estimated over a historical window. In practice, the true hedge ratio drifts over time as the structural relationship between the two assets evolves. The Kalman filter provides a recursive, real-time estimate of this changing ratio (optimal under linear-Gaussian assumptions) without requiring full re-estimation at each step.
The Kalman Filter as a Dynamic Regression
Treating the hedge ratio as a state variable that evolves as a random walk, the Kalman filter updates the estimate with each new data point, weighting recent observations more heavily:
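State:        β_t = β_{t-1} + w_t                                   (w ~ N(0, Q))
Observation:  P_A(t) = β_t * P_B(t) + v_t                           (v ~ N(0, R))
Predict:      β_{t|t-1} = β_{t-1|t-1}
              P_{t|t-1} = P_{t-1|t-1} + Q
Update:       K_t = P_{t|t-1} * P_B(t) / (P_B(t)² * P_{t|t-1} + R)
              β_{t|t} = β_{t|t-1} + K_t * (P_A(t) - β_{t|t-1} * P_B(t))
              P_{t|t} = (1 - K_t * P_B(t)) * P_{t|t-1}
Here P_A(t) and P_B(t) are prices, while P with only time subscripts is the error variance of the β estimate.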
class KalmanHedgeRatio:
    """
    Online Kalman filter for dynamic hedge ratio estimation.
    Adapts the hedge ratio in real-time as prices arrive.
    Q: state noise — higher = faster adaptation but noisier
    R: observation noise — higher = smoother but slower
    """

    def __init__(self, Q: float = 1e-5, R: float = 1e-2):
        self.Q = Q  # state transition noise
        self.R = R  # observation noise
        self.beta = 1.0  # initial hedge ratio guess
        self.P = 1.0  # error covariance
        self.e_history: list[float] = []

    def update(self, price_a: float, price_b: float) -> tuple[float, float]:
        """
        Process one observation and return (hedge_ratio, forecast_error).
        Call this every time a new price tick arrives.
        """
        # Predict step
        P_pred = self.P + self.Q
        # Observation: what P_A should be given current beta
        forecast = self.beta * price_b
        error = price_a - forecast  # innovation
        # Kalman gain
        S = price_b**2 * P_pred + self.R
        K = P_pred * price_b / S
        # Update step
        self.beta += K * error
        self.P = (1 - K * price_b) * P_pred
        self.e_history.append(error)
        return self.beta, error

    def spread(self, price_a: float, price_b: float) -> float:
        """Current spread using dynamic hedge ratio."""
        return price_a - self.beta * price_b

    def zscore(self, window: int = 60) -> float:
        """Z-score of recent forecast errors (Kalman innovations)."""
        errors = self.e_history[-window:]
        if len(errors) < 10:
            return 0.0
        mean = sum(errors) / len(errors)
        std = (sum((e - mean)**2 for e in errors) / len(errors)) ** 0.5
        if std == 0:
            return 0.0
        return (errors[-1] - mean) / std


# Example: dynamic pair monitoring with Kalman filter
kf = KalmanHedgeRatio(Q=1e-5, R=1e-2)
for p_a, p_b in zip(prices_btc, prices_eth):
    beta, error = kf.update(p_a, p_b)
    z = kf.zscore(window=60)
    if abs(z) > 2.0:
        print(f"Signal: z={z:.2f} beta={beta:.4f} spread={kf.spread(p_a,p_b):.4f}")
The ratio Q/R controls how quickly the filter adapts. High Q/R = fast adaptation (tracks short-term ratio changes, more noise). Low Q/R = slow adaptation (smoother estimates, slower to react). For daily crypto data, Q=1e-5, R=1e-2 is a reasonable starting point. Run a grid search over historical data to optimize.
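The effect of Q/R is easiest to see on synthetic data. The sketch below runs the same scalar filter with two settings on a noise-free series whose true hedge ratio jumps from 2.0 to 3.0 halfway through. The function name `kalman_beta_path`, the constant price of B, and both Q values are illustrative assumptions; real prices are noisy, so actual adaptation speeds will differ.

```python
def kalman_beta_path(prices_a, prices_b, q, r, beta0=1.0, p0=1.0):
    """Scalar Kalman filter for P_A = beta * P_B + noise; returns the beta estimates."""
    beta, p, path = beta0, p0, []
    for a, b in zip(prices_a, prices_b):
        p_pred = p + q                            # predict: random-walk state
        gain = p_pred * b / (b * b * p_pred + r)  # Kalman gain
        beta += gain * (a - beta * b)             # correct with the innovation
        p = (1.0 - gain * b) * p_pred             # updated error variance
        path.append(beta)
    return path


# Synthetic, noise-free data: the true beta jumps from 2.0 to 3.0 at step 200
prices_b = [10.0] * 400
prices_a = [(2.0 if t < 200 else 3.0) * b for t, b in enumerate(prices_b)]

fast = kalman_beta_path(prices_a, prices_b, q=1e-3, r=1e-2)  # Q/R = 1e-1
slow = kalman_beta_path(prices_a, prices_b, q=1e-7, r=1e-2)  # Q/R = 1e-5

for name, path in (("Q/R=1e-1", fast), ("Q/R=1e-5", slow)):
    print(name, "beta just before jump:", round(path[199], 4),
          "| 10 steps after:", round(path[209], 4))
```

Ten observations after the jump, the high-Q/R filter has essentially reached the new ratio while the low-Q/R filter has covered only part of the distance. On noisy data the trade-off runs the other way as well: the fast filter would chase noise that the slow one averages out.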
6. Risk Management: Spread Blowout and Correlation Breakdown
Spread Blowout Risk
The most dangerous event in pairs trading is a spread blowout — when the spread widens far beyond its historical range and does not revert. This happens when the cointegration relationship permanently breaks down due to a fundamental event: a company-specific shock, regulatory change, or structural market shift. In crypto, this can be triggered by a protocol hack, a major exchange insolvency, or an ecosystem collapse.
Always set a hard stop at z=3.5 or beyond. Do not add to losing pairs positions. If a spread reaches 4+ standard deviations and stays there for more than 24 hours, the relationship may be broken permanently. Close the position regardless of unrealized loss.
class PairsRiskManager:
    def __init__(
        self,
        max_holding_hours: float = 72,     # max time in any single trade
        stop_z: float = 3.5,               # hard z-score stop
        max_spread_multiple: float = 4.0,  # max spread vs historical std
        max_drawdown_pct: float = 5.0      # max portfolio drawdown %
    ):
        self.max_holding_hours = max_holding_hours
        self.stop_z = stop_z
        self.max_spread_multiple = max_spread_multiple
        self.max_drawdown_pct = max_drawdown_pct
        self.portfolio_pnl: float = 0.0
        self.peak_pnl: float = 0.0
        self.active_positions: dict = {}

    def should_stop(
        self,
        pair_id: str,
        z: float,
        spread: float,
        spread_std: float,
        entry_time: float
    ) -> tuple[bool, str]:
        """
        Returns (should_close, reason) for a given open position.
        """
        # Hard z-score stop
        if abs(z) > self.stop_z:
            return True, f"z-score stop ({z:.2f})"
        # Spread blowout: beyond 4 historical std devs
        if abs(spread) > self.max_spread_multiple * spread_std:
            return True, f"spread blowout ({spread:.4f} vs {spread_std:.4f})"
        # Time stop: position held too long
        hours_held = (time.time() - entry_time) / 3600
        if hours_held > self.max_holding_hours:
            return True, f"time stop ({hours_held:.1f}h)"
        # Portfolio drawdown check (measured relative to peak PnL)
        drawdown = (self.peak_pnl - self.portfolio_pnl) / max(1, abs(self.peak_pnl))
        if drawdown > self.max_drawdown_pct / 100:
            return True, f"portfolio drawdown ({drawdown*100:.1f}%)"
        return False, ""

    def check_cointegration_health(
        self,
        spread_history: list[float],
        original_spread_std: float,
        window: int = 100
    ) -> bool:
        """
        Monitor whether the cointegration relationship is still valid.
        Returns False if spread volatility has expanded significantly.
        """
        if len(spread_history) < window:
            return True
        recent = spread_history[-window:]
        recent_std = np.std(recent)
        # If current spread std is 3x the original, relationship may be broken
        return recent_std < original_spread_std * 3.0
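The rule of thumb stated earlier in this section (a spread that sits at 4 or more standard deviations for over 24 hours) needs a notion of persistence that a single-tick check does not capture. One possible sketch is below; `blowout_persisted` is a hypothetical helper, and it assumes z-scores are recorded with Unix timestamps in chronological order.

```python
def blowout_persisted(z_samples, threshold=4.0, max_hours=24.0):
    """
    z_samples: list of (unix_timestamp_seconds, z) in chronological order.
    Returns True if |z| has stayed at or above `threshold` continuously,
    up to the latest sample, for longer than `max_hours`.
    """
    if not z_samples:
        return False
    breach_start = None
    for ts, z in z_samples:
        if abs(z) >= threshold:
            if breach_start is None:
                breach_start = ts   # breach begins here
        else:
            breach_start = None     # any dip below the threshold resets the clock
    if breach_start is None:
        return False
    latest_ts = z_samples[-1][0]
    return (latest_ts - breach_start) / 3600.0 > max_hours


# Hourly samples: 30 hours continuously at z = 4.2
stuck = [(h * 3600, 4.2) for h in range(30)]
# Same, but with one dip back to z = 1.0 at hour 10
dipped = [(h * 3600, 1.0 if h == 10 else 4.2) for h in range(30)]

print(blowout_persisted(stuck))
print(blowout_persisted(dipped))
```

Resetting the clock on any dip is a deliberately strict reading of "stays there"; a more tolerant variant could allow brief excursions below the threshold.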
Correlation Breakdown Detection
Beyond hard stops, monitor for gradual degradation of the pair relationship using a rolling correlation metric. As the 30-day rolling correlation between the two assets deteriorates, scale position size down in steps (for example, to 50% once it falls to 0.6 or below, and to zero at 0.4 or below) and tighten stops:
def rolling_correlation(
    prices_a: list[float],
    prices_b: list[float],
    window: int = 30
) -> float:
    """Rolling correlation between two price series (30 points by default)."""
    if len(prices_a) < window or len(prices_b) < window:
        return 1.0
    a = np.array(prices_a[-window:])
    b = np.array(prices_b[-window:])
    corr_matrix = np.corrcoef(a, b)
    return float(corr_matrix[0, 1])


def position_size_scale(correlation: float) -> float:
    """Scale position size down as correlation deteriorates."""
    if correlation > 0.8:
        return 1.0
    if correlation > 0.6:
        return 0.75
    if correlation > 0.4:
        return 0.5
    return 0.0  # relationship too weak: no trade
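One caveat when computing this metric: the correlation of price levels can look high simply because both assets trend in the same direction, so a variant based on returns is often more informative. The sketch below is one way to compute it with the standard library; `log_returns`, `pearson` and `rolling_return_correlation` are hypothetical helper names, and the two price series are constructed artificially so that levels trend together while period-to-period moves oppose each other.

```python
import math


def log_returns(prices):
    """Log return between each pair of consecutive prices."""
    return [math.log(p1 / p0) for p0, p1 in zip(prices, prices[1:])]


def pearson(x, y):
    """Pearson correlation of two equal-length sequences (0.0 if either is constant)."""
    n = len(x)
    mx, my = sum(x) / n, sum(y) / n
    sxy = sum((a - mx) * (b - my) for a, b in zip(x, y))
    sxx = sum((a - mx) ** 2 for a in x)
    syy = sum((b - my) ** 2 for b in y)
    if sxx == 0 or syy == 0:
        return 0.0
    return sxy / math.sqrt(sxx * syy)


def rolling_return_correlation(prices_a, prices_b, window=30):
    """Correlation of the last `window` log returns; None if there is not enough data."""
    if min(len(prices_a), len(prices_b)) < window + 1:
        return None
    ra = log_returns(prices_a[-(window + 1):])
    rb = log_returns(prices_b[-(window + 1):])
    return pearson(ra, rb)


# Artificial series: shared 1% per-step trend, opposite-signed wiggles
steps = range(40)
prices_a = [100.0 * 1.01 ** t * (1 + 0.005 * (-1) ** t) for t in steps]
prices_b = [50.0 * 1.01 ** t * (1 - 0.005 * (-1) ** t) for t in steps]

level_corr = pearson(prices_a[-30:], prices_b[-30:])
return_corr = rolling_return_correlation(prices_a, prices_b, window=30)
print("correlation of price levels:", round(level_corr, 3))
print("correlation of log returns: ", round(return_corr, 3))
```

Here the level correlation is close to +1 while the return correlation is close to -1, which shows how a level-based check can miss a breakdown in short-term co-movement.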
7. Portfolio of Pairs for Diversification
A single pair is vulnerable to breakdown. A portfolio of 10-20 cointegrated pairs spreads this idiosyncratic risk — when one pair's relationship breaks, the others continue generating returns. Portfolio construction for pairs trading follows different rules than traditional asset allocation because the positions are already market-neutral.
Pair Correlation Within the Portfolio
Avoid pairs whose spread movements are highly correlated with each other — this defeats the diversification goal. For example, BTC/ETH and BTC/SOL spreads will both react to Bitcoin-specific events, providing less diversification than BTC/ETH and two uncorrelated DeFi token pairs.
| Pair Category | Example Pairs | Correlation to BTC/ETH | Diversification Value |
|---|---|---|---|
| Layer-1 pairs | BTC/ETH, ETH/SOL | High (~0.7) | Low |
| DeFi pairs | UNI/AAVE, CRV/BAL | Medium (~0.4) | Medium |
| Cross-sector | BTC/LINK, ETH/DOT | Low (~0.2) | High |
| Stablecoin pairs | USDC/USDT (rate arb) | None | Highest |
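The overlap between pairs can be measured rather than assumed from category labels. The sketch below is one possible screening step: it computes the correlation between the spread changes of candidate pairs and greedily keeps a pair only if it is not too correlated with any pair already kept. The helper names, the 0.5 cutoff, and the tiny data set are all illustrative assumptions.

```python
import math


def corr(x, y):
    """Pearson correlation of two equal-length sequences (0.0 if either is constant)."""
    n = len(x)
    mx, my = sum(x) / n, sum(y) / n
    sxy = sum((a - mx) * (b - my) for a, b in zip(x, y))
    sxx = sum((a - mx) ** 2 for a in x)
    syy = sum((b - my) ** 2 for b in y)
    return 0.0 if sxx == 0 or syy == 0 else sxy / math.sqrt(sxx * syy)


def select_diversified(spread_changes, max_abs_corr=0.5):
    """
    Greedy filter over {pair_name: [delta_spread, ...]}, iterated in priority
    order (for example, best p-value first). A pair is kept only if its spread
    changes have |corr| <= max_abs_corr with every pair already kept.
    """
    kept = []
    for name, changes in spread_changes.items():
        if all(abs(corr(changes, spread_changes[k])) <= max_abs_corr for k in kept):
            kept.append(name)
    return kept


# Made-up spread changes for three candidate pairs
spread_changes = {
    "BTC/ETH":  [1.0, -2.0, 3.0, -1.0, 2.0, -3.0],
    "BTC/SOL":  [2.0, -3.0, 5.0, -2.0, 3.0, -5.0],   # moves almost in lockstep with BTC/ETH
    "UNI/AAVE": [1.0, 1.0, -1.0, -1.0, 1.0, -1.0],
}
print(select_diversified(spread_changes))
```

Because the filter is greedy, the result depends on the iteration order, so it makes sense to order candidates by whatever quality measure matters most before filtering.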
Equal Risk Allocation
Size each pair's position such that the expected loss on a 1-standard-deviation spread move is equal across all pairs. This prevents a single volatile pair from dominating portfolio risk:
def compute_position_sizes(
    pairs: list[dict],          # [{sym_a, sym_b, spread_std, ...}, ...]
    total_capital: float,
    risk_per_pair: float = 0.01  # 1% of capital at risk per pair per 1-std move
) -> list[float]:
    """
    Equal-risk position sizing across a portfolio of pairs.
    Returns the size of the A leg of each pair, in units of asset A.
    """
    sizes = []
    for pair in pairs:
        # Max loss we accept per 1-std spread move
        dollar_risk = total_capital * risk_per_pair
        # size = dollar_risk / spread_std
        # (spread_std is in quote currency per unit of asset A)
        spread_std = pair["spread_std"]
        size = dollar_risk / spread_std if spread_std > 0 else 0
        sizes.append(size)
    return sizes


class PairsPortfolio:
    """Manager for a portfolio of pairs trading agents."""

    def __init__(self, api_key: str, total_capital: float):
        self.api_key = api_key
        self.capital = total_capital
        self.agents: list[PairsTradingAgent] = []
        self.risk = PairsRiskManager()

    def add_pair(self, pair_config: dict):
        cfg = PairConfig(
            sym_a=pair_config["sym_a"],
            sym_b=pair_config["sym_b"],
            hedge_ratio=pair_config["hedge_ratio"],
            intercept=pair_config["intercept"],
            spread_mean=pair_config["spread_mean"],
            spread_std=pair_config["spread_std"],
            trade_size_a=pair_config["trade_size_a"]
        )
        agent = PairsTradingAgent(cfg, self.api_key)
        self.agents.append(agent)

    async def tick(self, prices: dict[str, float]):
        """Process one price update across all pairs in the portfolio."""
        for agent in self.agents:
            p_a = prices.get(agent.cfg.sym_a)
            p_b = prices.get(agent.cfg.sym_b)
            if p_a is None or p_b is None:
                continue  # wait until both legs have a price
            signal, z, spread = agent.generate_signal(p_a, p_b)
            if signal != "hold":
                await agent.execute_pair(signal, p_a, p_b)

    def portfolio_pnl(self) -> float:
        """Sum of realized PnL over all closed trades of all agents."""
        return sum(
            sum(t["pnl"] for t in a.pnl_history)
            for a in self.agents
        )
8. Integration with Purple Flea Trading API
Purple Flea's API supports simultaneous order placement across multiple symbols, which is essential for pairs trading — the faster you can close the time gap between the two legs, the less leg risk you carry. Here is the complete integration using WebSocket price feeds and REST order placement:
import asyncio
import json

import websockets
import httpx

PF_WS = "wss://purpleflea.com/ws/trading"
PF_API = "https://purpleflea.com/api/trading"


async def run_pairs_portfolio(api_key: str, pairs_config: list[dict]):
    """
    Main entry point for a live pairs trading portfolio.
    Streams prices via WebSocket, manages signals, places orders.
    """
    portfolio = PairsPortfolio(api_key=api_key, total_capital=10000.0)
    for cfg in pairs_config:
        portfolio.add_pair(cfg)
    prices: dict[str, float] = {}
    async with websockets.connect(
        f"{PF_WS}?token={api_key}"
    ) as ws:
        # Subscribe to all relevant symbols (deduplicated across pairs)
        all_symbols = list({
            sym
            for cfg in pairs_config
            for sym in [cfg["sym_a"], cfg["sym_b"]]
        })
        for sym in all_symbols:
            await ws.send(json.dumps({
                "action": "subscribe",
                "channel": "ticker",
                "symbol": sym
            }))
        async for msg in ws:
            data = json.loads(msg)
            if data.get("channel") == "ticker":
                prices[data["symbol"]] = data["mid"]
                # Process portfolio on each price update
                await portfolio.tick(prices)
                pnl = portfolio.portfolio_pnl()
                print(f"Prices updated. Portfolio PnL: {pnl:.4f} USDC")


# Startup configuration
PAIRS = [
    {
        "sym_a": "BTC-USDC", "sym_b": "ETH-USDC",
        "hedge_ratio": 17.5, "intercept": -200.0,
        "spread_mean": 0.0, "spread_std": 150.0,
        "trade_size_a": 0.01
    },
    {
        "sym_a": "ETH-USDC", "sym_b": "SOL-USDC",
        "hedge_ratio": 11.2, "intercept": 50.0,
        "spread_mean": 0.0, "spread_std": 45.0,
        "trade_size_a": 0.1
    },
]

if __name__ == "__main__":
    asyncio.run(run_pairs_portfolio(
        api_key="pf_live_YOUR_KEY_HERE",
        pairs_config=PAIRS
    ))
Simultaneous Order Guarantees
Purple Flea supports batch order submission via the /orders/batch endpoint, which accepts multiple orders in a single API call and executes them as close to simultaneously as possible. Use this for pairs trading to minimize the time gap between legs:
async def place_pair_batch(
    client: httpx.AsyncClient,
    sym_a: str, side_a: str, size_a: float,
    sym_b: str, side_b: str, size_b: float
) -> dict:
    """
    Submit both pair legs in a single batch request.
    Minimizes execution gap between legs.
    """
    resp = await client.post("/orders/batch", json={
        "orders": [
            {"symbol": sym_a, "side": side_a, "type": "market", "size": size_a},
            {"symbol": sym_b, "side": side_b, "type": "market", "size": size_b},
        ],
        "atomic": False  # True = reject both if either fails
    })
    return resp.json()
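Building the request body can be separated from sending it, which makes the side and size logic easy to test without touching the network. The sketch below does that for the two entry signals. `build_pair_orders` is a hypothetical helper, and the payload fields simply mirror the batch example in this article; they have not been checked against the API reference. It defaults `atomic` to True on the assumption that a one-legged fill, which leaves outright directional exposure, is usually worse than no fill.

```python
def build_pair_orders(signal, sym_a, sym_b, size_a, hedge_ratio, atomic=True):
    """Return a request body for a two-leg market order, or None for non-entry signals."""
    sides = {
        "long_spread": ("buy", "sell"),    # buy A, sell B
        "short_spread": ("sell", "buy"),   # sell A, buy B
    }
    if signal not in sides:
        return None
    side_a, side_b = sides[signal]
    size_b = size_a * hedge_ratio          # beta units of B per unit of A
    return {
        "orders": [
            {"symbol": sym_a, "side": side_a, "type": "market", "size": size_a},
            {"symbol": sym_b, "side": side_b, "type": "market", "size": size_b},
        ],
        "atomic": atomic,
    }


body = build_pair_orders("short_spread", "BTC-USDC", "ETH-USDC",
                         size_a=0.01, hedge_ratio=17.5)
print(body)
print(build_pair_orders("hold", "BTC-USDC", "ETH-USDC", 0.01, 17.5))
```

The resulting dictionary could then be passed as the `json` argument of the batch request shown above.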
Before running pairs trades, register your agent at purpleflea.com/register and claim free starting capital via the Agent Faucet. The faucet provides enough capital to run a small pairs portfolio with 1% risk per pair.
Performance Considerations
Expected Returns and Sharpe
Well-implemented pairs trading in crypto typically achieves:
| Metric | Typical Range | Notes |
|---|---|---|
| Annual return | 15-40% | Depends on pair selection and volatility regime |
| Sharpe ratio | 1.5-3.5 | Higher during ranging markets, lower during trending |
| Win rate | 55-70% | Many small wins, occasional large losses on breakdown |
| Average holding period | 1-7 days | Calibrate entry z-score to control frequency |
| Max drawdown | 5-15% | With proper circuit breakers applied |
When Pairs Trading Fails
- Trending markets: When all crypto assets move strongly in one direction, spreads can keep widening without reverting. Reduce activity during high-volatility regimes.
- Low liquidity: Pairs require sufficient volume on both legs. Avoid pairs with thinly traded tokens where your own orders move the market.
- Overfitting in pair selection: P-values from in-sample testing are biased. Always validate pairs out-of-sample before live trading.
- Transaction costs: High taker fees eat spread income. Use limit orders where execution speed allows, or ensure the expected spread income exceeds 2x your total fees.
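The transaction-cost rule in the last bullet can be turned into a pre-trade check. The sketch below compares the expected gross income of one round trip with its fees. The function name, the 0.1% taker fee, and the prices are illustrative assumptions; it also assumes that the spread reverts all the way from the entry z-score to the exit z-score and that all four fills pay the taker rate.

```python
def expected_edge_vs_fees(size_a, price_a, price_b, hedge_ratio, spread_std,
                          entry_z=2.0, exit_z=0.5,
                          taker_fee=0.001, min_multiple=2.0):
    """
    Compare the expected gross income of one round trip with its taker fees.
    gross: size_a * (entry_z - exit_z) * spread_std, in quote currency.
    fees:  taker_fee on the notional of both legs, paid at open and at close.
    """
    gross = size_a * (entry_z - exit_z) * spread_std
    notional = size_a * price_a + size_a * hedge_ratio * price_b
    fees = 2 * taker_fee * notional  # two legs, opened and closed
    return {"gross": gross, "fees": fees, "tradeable": gross >= min_multiple * fees}


# Made-up inputs for a BTC/ETH-style pair
expensive = expected_edge_vs_fees(0.01, 35_000.0, 2_000.0, 17.5, 150.0)
cheaper = expected_edge_vs_fees(0.01, 35_000.0, 2_000.0, 17.5, 150.0,
                                taker_fee=0.0005)
print(expensive)
print(cheaper)
```

With these numbers the trade fails the 2x test at a 0.1% fee and passes at 0.05%, which illustrates how sensitive a pairs strategy is to the fee tier, since each round trip pays fees on four fills.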
Summary
Statistical pairs trading is one of the most robust algorithmic strategies available to AI agents. The market-neutral structure insulates returns from broad market moves, while the continuous monitoring capability of agents makes them ideal operators. The key steps are:
- Screen candidate pairs with the Engle-Granger test, filtering for p < 0.05 and economically rational relationships
- Validate pairs out-of-sample and compute half-life to ensure mean-reversion is tradeable
- Use the Kalman filter for dynamic hedge ratio estimation — static ratios degrade over time
- Trade z-score signals: enter when |z| > 2.0, exit when |z| < 0.5, stop out when |z| > 3.5
- Monitor for cointegration breakdown via rolling correlation and spread volatility expansion
- Run a portfolio of 10-20 pairs from different correlation clusters
- Use batch order submission via the Purple Flea API to minimize leg risk
Ready to deploy? Get your API key at purpleflea.com/register and claim your starting capital via the Agent Faucet.