Statistical arbitrage exploits predictable mean-reverting relationships between assets. This guide covers the complete stat arb toolkit for AI agents: cointegration testing with Engle-Granger and Johansen methods, ADF stationarity tests, z-score entry and exit signals, pairs trading mechanics, basket trading with multiple legs, mean reversion in BTC/ETH markets, and a full Python StatArbAgent implementation.
Statistical arbitrage (stat arb) is a class of trading strategies that exploit statistical relationships between assets rather than fundamental value or directional price predictions. Unlike pure arbitrage (which is risk-free), stat arb carries model risk and execution risk; in return, it can deliver returns that are largely uncorrelated with market direction across regimes.
The canonical form is pairs trading: find two assets whose prices move together over time (are cointegrated), monitor the spread between them, and trade when the spread deviates significantly from its historical mean. The bet is that the spread will revert back — not that either asset will go up or down.
Long one asset, short another. Profits from the spread, not from market direction. Works in bull, bear, and sideways markets.
All signals (z-scores, cointegration tests, hedge ratios) are quantitative. An agent can run the entire pipeline continuously without human input.
Crypto spreads can deviate and revert multiple times per day. A well-tuned agent can extract value from dozens of signals weekly.
Stop-losses are straightforward to define: exit when the spread exceeds N standard deviations. Note that losses are bounded only if stops are actually enforced; if the cointegrating relationship breaks down, the spread can keep diverging.
Before testing for cointegration, both price series must be confirmed to be non-stationary (i.e., integrated of order 1, written I(1)). The Augmented Dickey-Fuller (ADF) test is the standard tool for this.
The ADF test fits the regression:

Δy_t = α + βt + γ·y_{t-1} + Σ_{i=1..p} δ_i·Δy_{t-i} + ε_t

The null hypothesis H0 is that γ = 0 (a unit root exists, so the series is non-stationary). We want to fail to reject H0 for the individual price series (confirming they are I(1)), and to reject H0 for the spread (confirming the spread is stationary).
| Series | ADF p-value | Interpretation | Action |
|---|---|---|---|
| BTC price levels | 0.82 (high) | Cannot reject H0: non-stationary (I(1)) | Good — proceed to cointegration test |
| ETH price levels | 0.78 (high) | Cannot reject H0: non-stationary (I(1)) | Good — proceed to cointegration test |
| BTC log returns | 0.001 (very low) | Reject H0: stationary (I(0)) | Confirms BTC is I(1) in levels |
| BTC-ETH spread | 0.03 (low) | Reject H0: spread is stationary | Pair is cointegrated — trade the spread |
| BTC-ETH spread | 0.35 (high) | Cannot reject H0: spread is non-stationary | No cointegration — don't trade this pair |
import numpy as np
from statsmodels.tsa.stattools import adfuller
def adf_test(series: np.ndarray, name: str = "Series",
significance: float = 0.05) -> dict:
"""
Run ADF test and return structured result.
Returns: {stationary: bool, p_value: float, test_stat: float, critical_values: dict}
"""
result = adfuller(series, autolag='AIC', regression='ct') # include trend + constant
test_stat, p_value = result[0], result[1]
critical_values = result[4]
is_stationary = p_value < significance
print(f"ADF Test: {name}")
print(f" Test statistic: {test_stat:.4f}")
print(f" p-value: {p_value:.4f}")
print(f" Critical values: {critical_values}")
print(f" Stationary at {significance:.0%}: {is_stationary}")
return {
'stationary': is_stationary,
'p_value': p_value,
'test_stat': test_stat,
'critical_values': critical_values
}
# Usage:
# btc_prices = np.array([...])
# eth_prices = np.array([...])
# adf_test(btc_prices, "BTC") # should NOT be stationary (p > 0.05)
# adf_test(eth_prices, "ETH") # should NOT be stationary (p > 0.05)
# adf_test(np.log(btc_prices) - np.log(eth_prices), "log-spread") # SHOULD be stationary
Cointegration means that a linear combination of two or more I(1) series is I(0) (stationary). For a pairs trade, if y1 and y2 are both random walks but y1 - beta*y2 is stationary, they are cointegrated and the spread mean-reverts.
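The definition can be seen directly in simulation. Below is a minimal numpy-only sketch with synthetic data and an illustrative loading of 0.8: both series ride the same random-walk trend, so each is I(1) on its own, yet the right linear combination cancels the trend and stays bounded:

```python
import numpy as np

rng = np.random.default_rng(1)
n = 3000
common = np.cumsum(rng.normal(size=n))            # shared stochastic trend: I(1)

# Hypothetical pair: both log-price series load on the same trend (loadings 1.0 and 0.8)
log_y1 = common + rng.normal(0, 0.05, size=n)
log_y2 = 0.8 * common + rng.normal(0, 0.05, size=n)

beta = 1.0 / 0.8                                  # the combination that cancels the trend
spread = log_y1 - beta * log_y2                   # stationary: pure noise, no trend

print(f"std(log_y1) = {np.std(log_y1):.2f}, std(spread) = {np.std(spread):.3f}")
```

The individual series wander arbitrarily far from their starting points, while the spread's dispersion stays on the order of the noise that was added.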
The Engle-Granger procedure (1987) is the simplest approach for two-asset pairs:
import numpy as np
from statsmodels.regression.linear_model import OLS
from statsmodels.tsa.stattools import adfuller, coint
from statsmodels.tools import add_constant
def engle_granger_test(y1: np.ndarray, y2: np.ndarray,
significance: float = 0.05) -> dict:
"""
Engle-Granger cointegration test for pair (y1, y2).
Uses log prices. Returns hedge ratio and cointegration strength.
"""
log_y1 = np.log(y1)
log_y2 = np.log(y2)
# Step 1: OLS regression log_y1 = alpha + beta * log_y2
X = add_constant(log_y2)
reg = OLS(log_y1, X).fit()
alpha = reg.params[0]
beta = reg.params[1]
residuals = reg.resid
# Step 2: ADF test on residuals
adf_result = adfuller(residuals, autolag='AIC', regression='c')
p_value = adf_result[1]
is_coint = p_value < significance
# Also use statsmodels built-in coint for cross-check
t_stat, p_val_coint, crit_vals = coint(log_y1, log_y2)
return {
'cointegrated': is_coint,
'ols_alpha': alpha,
        'ols_beta': beta,  # hedge ratio (slope of log_y1 on log_y2)
'residual_adf_pval': p_value,
'coint_pval': p_val_coint,
'residual_std': np.std(residuals),
'residuals': residuals,
'r_squared': reg.rsquared
}
# Example:
# result = engle_granger_test(btc_prices, eth_prices)
# if result['cointegrated']:
# print(f"Pair cointegrated! Beta={result['ols_beta']:.3f}")
# print(f"Residual std: {result['residual_std']:.4f}")
The Johansen test (1991) is more powerful than Engle-Granger and supports testing cointegration among three or more assets simultaneously (baskets). It estimates the number of cointegrating relationships (rank) in the system.
from statsmodels.tsa.vector_ar.vecm import coint_johansen
def johansen_test(price_matrix: np.ndarray, asset_names: list,
det_order: int = 0, k_ar_diff: int = 1) -> dict:
"""
Johansen cointegration test for a matrix of price series.
price_matrix: shape (T, N) where T = time steps, N = number of assets
Returns: number of cointegrating vectors, eigenvectors (portfolio weights)
"""
log_prices = np.log(price_matrix)
result = coint_johansen(log_prices, det_order=det_order, k_ar_diff=k_ar_diff)
# Trace statistic test
# H0: at most r cointegrating relationships
# Critical values at 90%, 95%, 99% confidence levels
trace_stats = result.lr1 # trace statistics
crit_vals_95 = result.cvt[:, 1] # 95% critical values
    n_coint = 0
    for ts, cv in zip(trace_stats, crit_vals_95):
        if ts > cv:        # reject H0 of at most n_coint cointegrating relations
            n_coint += 1
        else:
            break          # sequential procedure stops at the first non-rejection
print(f"Johansen Test: {' / '.join(asset_names)}")
print(f" Number of cointegrating vectors: {n_coint}")
for i in range(min(n_coint, result.evec.shape[1])):
weights = result.evec[:, i]
wstr = ", ".join(f"{a}={w:.3f}" for a, w in zip(asset_names, weights))
print(f" Vector {i+1}: [{wstr}]")
return {
'n_coint_vectors': n_coint,
'eigenvectors': result.evec, # cointegrating vectors
'eigenvalues': result.eig,
'trace_stats': trace_stats,
'crit_vals_95': crit_vals_95,
'log_prices': log_prices
}
# Example: test BTC, ETH, SOL basket
# prices = np.column_stack([btc_prices, eth_prices, sol_prices])
# result = johansen_test(prices, ['BTC', 'ETH', 'SOL'])
| Property | Engle-Granger | Johansen |
|---|---|---|
| Number of assets | 2 (pairs only) | 2+ (supports baskets) |
| Cointegrating vectors | 1 (OLS-estimated) | Multiple (full rank test) |
| Statistical power | Lower (single equation) | Higher (system of equations) |
| Hedge ratio estimation | Directly from OLS beta | From eigenvectors |
| Sensitivity to direction | Yes (y1 vs y2 asymmetric) | No (symmetric) |
| Complexity | Simple | Moderate |
| When to use | Quick screen for simple pairs | Final test and basket construction |
Once a cointegrated pair is identified and the hedge ratio is estimated, the agent computes a z-score that normalizes the current spread deviation into standard deviation units. The z-score is the primary trading signal.
z_t = (spread_t - mu_spread) / sigma_spread

where mu_spread and sigma_spread are estimated from a rolling lookback window (typically 60–120 bars). Using rolling rather than expanding windows ensures the z-score adapts to regime changes in the relationship.
import numpy as np
from collections import deque
class ZScoreCalculator:
def __init__(self, lookback: int = 60, beta: float = 1.0, alpha: float = 0.0):
self.lookback = lookback
self.beta = beta # hedge ratio
self.alpha = alpha # OLS intercept
self._spread_history = deque(maxlen=lookback)
def update(self, y1: float, y2: float) -> dict:
"""Update with new price observation and return current z-score."""
log_y1 = np.log(y1)
log_y2 = np.log(y2)
spread = log_y1 - self.beta * log_y2 - self.alpha
self._spread_history.append(spread)
if len(self._spread_history) < 20:
return {'z_score': None, 'spread': spread, 'n_obs': len(self._spread_history)}
arr = np.array(self._spread_history)
mu = arr.mean()
sigma = arr.std()
z = (spread - mu) / sigma if sigma > 1e-10 else 0.0
return {
'z_score': z,
'spread': spread,
'spread_mean': mu,
'spread_std': sigma,
'n_obs': len(self._spread_history),
'half_life': self._estimate_half_life(arr)
}
def _estimate_half_life(self, spread: np.ndarray) -> float:
"""Estimate mean-reversion half-life via AR(1) regression."""
if len(spread) < 10:
return np.nan
lagged = spread[:-1]
delta = spread[1:] - spread[:-1]
# OLS: delta_t = lambda * spread_{t-1} + epsilon
lam = np.cov(lagged, delta)[0, 1] / np.var(lagged)
half_life = -np.log(2) / lam if lam < 0 else np.nan
return half_life
| Signal Type | Condition | Action | Rationale |
|---|---|---|---|
| Long spread entry | z < -2.0 | Buy y1, sell beta units of y2 | Spread too low; expect reversion upward |
| Short spread entry | z > +2.0 | Sell y1, buy beta units of y2 | Spread too high; expect reversion downward |
| Close long spread | z > -0.5 | Exit long y1 / short y2 | Spread reverted to near mean |
| Close short spread | z < +0.5 | Exit short y1 / long y2 | Spread reverted to near mean |
| Stop-loss long | z < -4.0 | Emergency exit; relationship may be breaking down | Spread diverging; cointegration failure risk |
| Stop-loss short | z > +4.0 | Emergency exit | Spread diverging; cointegration failure risk |
Half-Life Matters: The mean-reversion half-life estimates how long the spread takes to move halfway back to its mean. If the half-life is >30 days, consider using longer lookback windows and wider z-score thresholds. Half-lives <3 days suggest higher-frequency signals are viable.
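The half-life arithmetic can be sanity-checked on synthetic data. This sketch (values illustrative) simulates an AR(1) spread with a known mean-reversion coefficient and recovers the half-life with the same lagged-regression estimator used in ZScoreCalculator._estimate_half_life:

```python
import numpy as np

rng = np.random.default_rng(42)
true_lam = -0.10                          # AR(1) drift: delta_t = lam * s[t-1] + eps
true_half_life = -np.log(2) / true_lam    # about 6.9 bars

n = 5000
s = np.zeros(n)
for t in range(1, n):
    s[t] = s[t - 1] + true_lam * s[t - 1] + rng.normal(0, 0.01)

# Recover lambda exactly as the calculator does: regress delta on the lagged spread
lagged = s[:-1]
delta = s[1:] - s[:-1]
lam_hat = np.cov(lagged, delta)[0, 1] / np.var(lagged)
half_life_hat = -np.log(2) / lam_hat
print(f"true HL = {true_half_life:.2f} bars, estimated HL = {half_life_hat:.2f} bars")
```

With a few thousand observations the estimate lands close to the true value; with the short windows used live (60 bars), expect considerably more noise.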
Static thresholds (always enter at z=2.0) are suboptimal. Adapt thresholds based on the current vol regime:
def adaptive_thresholds(spread_vol: float, base_vol: float = 0.02,
base_entry: float = 2.0, base_exit: float = 0.5) -> dict:
"""
Scale z-score thresholds by the ratio of current vol to base vol.
Higher vol = wider thresholds (avoid false signals during choppy markets).
"""
vol_ratio = spread_vol / max(base_vol, 1e-6)
    entry = base_entry * min(max(vol_ratio, 0.5), 2.0)   # clamp scale between 0.5x and 2.0x
exit_thr = base_exit * min(max(vol_ratio, 0.5), 2.0)
return {'entry': entry, 'exit': exit_thr, 'stop': entry * 2}
Executing a pairs trade requires simultaneous entry into both legs to avoid leg risk (the risk that you execute one leg but can't execute the other due to slippage or market movement). On Purple Flea Trading, the API supports atomic spread orders for simultaneous execution.
The sizing goal is for P&L to track the spread rather than market direction. For a log-price hedge ratio beta, the short-leg notional should be beta times the long-leg notional; when beta is near 1 this is approximately dollar-neutral (the two legs carry roughly equal dollar value):
def compute_leg_sizes(notional_usd: float, y1_price: float, y2_price: float,
beta: float) -> dict:
"""
Compute sizes for both legs to achieve dollar-neutral exposure.
beta: hedge ratio (units of y2 per unit of y1)
For long spread (buy y1, sell y2):
- Leg 1 notional = notional_usd
- Leg 2 notional = notional_usd (dollar-neutral, not unit-neutral)
Unit neutral would require: size_y2 = beta * size_y1
But we want dollar neutral: size_y1 * y1_price = size_y2 * y2_price
"""
# Dollar-neutral sizing
size_y1 = notional_usd / y1_price # units of y1 to buy
# Hedge: need beta units of y2 per unit of y1 (log-scale), adjusted for prices
size_y2 = (beta * size_y1 * y1_price) / y2_price # units of y2 to sell
actual_notional_y1 = size_y1 * y1_price
actual_notional_y2 = size_y2 * y2_price
return {
'y1_size': size_y1,
'y2_size': size_y2,
'y1_notional': actual_notional_y1,
'y2_notional': actual_notional_y2,
'net_dollar_exposure': actual_notional_y1 - actual_notional_y2,
'hedge_ratio': beta
}
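Worked numbers for the sizing formula above, with illustrative prices and a hypothetical beta of 0.85, showing that the leg-2 notional is beta times the leg-1 notional:

```python
notional_usd, y1_price, y2_price, beta = 1000.0, 60_000.0, 3_000.0, 0.85

size_y1 = notional_usd / y1_price                   # about 0.0167 units of y1
size_y2 = (beta * size_y1 * y1_price) / y2_price    # about 0.2833 units of y2

leg1_notional = size_y1 * y1_price   # 1000.0
leg2_notional = size_y2 * y2_price   # 850.0 = beta * 1000.0
print(leg1_notional, leg2_notional)
```

The net dollar exposure here is 150 USD, which is what the `net_dollar_exposure` field in the function's return value reports.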
The BTC/ETH pair is the most-traded stat arb pair in crypto. Both are large-cap Layer-1 assets (Ethereum moved from Proof-of-Work to Proof-of-Stake in 2022) with overlapping user bases and structurally linked demand. Historically they have been cointegrated for extended periods, with a unit hedge ratio of roughly 15–20 ETH per BTC (the approximate BTC/ETH price ratio).
| Period | Cointegrated? | Approx Beta | Half-life (days) | Notes |
|---|---|---|---|---|
| 2021 Bull Run | Yes (weak) | ~18x | 8–15 days | ETH outperformed BTC; beta drifted |
| 2022 Bear Market | Yes (strong) | ~15x | 5–10 days | Highly correlated crash; strong cointegration |
| 2023 Recovery | Yes | ~16x | 6–12 days | Stable period; good pairs trading conditions |
| 2024 ETF Era | Mixed | Variable | 10–20 days | BTC ETF inflows broke short-term cointegration |
| 2025–2026 | Yes (re-established) | ~17–19x | 7–14 days | Equilibrium restored after ETF demand shock |
Regime Awareness: Cointegration is not permanent. The BTC/ETH relationship breaks down during major structural events (ETF launches, ETH merge, regulatory shocks). Always re-test cointegration on a rolling 90-day window before trading. If the p-value rises above 0.10, halt the strategy.
While pairs trading uses two assets, basket trading extends the concept to three or more. A basket is a linear combination of assets with weights chosen so that the portfolio value mean-reverts. Basket trading offers more stable signals (less noise) and more opportunities (the universe is larger), but requires more sophisticated execution.
The first eigenvector from the Johansen test gives the most strongly cointegrating linear combination. For a BTC/ETH/SOL basket:
def construct_basket(johansen_result: dict, asset_names: list,
vector_idx: int = 0) -> dict:
"""
Extract portfolio weights from Johansen eigenvector.
vector_idx=0 gives the most strongly cointegrating combination.
"""
evec = johansen_result['eigenvectors'][:, vector_idx]
# Normalize to sum of absolute weights = 1 for dollar-neutral basket
norm_weights = evec / np.sum(np.abs(evec))
basket = {}
for asset, weight in zip(asset_names, norm_weights):
basket[asset] = {
'weight': float(weight),
'side': 'long' if weight > 0 else 'short',
'notional_pct': abs(float(weight))
}
return basket
# Example output for BTC/ETH/SOL:
# {'BTC': {'weight': 0.52, 'side': 'long', 'notional_pct': 0.52},
# 'ETH': {'weight': -0.31, 'side': 'short', 'notional_pct': 0.31},
# 'SOL': {'weight': -0.17, 'side': 'short', 'notional_pct': 0.17}}
def basket_spread(prices: dict, basket_weights: dict) -> float:
    """Compute the basket spread (weighted sum of log prices) from raw prices."""
    return sum(basket_weights[asset]['weight'] * np.log(price)
               for asset, price in prices.items()
               if asset in basket_weights)
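A standalone computation of the basket spread using the example weights above (prices illustrative, arithmetic inlined so the snippet runs on its own):

```python
import numpy as np

basket = {  # same shape as construct_basket output; weights are illustrative
    'BTC': {'weight': 0.52}, 'ETH': {'weight': -0.31}, 'SOL': {'weight': -0.17},
}
prices = {'BTC': 60_000.0, 'ETH': 3_000.0, 'SOL': 150.0}

spread = sum(basket[a]['weight'] * np.log(p) for a, p in prices.items() if a in basket)
print(f"basket spread: {spread:.4f}")
```

The agent tracks this scalar over time exactly as it tracks a two-asset spread, with the same z-score machinery.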
| Property | Pairs Trading | Basket Trading |
|---|---|---|
| Assets per trade | 2 | 3+ |
| Execution complexity | Low | High (must leg into N positions) |
| Signal quality | Noisier (2 assets) | Cleaner (more orthogonal) |
| Opportunities | Limited by pair count | Much larger universe |
| Transaction costs | 2 legs | N legs (higher costs) |
| Margin requirement | Low | Higher (multiple positions) |
| Cointegration stability | Can break easily | More robust (over-identified) |
Crypto markets exhibit strong mean-reversion dynamics at multiple timescales, driven by structural factors unique to digital assets: perpetual funding rates, liquidation cascades, DEX/CEX price divergences, and cross-exchange arbitrage flows.
Perpetual futures funding rates revert toward zero. When funding is extremely positive (longs paying shorts), the basis compresses as arbitrageurs short perp / long spot until funding normalizes.
After a cascade of long liquidations drives price below fair value, mean-reverting buy pressure emerges from well-capitalized agents who recognize the temporary dislocation.
Systematic price discrepancies between centralized and decentralized exchanges are closed by arbitrage bots within minutes. The spread is stationary with near-zero mean.
BTC-PERP prices on different exchanges (Binance vs OKX vs Purple Flea) converge through arbitrage. Inter-exchange basis trades are high-frequency, low-risk stat arb plays.
The Hurst exponent (H) quantifies the degree of mean reversion vs trend in a time series:
| H Value | Process Type | Trading Implication |
|---|---|---|
| H < 0.5 | Anti-persistent (mean-reverting) | Stat arb strategies profit; trend-following loses |
| H = 0.5 | Random walk (Brownian motion) | No edge for either approach |
| H > 0.5 | Persistent (trending) | Trend-following profits; stat arb loses |
def hurst_exponent(series: np.ndarray, min_lag: int = 2, max_lag: int = 100) -> float:
"""
Compute the Hurst exponent using R/S analysis.
H < 0.5: mean-reverting; H = 0.5: random walk; H > 0.5: trending.
"""
lags = range(min_lag, min(max_lag, len(series) // 4))
tau = []
rs_vals = []
for lag in lags:
segments = len(series) // lag
rs_list = []
for i in range(segments):
seg = series[i*lag:(i+1)*lag]
mean = seg.mean()
dev = (seg - mean).cumsum()
R = dev.max() - dev.min()
S = seg.std()
if S > 0:
rs_list.append(R / S)
if rs_list:
rs_vals.append(np.mean(rs_list))
tau.append(lag)
if len(tau) < 2:
return 0.5
return float(np.polyfit(np.log(tau), np.log(rs_vals), 1)[0])
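As a cross-check on the R/S estimator, a lighter variance-of-differences estimator can be used: for a series with Hurst exponent H, std(x[t+k] - x[t]) scales like k^H. The `hurst_var` helper below is an assumed alternative, not part of the original toolkit, but it gives the same qualitative answer on synthetic data:

```python
import numpy as np

def hurst_var(series: np.ndarray, lags=range(2, 20)) -> float:
    """Hurst via diffusion scaling: std(x[t+k] - x[t]) ~ k^H."""
    lags = list(lags)
    tau = [np.std(series[k:] - series[:-k]) for k in lags]
    return float(np.polyfit(np.log(lags), np.log(tau), 1)[0])

rng = np.random.default_rng(7)
n = 20_000
rw = np.cumsum(rng.normal(size=n))     # random walk: expect H near 0.5
eps = rng.normal(size=n)
mr = np.zeros(n)                       # strongly mean-reverting AR(1)
for t in range(1, n):
    mr[t] = 0.3 * mr[t - 1] + eps[t]
print(hurst_var(rw), hurst_var(mr))    # roughly 0.5 vs well below 0.5
```

If the two estimators disagree badly on a real spread, treat that as a warning sign rather than picking the one that flatters the strategy.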
The following complete agent implementation scans candidate pairs on Purple Flea Trading, tests for cointegration, computes rolling z-scores, and autonomously executes pairs trades when signals exceed thresholds.
"""
StatArbAgent - Statistical Arbitrage Pairs Trader
Scans Purple Flea Trading (275+ markets) for cointegrated pairs,
computes z-score signals, and executes mean-reversion trades.
"""
import asyncio
import aiohttp
import numpy as np
from statsmodels.tsa.stattools import adfuller, coint
from statsmodels.regression.linear_model import OLS
from statsmodels.tools import add_constant
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timedelta
from collections import deque
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger("StatArbAgent")
# ─── Data Classes ─────────────────────────────────────────────────────────────
@dataclass
class Pair:
symbol1: str
symbol2: str
    beta: float           # hedge ratio: log(y1) = alpha + beta*log(y2) + residual
alpha: float # OLS intercept
coint_pval: float # cointegration p-value (lower = stronger)
half_life: float # spread mean-reversion half-life in bars
spread_std: float # rolling spread standard deviation
last_tested: datetime = field(default_factory=datetime.utcnow)
hurst: float = 0.5
@property
def is_valid(self) -> bool:
"""Pair is still worth trading."""
return (self.coint_pval < 0.05 and
self.half_life < 30 and
self.hurst < 0.5)
@dataclass
class SpreadState:
pair: Pair
z_score: float
spread: float
spread_mean: float
spread_std: float
timestamp: datetime
@dataclass
class PairPosition:
pair: Pair
direction: str # 'long_spread' or 'short_spread'
entry_z: float
entry_spread: float
y1_size: float # units of y1
y2_size: float # units of y2
notional_usd: float
opened_at: datetime = field(default_factory=datetime.utcnow)
y1_order_id: str = ""
y2_order_id: str = ""
# ─── Cointegration Scanner ────────────────────────────────────────────────────
class CointegrationScanner:
    def __init__(self, coint_threshold: float = 0.05,
                 max_half_life_bars: float = 30,
                 min_hurst_threshold: float = 0.50):
        self.coint_threshold = coint_threshold
        self.max_half_life = max_half_life_bars
        # Despite the name, this acts as a ceiling: spreads with Hurst >= this
        # value are rejected as insufficiently mean-reverting.
        self.min_hurst = min_hurst_threshold
def test_pair(self, y1: np.ndarray, y2: np.ndarray,
sym1: str, sym2: str) -> Optional[Pair]:
"""Run full cointegration test suite on a price pair."""
if len(y1) < 60 or len(y2) < 60:
return None
log_y1, log_y2 = np.log(y1), np.log(y2)
# Step 1: Both series must be non-stationary individually
adf1 = adfuller(log_y1, autolag='AIC')[1]
adf2 = adfuller(log_y2, autolag='AIC')[1]
if adf1 < 0.10 or adf2 < 0.10:
log.debug(f"Skipping {sym1}/{sym2}: one or both series appear stationary")
return None
# Step 2: Cointegration test
_, p_val, _ = coint(log_y1, log_y2)
if p_val >= self.coint_threshold:
log.debug(f"{sym1}/{sym2}: not cointegrated (p={p_val:.3f})")
return None
# Step 3: OLS to get hedge ratio
X = add_constant(log_y2)
reg = OLS(log_y1, X).fit()
alpha = float(reg.params[0])
beta = float(reg.params[1])
residuals = reg.resid
# Step 4: Half-life estimation
lagged = residuals[:-1]
delta = residuals[1:] - residuals[:-1]
lam = np.cov(lagged, delta)[0, 1] / max(np.var(lagged), 1e-10)
half_life = -np.log(2) / lam if lam < 0 else 999.0
if half_life > self.max_half_life or half_life <= 0:
log.debug(f"{sym1}/{sym2}: half-life too long ({half_life:.1f})")
return None
# Step 5: Hurst exponent check
hurst = self._hurst(residuals)
if hurst >= self.min_hurst:
log.debug(f"{sym1}/{sym2}: spread not mean-reverting (H={hurst:.3f})")
return None
pair = Pair(
symbol1=sym1, symbol2=sym2,
beta=beta, alpha=alpha,
coint_pval=p_val,
half_life=half_life,
spread_std=float(np.std(residuals)),
hurst=hurst
)
log.info(f"Pair found: {sym1}/{sym2} | beta={beta:.3f} | "
f"p={p_val:.3f} | HL={half_life:.1f} | H={hurst:.3f}")
return pair
def _hurst(self, series: np.ndarray) -> float:
lags = range(2, min(50, len(series) // 4))
tau, rs_vals = [], []
for lag in lags:
segs = len(series) // lag
rs_list = []
for i in range(segs):
seg = series[i*lag:(i+1)*lag]
S = seg.std()
if S > 0:
R = (seg - seg.mean()).cumsum()
rs_list.append((R.max() - R.min()) / S)
if rs_list:
rs_vals.append(np.mean(rs_list))
tau.append(lag)
if len(tau) < 2:
return 0.5
return float(np.polyfit(np.log(tau), np.log(rs_vals), 1)[0])
# ─── Live Z-Score Tracker ─────────────────────────────────────────────────────
class LiveZScoreTracker:
def __init__(self, pair: Pair, lookback: int = 60):
self.pair = pair
self.lookback = lookback
self._spread_hist = deque(maxlen=lookback)
def update(self, y1_price: float, y2_price: float) -> Optional[SpreadState]:
spread = (np.log(y1_price) - self.pair.beta * np.log(y2_price) - self.pair.alpha)
self._spread_hist.append(spread)
if len(self._spread_hist) < 20:
return None
arr = np.array(self._spread_hist)
mu, sigma = arr.mean(), arr.std()
z = (spread - mu) / sigma if sigma > 1e-10 else 0.0
return SpreadState(
pair=self.pair, z_score=z, spread=spread,
spread_mean=mu, spread_std=sigma, timestamp=datetime.utcnow()
)
# ─── Signal Generator ─────────────────────────────────────────────────────────
@dataclass
class TradeSignal:
pair: Pair
action: str # 'open_long', 'open_short', 'close', 'stop_loss'
z_score: float
confidence: float
class SignalGenerator:
def __init__(self, entry_z: float = 2.0, exit_z: float = 0.5,
stop_z: float = 4.0):
self.entry_z = entry_z
self.exit_z = exit_z
self.stop_z = stop_z
def generate(self, state: SpreadState,
current_position: Optional[PairPosition]) -> Optional[TradeSignal]:
z = state.z_score
pair = state.pair
# Position already open: check for exit or stop
if current_position is not None:
if current_position.direction == 'long_spread':
if abs(z) > self.stop_z:
return TradeSignal(pair, 'stop_loss', z, 1.0)
if z > -self.exit_z:
return TradeSignal(pair, 'close', z, 0.9)
elif current_position.direction == 'short_spread':
if abs(z) > self.stop_z:
return TradeSignal(pair, 'stop_loss', z, 1.0)
if z < self.exit_z:
return TradeSignal(pair, 'close', z, 0.9)
return None # hold
# No position: check for entry
if z < -self.entry_z:
conf = min(1.0, abs(z) / self.entry_z * 0.7)
return TradeSignal(pair, 'open_long', z, conf)
elif z > self.entry_z:
conf = min(1.0, abs(z) / self.entry_z * 0.7)
return TradeSignal(pair, 'open_short', z, conf)
return None
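The threshold state machine above can be exercised standalone. The `signal_for` helper below is a condensed, hypothetical mirror of SignalGenerator.generate (z-score branches only, plain strings instead of the dataclasses) that makes the transitions explicit:

```python
def signal_for(z, position=None, entry=2.0, exit_=0.5, stop=4.0):
    """Condensed mirror of SignalGenerator.generate: z-score branches only."""
    if position == 'long_spread':
        if abs(z) > stop:
            return 'stop_loss'
        return 'close' if z > -exit_ else None
    if position == 'short_spread':
        if abs(z) > stop:
            return 'stop_loss'
        return 'close' if z < exit_ else None
    if z < -entry:
        return 'open_long'
    if z > entry:
        return 'open_short'
    return None

assert signal_for(-2.3) == 'open_long'                 # deep below mean: enter long spread
assert signal_for(-0.2, 'long_spread') == 'close'      # reverted past -0.5: take profit
assert signal_for(-4.5, 'long_spread') == 'stop_loss'  # divergence: emergency exit
assert signal_for(1.0) is None                         # inside the entry band: no trade
```

Checking the stop-loss branch before the take-profit branch matters: at |z| > 4 both conditions can be true, and the emergency exit must win.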
# ─── Execution Engine ─────────────────────────────────────────────────────────
class PairsExecutionEngine:
def __init__(self, api_key: str, base_url: str, notional_per_trade: float = 500.0):
self.api_key = api_key
self.base_url = base_url
self.notional = notional_per_trade
async def open_pair(self, session: aiohttp.ClientSession, signal: TradeSignal,
y1_price: float, y2_price: float) -> Optional[PairPosition]:
"""Open a pairs trade: two simultaneous opposing orders."""
pair = signal.pair
beta = pair.beta
size_y1 = self.notional / y1_price
size_y2 = (beta * size_y1 * y1_price) / y2_price
if signal.action == 'open_long':
# Long spread: buy y1, sell y2
side_y1, side_y2 = 'buy', 'sell'
direction = 'long_spread'
else:
# Short spread: sell y1, buy y2
side_y1, side_y2 = 'sell', 'buy'
direction = 'short_spread'
headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
# Execute both legs simultaneously
results = await asyncio.gather(
self._place_order(session, headers, pair.symbol1, side_y1, size_y1),
self._place_order(session, headers, pair.symbol2, side_y2, size_y2),
return_exceptions=True
)
        if any(isinstance(r, Exception) for r in results):
            log.error(f"Failed to open pair {pair.symbol1}/{pair.symbol2}: {results}")
            # Leg risk: unwind any leg that did fill so the book stays flat
            for r, (sym, side, size) in zip(results, [(pair.symbol1, side_y1, size_y1),
                                                      (pair.symbol2, side_y2, size_y2)]):
                if not isinstance(r, Exception):
                    await self._place_order(session, headers, sym,
                                            'sell' if side == 'buy' else 'buy',
                                            size, reduce_only=True)
            return None
entry_spread = (np.log(y1_price) - pair.beta * np.log(y2_price) - pair.alpha)
pos = PairPosition(
pair=pair, direction=direction,
entry_z=signal.z_score, entry_spread=entry_spread,
y1_size=size_y1, y2_size=size_y2, notional_usd=self.notional,
y1_order_id=results[0].get('order_id', ''),
y2_order_id=results[1].get('order_id', '')
)
log.info(f"Opened {direction}: {pair.symbol1}/{pair.symbol2} | z={signal.z_score:.2f}")
return pos
async def close_pair(self, session: aiohttp.ClientSession,
pos: PairPosition, reason: str = "signal"):
"""Close both legs of a pairs trade."""
pair = pos.pair
headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}
if pos.direction == 'long_spread':
close_y1_side, close_y2_side = 'sell', 'buy'
else:
close_y1_side, close_y2_side = 'buy', 'sell'
await asyncio.gather(
self._place_order(session, headers, pair.symbol1, close_y1_side,
pos.y1_size, reduce_only=True),
self._place_order(session, headers, pair.symbol2, close_y2_side,
pos.y2_size, reduce_only=True),
)
log.info(f"Closed {pos.direction}: {pair.symbol1}/{pair.symbol2} | reason={reason}")
async def _place_order(self, session, headers, symbol, side, size, reduce_only=False):
order = {"symbol": symbol, "side": side, "size": size,
"order_type": "market", "reduce_only": reduce_only,
"meta": {"agent": "StatArbAgent/1.0"}}
async with session.post(f"{self.base_url}/api/orders", json=order,
headers=headers) as r:
return await r.json()
# ─── Main Agent ──────────────────────────────────────────────────────────────
class StatArbAgent:
CANDIDATE_PAIRS = [
("BTC-USD", "ETH-USD"),
("ETH-USD", "SOL-USD"),
("BTC-USD", "BNB-USD"),
("ETH-USD", "BNB-USD"),
("SOL-USD", "AVAX-USD"),
("BTC-USD", "SOL-USD"),
]
def __init__(self, api_key: str, base_url: str = "https://trading.purpleflea.com",
notional_per_trade: float = 500.0):
self.api_key = api_key
self.base_url = base_url
self.scanner = CointegrationScanner()
self.signal_gen = SignalGenerator()
self.executor = PairsExecutionEngine(api_key, base_url, notional_per_trade)
self.active_pairs: Dict[str, Pair] = {}
self.trackers: Dict[str, LiveZScoreTracker] = {}
self.positions: Dict[str, PairPosition] = {}
async def fetch_prices(self, session: aiohttp.ClientSession,
symbol: str, n: int = 120) -> np.ndarray:
headers = {"Authorization": f"Bearer {self.api_key}"}
async with session.get(f"{self.base_url}/api/ohlcv/{symbol}?interval=4h&limit={n}",
headers=headers) as r:
data = await r.json()
return np.array([float(c['close']) for c in data['candles']])
async def scan_pairs(self, session: aiohttp.ClientSession):
"""Re-test all candidate pairs for cointegration."""
log.info("Scanning pairs for cointegration...")
for sym1, sym2 in self.CANDIDATE_PAIRS:
pair_key = f"{sym1}_{sym2}"
try:
prices1, prices2 = await asyncio.gather(
self.fetch_prices(session, sym1),
self.fetch_prices(session, sym2)
)
pair = self.scanner.test_pair(prices1, prices2, sym1, sym2)
if pair and pair.is_valid:
self.active_pairs[pair_key] = pair
self.trackers[pair_key] = LiveZScoreTracker(pair)
log.info(f"Added/updated pair: {pair_key}")
elif pair_key in self.active_pairs:
# Pair failed re-test: close any open position and remove
if pair_key in self.positions:
await self.executor.close_pair(session, self.positions[pair_key],
reason="cointegration_failed")
del self.positions[pair_key]
del self.active_pairs[pair_key]
log.warning(f"Removed pair (cointegration failed): {pair_key}")
except Exception as e:
log.error(f"Error scanning {sym1}/{sym2}: {e}")
async def trade_cycle(self, session: aiohttp.ClientSession):
"""One trading cycle: update prices, compute z-scores, execute signals."""
for pair_key, pair in list(self.active_pairs.items()):
sym1, sym2 = pair.symbol1, pair.symbol2
try:
prices1, prices2 = await asyncio.gather(
self.fetch_prices(session, sym1, n=1),
self.fetch_prices(session, sym2, n=1)
)
                y1_px, y2_px = prices1[-1], prices2[-1]
                # Note: the tracker is calibrated on 4h bars. In production, only push
                # a new observation when a new bar closes, or the rolling sigma will be
                # understated by near-duplicate intra-bar prices.
tracker = self.trackers[pair_key]
state = tracker.update(y1_px, y2_px)
if state is None:
continue
current_pos = self.positions.get(pair_key)
signal = self.signal_gen.generate(state, current_pos)
if signal is None:
continue
if signal.action in ('open_long', 'open_short'):
pos = await self.executor.open_pair(session, signal, y1_px, y2_px)
if pos:
self.positions[pair_key] = pos
elif signal.action in ('close', 'stop_loss') and current_pos:
await self.executor.close_pair(session, current_pos, reason=signal.action)
del self.positions[pair_key]
except Exception as e:
log.error(f"Trade cycle error {pair_key}: {e}")
async def run(self, scan_interval_minutes: int = 240,
trade_interval_seconds: int = 60):
"""Main agent loop: periodic pair scanning + frequent trading."""
log.info("StatArbAgent starting...")
last_scan = datetime.min
async with aiohttp.ClientSession() as session:
while True:
now = datetime.utcnow()
if (now - last_scan).total_seconds() >= scan_interval_minutes * 60:
await self.scan_pairs(session)
last_scan = now
if self.active_pairs:
await self.trade_cycle(session)
else:
log.info("No active pairs; waiting for scan...")
await asyncio.sleep(trade_interval_seconds)
if __name__ == "__main__":
import os
agent = StatArbAgent(api_key=os.environ["PURPLE_FLEA_API_KEY"], notional_per_trade=500.0)
asyncio.run(agent.run(scan_interval_minutes=240, trade_interval_seconds=60))
Before deploying a stat arb strategy live, historical backtesting is essential. The following summarizes realistic performance expectations and key backtest parameters:
| Parameter | Recommended Value | Sensitivity |
|---|---|---|
| Lookback window (cointegration) | 90–120 4h bars (15–20 days) | High: too short = noisy; too long = misses regime changes |
| Z-score lookback (rolling mean/std) | 60 bars (10 days) | Medium |
| Entry threshold | 1.8–2.2 | High: lower = more trades, more risk |
| Exit threshold | 0.3–0.7 | Medium |
| Stop-loss threshold | 3.5–4.5 | High: determines max loss per trade |
| Max holding period | 3× half-life | High: prevents being stuck in dead pairs |
| Re-cointegration test frequency | Every 5–7 days | Medium |
| Metric | Conservative | Base Case | Optimistic |
|---|---|---|---|
| Annual return | 8–12% | 15–25% | 30–45% |
| Sharpe ratio | 0.7–1.0 | 1.0–1.5 | 1.5–2.0 |
| Win rate | 55–62% | 62–70% | 70–78% |
| Max drawdown | 12–18% | 8–12% | 5–8% |
| Avg trades/month | 4–6 | 8–14 | 15–25 |
| Avg holding period | 3–8 days | 1–4 days | 6–48 hours |
Key Edge: Stat arb on BTC/ETH performs best during high-volatility, range-bound markets — exactly when directional strategies struggle most. Adding a stat arb agent to a directional trading book provides meaningful diversification benefit.
Purple Flea Trading provides 275+ perpetual markets — giving stat arb agents a large universe of potential pairs and baskets. The multi-chain Agent Wallet handles cross-exchange settlement, and the Faucet gives new agents $1 USDC free to begin testing without capital risk.
Visit /faucet to register your agent and claim $1 USDC. Use it to test pair execution logic before deploying real capital.
Use the CointegrationScanner above against Purple Flea's price history API. Start with the BTC/ETH canonical pair, then expand.
Run the agent with notional_per_trade=100 initially. Monitor z-scores and spread half-lives daily. Scale up as the strategy proves out.
Agent-to-Agent Settlement: When two stat arb agents hold opposing legs of the same pair on different platforms, Purple Flea's Escrow service enables direct P&L settlement at 1% fee — eliminating exchange spread costs entirely.
Purple Flea gives AI agents 275+ markets to find cointegrated pairs, multi-chain wallet infrastructure, and a free $1 USDC faucet to start testing without risk.