
Statistical Arbitrage for AI Agents:
Pairs Trading and Mean Reversion

March 2026 · Purple Flea Research · 24 min read

Statistical arbitrage exploits predictable mean-reverting relationships between assets. This guide covers the complete stat arb toolkit for AI agents: cointegration testing with Engle-Granger and Johansen methods, ADF stationarity tests, z-score entry and exit signals, pairs trading mechanics, basket trading with multiple legs, mean reversion in BTC/ETH markets, and a full Python StatArbAgent implementation.


1. Statistical Arbitrage: The Core Idea

Statistical arbitrage (stat arb) is a class of trading strategies that exploit statistical relationships between assets rather than fundamental value or directional price predictions. Unlike pure arbitrage (which is risk-free), stat arb carries model risk and execution risk — but in return offers consistent, largely uncorrelated returns across market regimes.

The canonical form is pairs trading: find two assets whose prices move together over time (are cointegrated), monitor the spread between them, and trade when the spread deviates significantly from its historical mean. The bet is that the spread will revert back — not that either asset will go up or down.

Why Stat Arb Is Ideal for AI Agents

📉

Market-Neutral

Long one asset, short another. Profits from the spread, not from market direction. Works in bull, bear, and sideways markets.

🤖

Fully Automatable

All signals (z-scores, cointegration tests, hedge ratios) are quantitative. An agent can run the entire pipeline continuously without human input.

High Signal Frequency

Crypto spreads can deviate and revert multiple times per day. A well-tuned agent can extract value from dozens of signals weekly.

📊

Defined Risk Profile

Maximum loss per trade is bounded by the spread width at entry. Stop-losses are straightforward: exit when spread exceeds N standard deviations.

The Statistical Arbitrage Pipeline

  1. Universe selection: Identify candidate asset pairs or baskets with fundamental economic reasons to be related (e.g., BTC and ETH both represent Layer-1 blockchain value).
  2. Stationarity testing: Verify neither asset alone is stationary (ADF test). This is a prerequisite for cointegration testing.
  3. Cointegration testing: Engle-Granger two-step test or Johansen trace test to verify the spread is stationary.
  4. Hedge ratio estimation: OLS regression or Kalman filter to estimate the linear combination that produces a stationary spread.
  5. Z-score computation: Normalize the spread into standard deviation units.
  6. Signal generation: Enter when |z| exceeds threshold (e.g., 2.0); exit when |z| < exit threshold (e.g., 0.5).
  7. Position execution: Enter both legs simultaneously to avoid executing one side without the other.
  8. Risk management: Stop-loss if |z| exceeds maximum threshold (e.g., 4.0).
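Steps 4 through 6 of the pipeline can be sketched in a few lines of numpy on synthetic data. Everything below is illustrative: the pair is simulated, and the thresholds mirror the defaults named above.

```python
import numpy as np

rng = np.random.default_rng(42)
n, true_beta = 500, 1.5

# Simulate a cointegrated pair: y2 is a random walk, y1 tracks
# true_beta * y2 plus a stationary AR(1) spread.
y2 = 100.0 + np.cumsum(rng.normal(0, 1, n))
noise = rng.normal(0, 0.5, n)
spread = np.zeros(n)
for t in range(1, n):
    spread[t] = 0.8 * spread[t - 1] + noise[t]
y1 = true_beta * y2 + spread

# Step 4: hedge ratio via OLS (slope of y1 regressed on y2)
beta, alpha = np.polyfit(y2, y1, 1)

# Step 5: z-score of the estimated spread
est = y1 - alpha - beta * y2
z = (est - est.mean()) / est.std()

# Step 6: signal at the latest bar
entry = 2.0
if z[-1] < -entry:
    signal = "long_spread"      # buy y1, sell beta units of y2
elif z[-1] > entry:
    signal = "short_spread"     # sell y1, buy beta units of y2
else:
    signal = "flat"
```

On this synthetic pair the OLS slope recovers the true hedge ratio closely, because the spread is independent of the common trend.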

2. ADF Test for Stationarity

Before testing for cointegration, both price series must be confirmed to be non-stationary (i.e., integrated of order 1, written I(1)). The Augmented Dickey-Fuller (ADF) test is the standard tool for this.

What the ADF Test Checks

The ADF test fits the regression:

Δy_t = α + βt + γy_{t-1} + ∑δ_i Δy_{t-i} + ε_t

The null hypothesis H0 is that γ = 0 (unit root exists, series is non-stationary). We want to fail to reject H0 for individual price series (confirming they are I(1)), and reject H0 for the spread (confirming the spread is stationary).

Interpreting ADF Results

| Series | ADF p-value | Interpretation | Action |
|---|---|---|---|
| BTC price levels | 0.82 (high) | Cannot reject H0: non-stationary (I(1)) | Good; proceed to cointegration test |
| ETH price levels | 0.78 (high) | Cannot reject H0: non-stationary (I(1)) | Good; proceed to cointegration test |
| BTC log returns | 0.001 (very low) | Reject H0: stationary (I(0)) | Confirms BTC is I(1) in levels |
| BTC-ETH spread | 0.03 (low) | Reject H0: spread is stationary | Pair is cointegrated; trade the spread |
| BTC-ETH spread | 0.35 (high) | Cannot reject H0: spread is non-stationary | No cointegration; don't trade this pair |

import numpy as np
from statsmodels.tsa.stattools import adfuller

def adf_test(series: np.ndarray, name: str = "Series",
             significance: float = 0.05) -> dict:
    """
    Run ADF test and return structured result.
    Returns: {stationary: bool, p_value: float, test_stat: float, critical_values: dict}
    """
    result = adfuller(series, autolag='AIC', regression='ct')  # include trend + constant
    test_stat, p_value = result[0], result[1]
    critical_values = result[4]

    is_stationary = p_value < significance
    print(f"ADF Test: {name}")
    print(f"  Test statistic: {test_stat:.4f}")
    print(f"  p-value: {p_value:.4f}")
    print(f"  Critical values: {critical_values}")
    print(f"  Stationary at {significance:.0%}: {is_stationary}")
    return {
        'stationary': is_stationary,
        'p_value': p_value,
        'test_stat': test_stat,
        'critical_values': critical_values
    }

# Usage:
# btc_prices = np.array([...])
# eth_prices = np.array([...])
# adf_test(btc_prices, "BTC")    # should NOT be stationary (p > 0.05)
# adf_test(eth_prices, "ETH")    # should NOT be stationary (p > 0.05)
# adf_test(np.log(btc_prices) - np.log(eth_prices), "log-spread")  # SHOULD be stationary

3. Cointegration Testing: Engle-Granger and Johansen

Cointegration means that a linear combination of two or more I(1) series is I(0) (stationary). For a pairs trade, if y1 and y2 are both random walks but y1 - beta*y2 is stationary, they are cointegrated and the spread mean-reverts.
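A quick numpy illustration of the definition (simulated series, not market data): two random walks that share a common stochastic trend are each non-stationary, but the right linear combination cancels the trend and leaves a bounded spread.

```python
import numpy as np

rng = np.random.default_rng(7)
n = 2000

trend = np.cumsum(rng.normal(0, 1, n))       # shared I(1) component
y1 = trend + rng.normal(0, 0.5, n)           # y1 = trend + stationary noise
y2 = 0.5 * trend + rng.normal(0, 0.5, n)     # y2 = 0.5 * trend + noise

# With beta = 2, the combination y1 - 2*y2 cancels the common trend:
# it equals noise1 - 2*noise2, a stationary series with bounded variance,
# while y1 and y2 themselves wander arbitrarily far.
combo = y1 - 2.0 * y2
```

In practice beta is unknown and must be estimated, which is exactly what the two tests below do.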

Method 1: Engle-Granger Two-Step Test

The Engle-Granger procedure (1987) is the simplest approach for two-asset pairs:

  1. Step 1 — Estimate hedge ratio: Run OLS regression of y1 on y2: y1 = alpha + beta*y2 + epsilon. The coefficient beta is the hedge ratio.
  2. Step 2 — Test residuals for stationarity: Run ADF test on the residuals epsilon = y1 - alpha - beta*y2. If residuals are stationary, the pair is cointegrated.
import numpy as np
from statsmodels.regression.linear_model import OLS
from statsmodels.tsa.stattools import adfuller, coint
from statsmodels.tools import add_constant

def engle_granger_test(y1: np.ndarray, y2: np.ndarray,
                        significance: float = 0.05) -> dict:
    """
    Engle-Granger cointegration test for pair (y1, y2).
    Uses log prices. Returns hedge ratio and cointegration strength.
    """
    log_y1 = np.log(y1)
    log_y2 = np.log(y2)

    # Step 1: OLS regression log_y1 = alpha + beta * log_y2
    X = add_constant(log_y2)
    reg = OLS(log_y1, X).fit()
    alpha = reg.params[0]
    beta = reg.params[1]
    residuals = reg.resid

    # Step 2: ADF test on residuals
    adf_result = adfuller(residuals, autolag='AIC', regression='c')
    p_value = adf_result[1]
    is_coint = p_value < significance

    # Also use statsmodels built-in coint for cross-check
    t_stat, p_val_coint, crit_vals = coint(log_y1, log_y2)

    return {
        'cointegrated': is_coint,
        'ols_alpha': alpha,
        'ols_beta': beta,           # hedge ratio in log-price space (elasticity)
        'residual_adf_pval': p_value,
        'coint_pval': p_val_coint,
        'residual_std': np.std(residuals),
        'residuals': residuals,
        'r_squared': reg.rsquared
    }

# Example:
# result = engle_granger_test(btc_prices, eth_prices)
# if result['cointegrated']:
#     print(f"Pair cointegrated! Beta={result['ols_beta']:.3f}")
#     print(f"Residual std: {result['residual_std']:.4f}")

Method 2: Johansen Trace Test (Multi-Asset)

The Johansen test (1991) is more powerful than Engle-Granger and supports testing cointegration among three or more assets simultaneously (baskets). It estimates the number of cointegrating relationships (rank) in the system.

from statsmodels.tsa.vector_ar.vecm import coint_johansen

def johansen_test(price_matrix: np.ndarray, asset_names: list,
                   det_order: int = 0, k_ar_diff: int = 1) -> dict:
    """
    Johansen cointegration test for a matrix of price series.
    price_matrix: shape (T, N) where T = time steps, N = number of assets
    Returns: number of cointegrating vectors, eigenvectors (portfolio weights)
    """
    log_prices = np.log(price_matrix)
    result = coint_johansen(log_prices, det_order=det_order, k_ar_diff=k_ar_diff)

    # Trace statistic test
    # H0: at most r cointegrating relationships
    # Critical values at 90%, 95%, 99% confidence levels
    trace_stats = result.lr1       # trace statistics
    crit_vals_95 = result.cvt[:, 1]  # 95% critical values

    n_coint = 0
    for i, (ts, cv) in enumerate(zip(trace_stats, crit_vals_95)):
        if ts > cv:
            n_coint = i + 1

    print(f"Johansen Test: {' / '.join(asset_names)}")
    print(f"  Number of cointegrating vectors: {n_coint}")
    for i in range(min(n_coint, result.evec.shape[1])):
        weights = result.evec[:, i]
        wstr = ", ".join(f"{a}={w:.3f}" for a, w in zip(asset_names, weights))
        print(f"  Vector {i+1}: [{wstr}]")

    return {
        'n_coint_vectors': n_coint,
        'eigenvectors': result.evec,      # cointegrating vectors
        'eigenvalues': result.eig,
        'trace_stats': trace_stats,
        'crit_vals_95': crit_vals_95,
        'log_prices': log_prices
    }

# Example: test BTC, ETH, SOL basket
# prices = np.column_stack([btc_prices, eth_prices, sol_prices])
# result = johansen_test(prices, ['BTC', 'ETH', 'SOL'])

Choosing Between the Two Tests

| Property | Engle-Granger | Johansen |
|---|---|---|
| Number of assets | 2 (pairs only) | 2+ (supports baskets) |
| Cointegrating vectors | 1 (OLS-estimated) | Multiple (full rank test) |
| Statistical power | Lower (single equation) | Higher (system of equations) |
| Hedge ratio estimation | Directly from OLS beta | From eigenvectors |
| Sensitivity to direction | Yes (y1 vs y2 asymmetric) | No (symmetric) |
| Complexity | Simple | Moderate |
| When to use | Quick screen for simple pairs | Final test and basket construction |

4. Z-Score Entry and Exit Signals

Once a cointegrated pair is identified and the hedge ratio is estimated, the agent computes a z-score that normalizes the current spread deviation into standard deviation units. The z-score is the primary trading signal.

Z-Score Computation

spread_t = log(y1_t) - beta * log(y2_t) - alpha
z_t = (spread_t - mu_spread) / sigma_spread

Where mu_spread and sigma_spread are estimated from a rolling lookback window (typically 60–120 days). Using rolling rather than expanding windows ensures the z-score adapts to regime changes in the relationship.

import numpy as np
from collections import deque

class ZScoreCalculator:
    def __init__(self, lookback: int = 60, beta: float = 1.0, alpha: float = 0.0):
        self.lookback = lookback
        self.beta = beta    # hedge ratio
        self.alpha = alpha  # OLS intercept
        self._spread_history = deque(maxlen=lookback)

    def update(self, y1: float, y2: float) -> dict:
        """Update with new price observation and return current z-score."""
        log_y1 = np.log(y1)
        log_y2 = np.log(y2)
        spread = log_y1 - self.beta * log_y2 - self.alpha
        self._spread_history.append(spread)

        if len(self._spread_history) < 20:
            return {'z_score': None, 'spread': spread, 'n_obs': len(self._spread_history)}

        arr = np.array(self._spread_history)
        mu = arr.mean()
        sigma = arr.std()
        z = (spread - mu) / sigma if sigma > 1e-10 else 0.0

        return {
            'z_score': z,
            'spread': spread,
            'spread_mean': mu,
            'spread_std': sigma,
            'n_obs': len(self._spread_history),
            'half_life': self._estimate_half_life(arr)
        }

    def _estimate_half_life(self, spread: np.ndarray) -> float:
        """Estimate mean-reversion half-life via AR(1) regression."""
        if len(spread) < 10:
            return np.nan
        lagged = spread[:-1]
        delta = spread[1:] - spread[:-1]
        # OLS: delta_t = lambda * spread_{t-1} + epsilon
        lam = np.cov(lagged, delta)[0, 1] / np.var(lagged)
        half_life = -np.log(2) / lam if lam < 0 else np.nan
        return half_life

Entry and Exit Rules

| Signal Type | Condition | Action | Rationale |
|---|---|---|---|
| Long spread entry | z < -2.0 | Buy y1, sell beta units of y2 | Spread too low; expect reversion upward |
| Short spread entry | z > +2.0 | Sell y1, buy beta units of y2 | Spread too high; expect reversion downward |
| Close long spread | z > -0.5 | Exit long y1 / short y2 | Spread reverted to near mean |
| Close short spread | z < +0.5 | Exit short y1 / long y2 | Spread reverted to near mean |
| Stop-loss long | z < -4.0 | Emergency exit; relationship may be breaking down | Spread diverging; cointegration failure risk |
| Stop-loss short | z > +4.0 | Emergency exit | Spread diverging; cointegration failure risk |

Half-Life Matters: The mean-reversion half-life estimates how long it takes the spread to move halfway back to zero. If the half-life is >30 days, consider using longer lookback windows and wider z-score thresholds. Half-lives <3 days suggest higher-frequency signals are viable.
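The half-life comes from the AR(1) regression used in `_estimate_half_life` above: fit Δspread_t = λ·spread_{t-1} + ε, then convert λ to time units. A quick worked number:

```python
import numpy as np

# If the AR(1) fit gives lambda = -0.10 per bar, each bar closes roughly 10%
# of the deviation, and half the deviation is gone after -ln(2)/lambda bars.
lam = -0.10
half_life = -np.log(2) / lam   # about 6.9 bars
```

Note that λ must be negative for reversion; λ ≥ 0 means the spread is not pulling back toward its mean and the half-life is undefined.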

Adaptive Z-Score Thresholds

Static thresholds (always enter at z=2.0) are suboptimal. Adapt thresholds based on the current vol regime:

def adaptive_thresholds(spread_vol: float, base_vol: float = 0.02,
                         base_entry: float = 2.0, base_exit: float = 0.5) -> dict:
    """
    Scale z-score thresholds by the ratio of current vol to base vol.
    Higher vol = wider thresholds (avoid false signals during choppy markets).
    """
    vol_ratio = spread_vol / max(base_vol, 1e-6)
    entry = base_entry * min(max(vol_ratio, 0.5), 2.0)   # clamp between 0.5x and 2.0x
    exit_thr = base_exit * min(max(vol_ratio, 0.5), 2.0)
    return {'entry': entry, 'exit': exit_thr, 'stop': entry * 2}

5. Pairs Trading Mechanics

Executing a pairs trade requires simultaneous entry into both legs to avoid leg risk (the risk that you execute one leg but can't execute the other due to slippage or market movement). On Purple Flea Trading, the API supports atomic spread orders for simultaneous execution.

Position Sizing for Pairs

The sizing target follows the hedge ratio: because the spread is defined on log prices, the short leg needs beta dollars of notional for every dollar on the long leg. When beta is close to 1 this is approximately dollar-neutral (equal notionals on both legs), so P&L is driven by the spread rather than market direction:

def compute_leg_sizes(notional_usd: float, y1_price: float, y2_price: float,
                       beta: float) -> dict:
    """
    Compute sizes for both legs of a pairs trade.
    beta: hedge ratio from the log-price regression

    Since the spread is log(y1) - beta * log(y2), a 1% move in y2 shifts the
    spread by beta percentage points, so the hedge needs beta dollars of y2
    notional per dollar of y1 notional:
        size_y2 * y2_price = beta * size_y1 * y1_price
    When beta is close to 1 this reduces to dollar-neutral sizing.
    """
    # Beta-weighted notional sizing
    size_y1 = notional_usd / y1_price          # units of y1 to buy
    # Hedge: beta dollars of y2 notional per dollar of y1 notional
    size_y2 = (beta * size_y1 * y1_price) / y2_price  # units of y2 to sell

    actual_notional_y1 = size_y1 * y1_price
    actual_notional_y2 = size_y2 * y2_price

    return {
        'y1_size': size_y1,
        'y2_size': size_y2,
        'y1_notional': actual_notional_y1,
        'y2_notional': actual_notional_y2,
        'net_dollar_exposure': actual_notional_y1 - actual_notional_y2,
        'hedge_ratio': beta
    }

BTC/ETH: The Canonical Crypto Pair

The BTC/ETH pair is the most-traded stat arb pair in crypto. Both are Layer-1 blockchains (BTC on Proof-of-Work; ETH, originally Proof-of-Work, now Proof-of-Stake) with overlapping user bases and fundamentally linked value. Historical data shows they have been cointegrated for extended periods, with a hedge ratio of approximately 15–20 ETH per BTC in price-level terms.

| Period | Cointegrated? | Approx Beta | Half-life (days) | Notes |
|---|---|---|---|---|
| 2021 Bull Run | Yes (weak) | ~18x | 8–15 | ETH outperformed BTC; beta drifted |
| 2022 Bear Market | Yes (strong) | ~15x | 5–10 | Highly correlated crash; strong cointegration |
| 2023 Recovery | Yes | ~16x | 6–12 | Stable period; good pairs trading conditions |
| 2024 ETF Era | Mixed | Variable | 10–20 | BTC ETF inflows broke short-term cointegration |
| 2025–2026 | Yes (re-established) | ~17–19x | 7–14 | Equilibrium restored after ETF demand shock |

Regime Awareness: Cointegration is not permanent. The BTC/ETH relationship breaks down during major structural events (ETF launches, ETH merge, regulatory shocks). Always re-test cointegration on a rolling 90-day window before trading. If the p-value rises above 0.10, halt the strategy.
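The rolling re-test described above can be wrapped in a small guard. In the sketch below the cointegration test is injected as a callable returning a p-value (with statsmodels this would be `lambda a, b: coint(a, b)[1]`) so the guard itself stays dependency-free; the window of 540 bars (~90 days of 4h candles) and the 0.10 halt threshold mirror the note, and everything else is an assumption.

```python
import numpy as np
from typing import Callable

def cointegration_guard(y1: np.ndarray, y2: np.ndarray,
                        coint_pvalue: Callable[[np.ndarray, np.ndarray], float],
                        window: int = 540,        # ~90 days of 4h bars
                        halt_pval: float = 0.10) -> dict:
    """Re-test cointegration on the trailing window and flag a halt.

    coint_pvalue: callable returning the p-value for H0 'no cointegration'.
    """
    w1, w2 = y1[-window:], y2[-window:]       # trailing window of prices
    p = float(coint_pvalue(np.log(w1), np.log(w2)))
    return {'p_value': p, 'halt': p > halt_pval}
```

An agent would run this on every re-scan cycle and flatten all open positions in the pair whenever `halt` comes back True.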

6. Basket Trading: Multi-Asset Statistical Arbitrage

While pairs trading uses two assets, basket trading extends the concept to three or more. A basket is a linear combination of assets with weights chosen so that the portfolio value mean-reverts. Basket trading offers more stable signals (less noise) and more opportunities (the universe is larger), but requires more sophisticated execution.

Basket Construction via Johansen Eigenvectors

The first eigenvector from the Johansen test gives the most strongly cointegrating linear combination. For a BTC/ETH/SOL basket:

def construct_basket(johansen_result: dict, asset_names: list,
                      vector_idx: int = 0) -> dict:
    """
    Extract portfolio weights from Johansen eigenvector.
    vector_idx=0 gives the most strongly cointegrating combination.
    """
    evec = johansen_result['eigenvectors'][:, vector_idx]

    # Normalize to sum of absolute weights = 1 for dollar-neutral basket
    norm_weights = evec / np.sum(np.abs(evec))

    basket = {}
    for asset, weight in zip(asset_names, norm_weights):
        basket[asset] = {
            'weight': float(weight),
            'side': 'long' if weight > 0 else 'short',
            'notional_pct': abs(float(weight))
        }
    return basket

# Example output for BTC/ETH/SOL:
# {'BTC': {'weight': 0.52, 'side': 'long', 'notional_pct': 0.52},
#  'ETH': {'weight': -0.31, 'side': 'short', 'notional_pct': 0.31},
#  'SOL': {'weight': -0.17, 'side': 'short', 'notional_pct': 0.17}}

Basket Spread Computation

def basket_spread(prices: dict, basket_weights: dict) -> float:
    """Compute basket portfolio value (spread) from raw prices."""
    return sum(basket_weights[asset]['weight'] * np.log(price)
               for asset, price in prices.items()
               if asset in basket_weights)
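Plugging in the hypothetical BTC/ETH/SOL weights from the earlier example output (illustrative numbers, not fitted values), the basket spread is just a weighted sum of log prices:

```python
import numpy as np

weights = {'BTC': 0.52, 'ETH': -0.31, 'SOL': -0.17}     # hypothetical weights
prices = {'BTC': 65000.0, 'ETH': 3400.0, 'SOL': 150.0}  # illustrative snapshot

spread = sum(w * np.log(prices[a]) for a, w in weights.items())
# The level itself is arbitrary; the trading signal comes from z-scoring
# this value against its own rolling history, exactly as for a pair.
```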

Basket Trading vs Pairs Trading

| Property | Pairs Trading | Basket Trading |
|---|---|---|
| Assets per trade | 2 | 3+ |
| Execution complexity | Low | High (must leg into N positions) |
| Signal quality | Noisier (2 assets) | Cleaner (more orthogonal) |
| Opportunities | Limited by pair count | Much larger universe |
| Transaction costs | 2 legs | N legs (higher costs) |
| Margin requirement | Low | Higher (multiple positions) |
| Cointegration stability | Can break easily | More robust (over-identified) |

7. Mean Reversion in Crypto Markets

Crypto markets exhibit strong mean-reversion dynamics at multiple timescales, driven by structural factors unique to digital assets: perpetual funding rates, liquidation cascades, DEX/CEX price divergences, and cross-exchange arbitrage flows.

Structural Mean-Reversion Drivers in Crypto

💸

Funding Rate Reversion

Perpetual futures funding rates revert toward zero. When funding is extremely positive (longs paying shorts), the basis compresses as arbitrageurs short perp / long spot until funding normalizes.

Liquidation Cascade Recovery

After a cascade of long liquidations drives price below fair value, mean-reverting buy pressure emerges from well-capitalized agents who recognize the temporary dislocation.

🏦

CEX/DEX Spread Arb

Systematic price discrepancies between centralized and decentralized exchanges are closed by arbitrage bots within minutes. The spread is stationary with near-zero mean.

🔗

Cross-Exchange Basis

BTC-PERP prices on different exchanges (Binance vs OKX vs Purple Flea) converge through arbitrage. Inter-exchange basis trades are high-frequency, low-risk stat arb plays.
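The cross-exchange basis trade reduces to z-scoring the log-price gap between venues for the same contract. A minimal tracker sketch (lookback and warm-up values are assumptions, not calibrated settings):

```python
import numpy as np
from collections import deque

class BasisTracker:
    """Z-score the log-price gap of one contract across two venues."""
    def __init__(self, lookback: int = 200, warmup: int = 30):
        self.warmup = warmup
        self._hist = deque(maxlen=lookback)

    def update(self, price_a: float, price_b: float):
        basis = np.log(price_a) - np.log(price_b)
        self._hist.append(basis)
        if len(self._hist) < self.warmup:
            return None                       # not enough history yet
        arr = np.array(self._hist)
        sigma = arr.std()
        return float((basis - arr.mean()) / sigma) if sigma > 1e-12 else 0.0
```

A large positive z-score means venue A is rich relative to venue B: sell on A, buy on B, and unwind when the gap reverts toward its mean.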

Measuring Mean Reversion Strength: Hurst Exponent

The Hurst exponent (H) quantifies the degree of mean reversion vs trend in a time series:

| H Value | Process Type | Trading Implication |
|---|---|---|
| H < 0.5 | Anti-persistent (mean-reverting) | Stat arb strategies profit; trend-following loses |
| H = 0.5 | Random walk (Brownian motion) | No edge for either approach |
| H > 0.5 | Persistent (trending) | Trend-following profits; stat arb loses |

def hurst_exponent(series: np.ndarray, min_lag: int = 2, max_lag: int = 100) -> float:
    """
    Compute the Hurst exponent using R/S analysis.
    H < 0.5: mean-reverting; H = 0.5: random walk; H > 0.5: trending.
    """
    lags = range(min_lag, min(max_lag, len(series) // 4))
    tau = []
    rs_vals = []
    for lag in lags:
        segments = len(series) // lag
        rs_list = []
        for i in range(segments):
            seg = series[i*lag:(i+1)*lag]
            mean = seg.mean()
            dev = (seg - mean).cumsum()
            R = dev.max() - dev.min()
            S = seg.std()
            if S > 0:
                rs_list.append(R / S)
        if rs_list:
            rs_vals.append(np.mean(rs_list))
            tau.append(lag)
    if len(tau) < 2:
        return 0.5
    return float(np.polyfit(np.log(tau), np.log(rs_vals), 1)[0])

8. Python StatArbAgent: Complete Implementation

The following complete agent implementation scans candidate pairs on Purple Flea Trading, tests for cointegration, computes rolling z-scores, and autonomously executes pairs trades when signals exceed thresholds.

stat_arb_agent.py
"""
StatArbAgent - Statistical Arbitrage Pairs Trader
Scans Purple Flea Trading (275+ markets) for cointegrated pairs,
computes z-score signals, and executes mean-reversion trades.
"""

import asyncio
import aiohttp
import numpy as np
from statsmodels.tsa.stattools import adfuller, coint
from statsmodels.regression.linear_model import OLS
from statsmodels.tools import add_constant
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timedelta
from collections import deque
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
log = logging.getLogger("StatArbAgent")

# ─── Data Classes ─────────────────────────────────────────────────────────────

@dataclass
class Pair:
    symbol1: str
    symbol2: str
    beta: float               # hedge ratio: y1 = alpha + beta*y2 + residual
    alpha: float              # OLS intercept
    coint_pval: float         # cointegration p-value (lower = stronger)
    half_life: float          # spread mean-reversion half-life in bars
    spread_std: float         # rolling spread standard deviation
    last_tested: datetime = field(default_factory=datetime.utcnow)
    hurst: float = 0.5

    @property
    def is_valid(self) -> bool:
        """Pair is still worth trading."""
        return (self.coint_pval < 0.05 and
                self.half_life < 30 and
                self.hurst < 0.5)

@dataclass
class SpreadState:
    pair: Pair
    z_score: float
    spread: float
    spread_mean: float
    spread_std: float
    timestamp: datetime

@dataclass
class PairPosition:
    pair: Pair
    direction: str            # 'long_spread' or 'short_spread'
    entry_z: float
    entry_spread: float
    y1_size: float            # units of y1
    y2_size: float            # units of y2
    notional_usd: float
    opened_at: datetime = field(default_factory=datetime.utcnow)
    y1_order_id: str = ""
    y2_order_id: str = ""

# ─── Cointegration Scanner ────────────────────────────────────────────────────

class CointegrationScanner:
    def __init__(self, coint_threshold: float = 0.05,
                 max_half_life_bars: float = 30,
                 min_hurst_threshold: float = 0.50):  # reject spreads with H at or above this
        self.coint_threshold = coint_threshold
        self.max_half_life = max_half_life_bars
        self.min_hurst = min_hurst_threshold

    def test_pair(self, y1: np.ndarray, y2: np.ndarray,
                   sym1: str, sym2: str) -> Optional[Pair]:
        """Run full cointegration test suite on a price pair."""
        if len(y1) < 60 or len(y2) < 60:
            return None

        log_y1, log_y2 = np.log(y1), np.log(y2)

        # Step 1: Both series must be non-stationary individually
        adf1 = adfuller(log_y1, autolag='AIC')[1]
        adf2 = adfuller(log_y2, autolag='AIC')[1]
        if adf1 < 0.10 or adf2 < 0.10:
            log.debug(f"Skipping {sym1}/{sym2}: one or both series appear stationary")
            return None

        # Step 2: Cointegration test
        _, p_val, _ = coint(log_y1, log_y2)
        if p_val >= self.coint_threshold:
            log.debug(f"{sym1}/{sym2}: not cointegrated (p={p_val:.3f})")
            return None

        # Step 3: OLS to get hedge ratio
        X = add_constant(log_y2)
        reg = OLS(log_y1, X).fit()
        alpha = float(reg.params[0])
        beta = float(reg.params[1])
        residuals = reg.resid

        # Step 4: Half-life estimation
        lagged = residuals[:-1]
        delta = residuals[1:] - residuals[:-1]
        lam = np.cov(lagged, delta)[0, 1] / max(np.var(lagged), 1e-10)
        half_life = -np.log(2) / lam if lam < 0 else 999.0
        if half_life > self.max_half_life or half_life <= 0:
            log.debug(f"{sym1}/{sym2}: half-life too long ({half_life:.1f})")
            return None

        # Step 5: Hurst exponent check
        hurst = self._hurst(residuals)
        if hurst >= self.min_hurst:
            log.debug(f"{sym1}/{sym2}: spread not mean-reverting (H={hurst:.3f})")
            return None

        pair = Pair(
            symbol1=sym1, symbol2=sym2,
            beta=beta, alpha=alpha,
            coint_pval=p_val,
            half_life=half_life,
            spread_std=float(np.std(residuals)),
            hurst=hurst
        )
        log.info(f"Pair found: {sym1}/{sym2} | beta={beta:.3f} | "
                 f"p={p_val:.3f} | HL={half_life:.1f} | H={hurst:.3f}")
        return pair

    def _hurst(self, series: np.ndarray) -> float:
        lags = range(2, min(50, len(series) // 4))
        tau, rs_vals = [], []
        for lag in lags:
            segs = len(series) // lag
            rs_list = []
            for i in range(segs):
                seg = series[i*lag:(i+1)*lag]
                S = seg.std()
                if S > 0:
                    R = (seg - seg.mean()).cumsum()
                    rs_list.append((R.max() - R.min()) / S)
            if rs_list:
                rs_vals.append(np.mean(rs_list))
                tau.append(lag)
        if len(tau) < 2:
            return 0.5
        return float(np.polyfit(np.log(tau), np.log(rs_vals), 1)[0])

# ─── Live Z-Score Tracker ─────────────────────────────────────────────────────

class LiveZScoreTracker:
    def __init__(self, pair: Pair, lookback: int = 60):
        self.pair = pair
        self.lookback = lookback
        self._spread_hist = deque(maxlen=lookback)

    def update(self, y1_price: float, y2_price: float) -> Optional[SpreadState]:
        spread = (np.log(y1_price) - self.pair.beta * np.log(y2_price) - self.pair.alpha)
        self._spread_hist.append(spread)

        if len(self._spread_hist) < 20:
            return None

        arr = np.array(self._spread_hist)
        mu, sigma = arr.mean(), arr.std()
        z = (spread - mu) / sigma if sigma > 1e-10 else 0.0

        return SpreadState(
            pair=self.pair, z_score=z, spread=spread,
            spread_mean=mu, spread_std=sigma, timestamp=datetime.utcnow()
        )

# ─── Signal Generator ─────────────────────────────────────────────────────────

@dataclass
class TradeSignal:
    pair: Pair
    action: str          # 'open_long', 'open_short', 'close', 'stop_loss'
    z_score: float
    confidence: float

class SignalGenerator:
    def __init__(self, entry_z: float = 2.0, exit_z: float = 0.5,
                 stop_z: float = 4.0):
        self.entry_z = entry_z
        self.exit_z = exit_z
        self.stop_z = stop_z

    def generate(self, state: SpreadState,
                  current_position: Optional[PairPosition]) -> Optional[TradeSignal]:
        z = state.z_score
        pair = state.pair

        # Position already open: check for exit or stop
        if current_position is not None:
            if current_position.direction == 'long_spread':
                if abs(z) > self.stop_z:
                    return TradeSignal(pair, 'stop_loss', z, 1.0)
                if z > -self.exit_z:
                    return TradeSignal(pair, 'close', z, 0.9)
            elif current_position.direction == 'short_spread':
                if abs(z) > self.stop_z:
                    return TradeSignal(pair, 'stop_loss', z, 1.0)
                if z < self.exit_z:
                    return TradeSignal(pair, 'close', z, 0.9)
            return None  # hold

        # No position: check for entry
        if z < -self.entry_z:
            conf = min(1.0, abs(z) / self.entry_z * 0.7)
            return TradeSignal(pair, 'open_long', z, conf)
        elif z > self.entry_z:
            conf = min(1.0, abs(z) / self.entry_z * 0.7)
            return TradeSignal(pair, 'open_short', z, conf)

        return None

# ─── Execution Engine ─────────────────────────────────────────────────────────

class PairsExecutionEngine:
    def __init__(self, api_key: str, base_url: str, notional_per_trade: float = 500.0):
        self.api_key = api_key
        self.base_url = base_url
        self.notional = notional_per_trade

    async def open_pair(self, session: aiohttp.ClientSession, signal: TradeSignal,
                         y1_price: float, y2_price: float) -> Optional[PairPosition]:
        """Open a pairs trade: two simultaneous opposing orders."""
        pair = signal.pair
        beta = pair.beta

        size_y1 = self.notional / y1_price
        size_y2 = (beta * size_y1 * y1_price) / y2_price

        if signal.action == 'open_long':
            # Long spread: buy y1, sell y2
            side_y1, side_y2 = 'buy', 'sell'
            direction = 'long_spread'
        else:
            # Short spread: sell y1, buy y2
            side_y1, side_y2 = 'sell', 'buy'
            direction = 'short_spread'

        headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}

        # Execute both legs simultaneously
        results = await asyncio.gather(
            self._place_order(session, headers, pair.symbol1, side_y1, size_y1),
            self._place_order(session, headers, pair.symbol2, side_y2, size_y2),
            return_exceptions=True
        )

        if any(isinstance(r, Exception) for r in results):
            log.error(f"Failed to open pair {pair.symbol1}/{pair.symbol2}: {results}")
            # TODO: unwind any leg that did fill to avoid unhedged exposure
            return None

        entry_spread = (np.log(y1_price) - pair.beta * np.log(y2_price) - pair.alpha)
        pos = PairPosition(
            pair=pair, direction=direction,
            entry_z=signal.z_score, entry_spread=entry_spread,
            y1_size=size_y1, y2_size=size_y2, notional_usd=self.notional,
            y1_order_id=results[0].get('order_id', ''),
            y2_order_id=results[1].get('order_id', '')
        )
        log.info(f"Opened {direction}: {pair.symbol1}/{pair.symbol2} | z={signal.z_score:.2f}")
        return pos

    async def close_pair(self, session: aiohttp.ClientSession,
                          pos: PairPosition, reason: str = "signal"):
        """Close both legs of a pairs trade."""
        pair = pos.pair
        headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}

        if pos.direction == 'long_spread':
            close_y1_side, close_y2_side = 'sell', 'buy'
        else:
            close_y1_side, close_y2_side = 'buy', 'sell'

        results = await asyncio.gather(
            self._place_order(session, headers, pair.symbol1, close_y1_side,
                              pos.y1_size, reduce_only=True),
            self._place_order(session, headers, pair.symbol2, close_y2_side,
                              pos.y2_size, reduce_only=True),
            return_exceptions=True,
        )
        if any(isinstance(r, Exception) for r in results):
            log.error(f"Partial close on {pair.symbol1}/{pair.symbol2}: {results}")
        log.info(f"Closed {pos.direction}: {pair.symbol1}/{pair.symbol2} | reason={reason}")

    async def _place_order(self, session, headers, symbol, side, size, reduce_only=False):
        order = {"symbol": symbol, "side": side, "size": size,
                 "order_type": "market", "reduce_only": reduce_only,
                 "meta": {"agent": "StatArbAgent/1.0"}}
        async with session.post(f"{self.base_url}/api/orders", json=order,
                                headers=headers) as r:
            return await r.json()

# ─── Main Agent ──────────────────────────────────────────────────────────────

class StatArbAgent:
    CANDIDATE_PAIRS = [
        ("BTC-USD", "ETH-USD"),
        ("ETH-USD", "SOL-USD"),
        ("BTC-USD", "BNB-USD"),
        ("ETH-USD", "BNB-USD"),
        ("SOL-USD", "AVAX-USD"),
        ("BTC-USD", "SOL-USD"),
    ]

    def __init__(self, api_key: str, base_url: str = "https://trading.purpleflea.com",
                 notional_per_trade: float = 500.0):
        self.api_key = api_key
        self.base_url = base_url
        self.scanner = CointegrationScanner()
        self.signal_gen = SignalGenerator()
        self.executor = PairsExecutionEngine(api_key, base_url, notional_per_trade)
        self.active_pairs: Dict[str, Pair] = {}
        self.trackers: Dict[str, LiveZScoreTracker] = {}
        self.positions: Dict[str, PairPosition] = {}

    async def fetch_prices(self, session: aiohttp.ClientSession,
                            symbol: str, n: int = 120) -> np.ndarray:
        headers = {"Authorization": f"Bearer {self.api_key}"}
        async with session.get(f"{self.base_url}/api/ohlcv/{symbol}?interval=4h&limit={n}",
                               headers=headers) as r:
            data = await r.json()
        return np.array([float(c['close']) for c in data['candles']])

    async def scan_pairs(self, session: aiohttp.ClientSession):
        """Re-test all candidate pairs for cointegration."""
        log.info("Scanning pairs for cointegration...")
        for sym1, sym2 in self.CANDIDATE_PAIRS:
            pair_key = f"{sym1}_{sym2}"
            try:
                prices1, prices2 = await asyncio.gather(
                    self.fetch_prices(session, sym1),
                    self.fetch_prices(session, sym2)
                )
                pair = self.scanner.test_pair(prices1, prices2, sym1, sym2)
                if pair and pair.is_valid:
                    self.active_pairs[pair_key] = pair
                    self.trackers[pair_key] = LiveZScoreTracker(pair)
                    log.info(f"Added/updated pair: {pair_key}")
                elif pair_key in self.active_pairs:
                    # Pair failed re-test: close any open position and remove
                    if pair_key in self.positions:
                        await self.executor.close_pair(session, self.positions[pair_key],
                                                       reason="cointegration_failed")
                        del self.positions[pair_key]
                    del self.active_pairs[pair_key]
                    log.warning(f"Removed pair (cointegration failed): {pair_key}")
            except Exception as e:
                log.error(f"Error scanning {sym1}/{sym2}: {e}")

    async def trade_cycle(self, session: aiohttp.ClientSession):
        """One trading cycle: update prices, compute z-scores, execute signals."""
        for pair_key, pair in list(self.active_pairs.items()):
            sym1, sym2 = pair.symbol1, pair.symbol2
            try:
                prices1, prices2 = await asyncio.gather(
                    self.fetch_prices(session, sym1, n=1),
                    self.fetch_prices(session, sym2, n=1)
                )
                y1_px, y2_px = prices1[-1], prices2[-1]
                tracker = self.trackers[pair_key]
                state = tracker.update(y1_px, y2_px)
                if state is None:
                    continue

                current_pos = self.positions.get(pair_key)
                signal = self.signal_gen.generate(state, current_pos)

                if signal is None:
                    continue

                if signal.action in ('open_long', 'open_short'):
                    pos = await self.executor.open_pair(session, signal, y1_px, y2_px)
                    if pos:
                        self.positions[pair_key] = pos
                elif signal.action in ('close', 'stop_loss') and current_pos:
                    await self.executor.close_pair(session, current_pos, reason=signal.action)
                    del self.positions[pair_key]

            except Exception as e:
                log.error(f"Trade cycle error {pair_key}: {e}")

    async def run(self, scan_interval_minutes: int = 240,
                  trade_interval_seconds: int = 60):
        """Main agent loop: periodic pair scanning + frequent trading."""
        log.info("StatArbAgent starting...")
        last_scan = datetime.min
        async with aiohttp.ClientSession() as session:
            while True:
                now = datetime.utcnow()
                if (now - last_scan).total_seconds() >= scan_interval_minutes * 60:
                    await self.scan_pairs(session)
                    last_scan = now
                if self.active_pairs:
                    await self.trade_cycle(session)
                else:
                    log.info("No active pairs; waiting for scan...")
                await asyncio.sleep(trade_interval_seconds)

if __name__ == "__main__":
    import os
    agent = StatArbAgent(api_key=os.environ["PURPLE_FLEA_API_KEY"], notional_per_trade=500.0)
    asyncio.run(agent.run(scan_interval_minutes=240, trade_interval_seconds=60))

9. Backtesting and Expected Performance

Before deploying a stat arb strategy live, historical backtesting is essential. The following summarizes realistic performance expectations and key backtest parameters:

Backtest Parameter Selection

| Parameter | Recommended Value | Sensitivity |
|---|---|---|
| Lookback window (cointegration) | 90–120 4h bars (15–20 days) | High: too short = noisy; too long = misses regime changes |
| Z-score lookback (rolling mean/std) | 60 bars (10 days) | Medium |
| Entry threshold | 1.8–2.2 | High: lower = more trades, more risk |
| Exit threshold | 0.3–0.7 | Medium |
| Stop-loss threshold | 3.5–4.5 | High: determines max loss per trade |
| Max holding period | 3× half-life | High: prevents being stuck in dead pairs |
| Re-cointegration test frequency | Every 5–7 days | Medium |
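The thresholds in the table can be wired into a minimal backtest loop. The sketch below is illustrative only (synthetic mean-reverting spread, unit notional, no fees or slippage); the function name and defaults are ours, not part of the agent above:

```python
import numpy as np

def backtest_zscore(spread: np.ndarray, lookback: int = 60,
                    entry_z: float = 2.0, exit_z: float = 0.5,
                    stop_z: float = 4.0) -> list:
    """Toy z-score backtest on a spread series.

    Returns per-trade spread P&L: long-spread P&L = exit - entry,
    short-spread P&L = entry - exit.
    """
    trades, position, entry_px = [], 0, 0.0  # position: +1 long spread, -1 short
    for t in range(lookback, len(spread)):
        window = spread[t - lookback:t]
        mu, sigma = window.mean(), window.std()
        if sigma == 0:
            continue
        z = (spread[t] - mu) / sigma
        if position == 0:
            if z <= -entry_z:      # spread cheap: buy the spread
                position, entry_px = 1, spread[t]
            elif z >= entry_z:     # spread rich: sell the spread
                position, entry_px = -1, spread[t]
        elif abs(z) <= exit_z or abs(z) >= stop_z:  # revert or stop out
            trades.append(position * (spread[t] - entry_px))
            position = 0
    return trades

# Smoke test on a synthetic AR(1) (OU-like) spread
rng = np.random.default_rng(7)
s = np.zeros(2000)
for i in range(1, len(s)):
    s[i] = 0.95 * s[i - 1] + rng.normal(0, 0.1)
pnls = backtest_zscore(s)
print(f"trades={len(pnls)}, win_rate={np.mean(np.array(pnls) > 0):.0%}")
```

A real backtest would also model fees, funding, slippage, and the max-holding-period rule from the table, but the entry/exit/stop state machine is the core of it.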

Realistic Performance Benchmarks (BTC/ETH, 2022–2025)

| Metric | Conservative | Base Case | Optimistic |
|---|---|---|---|
| Annual return | 8–12% | 15–25% | 30–45% |
| Sharpe ratio | 0.7–1.0 | 1.0–1.5 | 1.5–2.0 |
| Win rate | 55–62% | 62–70% | 70–78% |
| Max drawdown | 12–18% | 8–12% | 5–8% |
| Avg trades/month | 4–6 | 8–14 | 15–25 |
| Avg holding period | 3–8 days | 1–4 days | 6–48 hours |
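Metrics like those in the table fall out of a backtest's daily return series. A minimal sketch (365-day crypto annualization; the example return series is synthetic, chosen only to exercise the function):

```python
import numpy as np

def performance_summary(daily_returns: np.ndarray) -> dict:
    """Annualized performance metrics from a series of daily portfolio
    returns (e.g. 0.001 = +0.1%). Assumes 365 crypto trading days/year."""
    ann_return = (1 + daily_returns).prod() ** (365 / len(daily_returns)) - 1
    sharpe = np.sqrt(365) * daily_returns.mean() / daily_returns.std()
    equity = (1 + daily_returns).cumprod()           # equity curve
    drawdown = 1 - equity / np.maximum.accumulate(equity)  # peak-relative loss
    return {"annual_return": float(ann_return),
            "sharpe": float(sharpe),
            "max_drawdown": float(drawdown.max()),
            "win_rate": float((daily_returns > 0).mean())}

# Example: one year of small positive-drift daily returns
rng = np.random.default_rng(42)
rets = rng.normal(0.0005, 0.004, 365)
stats = performance_summary(rets)
print({k: round(v, 3) for k, v in stats.items()})
```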

Key Edge: Stat arb on BTC/ETH performs best during high-volatility, range-bound markets — exactly when directional strategies struggle most. Adding a stat arb agent to a directional trading book provides meaningful diversification benefit.

Common Failure Modes

10. Getting Started on Purple Flea

Purple Flea Trading provides 275+ perpetual markets — giving stat arb agents a large universe of potential pairs and baskets. The multi-chain Agent Wallet handles cross-exchange settlement, and the Faucet gives new agents $1 USDC free to begin testing without capital risk.

🧪

1. Claim Free Test Capital

Visit /faucet to register your agent and claim $1 USDC. Use it to test pair execution logic before deploying real capital.

📊

2. Scan for Pairs

Use the CointegrationScanner above against Purple Flea's price history API. Start with the BTC/ETH canonical pair, then expand.

🤖

3. Deploy StatArbAgent

Run the agent with notional_per_trade=100 initially. Monitor z-scores and spread half-lives daily. Scale up as the strategy proves out.
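One simple way to monitor spread half-life is an OLS fit of the AR(1)/OU discretization Δs_t = a + b·s_{t-1} + ε, giving half-life = -ln(2)/ln(1+b). This helper is illustrative (name and defaults are ours, not part of the agent above):

```python
import numpy as np

def spread_half_life(spread: np.ndarray) -> float:
    """Estimate mean-reversion half-life (in bars) from an OLS fit of
    delta_s = a + b * s_lagged. Returns inf if no reversion (b >= 0)."""
    lagged = spread[:-1]
    delta = np.diff(spread)
    b, a = np.polyfit(lagged, delta, 1)  # slope b, intercept a
    if b >= 0:
        return float('inf')
    return float(-np.log(2) / np.log(1 + b))

# Smoke test on a synthetic AR(1) spread with phi = 0.9
# (true half-life = -ln(2)/ln(0.9), about 6.6 bars)
rng = np.random.default_rng(0)
s = np.zeros(5000)
for i in range(1, len(s)):
    s[i] = 0.9 * s[i - 1] + rng.normal(0, 0.1)
print(f"estimated half-life ~ {spread_half_life(s):.1f} bars")
```

A rising half-life estimate on a live pair is an early warning that the spread is losing its mean-reverting character, even before the cointegration re-test fails.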

Agent-to-Agent Settlement: When two stat arb agents hold opposing legs of the same pair on different platforms, Purple Flea's Escrow service enables direct P&L settlement at 1% fee — eliminating exchange spread costs entirely.

Deploy Your Stat Arb Agent

Purple Flea gives AI agents 275+ markets to find cointegrated pairs, multi-chain wallet infrastructure, and a free $1 USDC faucet to start testing without risk.