Statistical Pairs Trading for AI Agents


Pairs trading is the original statistical arbitrage strategy, born on the trading desks of Morgan Stanley in the 1980s. The core idea is elegant: identify two assets whose prices tend to move together (are cointegrated), monitor the spread between them, and trade when that spread deviates far enough from its historical mean. The market-neutral structure — simultaneously long one asset, short the other — means the strategy profits from the relative price move rather than the absolute direction of either asset.

For AI agents, pairs trading is particularly attractive. The signal generation is algorithmic, the execution is mechanical, and the strategy can run continuously without human oversight. This guide covers the full implementation: finding cointegrated pairs using the Engle-Granger test, generating z-score signals, dynamic hedge ratio estimation via the Kalman filter, and placing simultaneous long/short orders via the Purple Flea Trading API.

  • Z-score entry threshold: 2.0
  • Z-score exit threshold: 0.5
  • Engle-Granger p-value cutoff: < 0.05 (5%)
  • Minimum pairs in portfolio: 10+

1. Pairs Trading Fundamentals

What is Cointegration?

Two price series P_A and P_B are cointegrated if, despite each individually being a random walk (non-stationary), a linear combination of them is stationary. In plain English: both prices drift over time, but their difference (or weighted difference) is mean-reverting.

Cointegration relationship
spread_t = P_A(t) - β * P_B(t) - μ

Where:
β = hedge ratio (how many units of B to hold per unit of A)
μ = long-run mean of the spread
spread_t is stationary (mean-reverting) if A and B are cointegrated

Correlation is not the same as cointegration. Correlation measures how closely short-term price changes (returns) move together over a given window. Cointegration measures whether the long-run relationship between the price levels is stable. Two assets can have highly correlated returns without being cointegrated (their prices drift apart permanently), or be cointegrated while showing only weak short-run correlation.
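A small simulation can make the distinction concrete. The sketch below is illustrative only: both pairs are synthetic, and the helper names (`pearson`, `diffs`, `simulate`) are introduced here for the example rather than taken from any library.

```python
import math
import random
import statistics


def pearson(x: list[float], y: list[float]) -> float:
    """Plain Pearson correlation, written out to keep the sketch dependency-free."""
    n = len(x)
    mx, my = sum(x) / n, sum(y) / n
    cov = sum((a - mx) * (b - my) for a, b in zip(x, y))
    vx = sum((a - mx) ** 2 for a in x)
    vy = sum((b - my) ** 2 for b in y)
    return cov / math.sqrt(vx * vy)


def diffs(x: list[float]) -> list[float]:
    """One-step price changes."""
    return [b - a for a, b in zip(x, x[1:])]


def simulate(n: int = 2000, seed: int = 0):
    rng = random.Random(seed)

    # Case 1: cointegrated. Both prices share one random-walk trend plus
    # independent stationary noise, so A - B is stationary.
    trend = 100.0
    coint_a, coint_b = [], []
    for _ in range(n):
        trend += rng.gauss(0, 1)
        coint_a.append(trend + rng.gauss(0, 1))
        coint_b.append(trend + rng.gauss(0, 1))

    # Case 2: correlated returns, not cointegrated. Each price is its own
    # random walk; increments are correlated (rho = 0.9) but nothing pulls
    # the levels back together.
    rho = 0.9
    a, b = 100.0, 100.0
    corr_a, corr_b = [], []
    for _ in range(n):
        z1, z2 = rng.gauss(0, 1), rng.gauss(0, 1)
        a += z1
        b += rho * z1 + math.sqrt(1 - rho ** 2) * z2
        corr_a.append(a)
        corr_b.append(b)
    return coint_a, coint_b, corr_a, corr_b


coint_a, coint_b, corr_a, corr_b = simulate()
spread_coint = [x - y for x, y in zip(coint_a, coint_b)]
spread_corr = [x - y for x, y in zip(corr_a, corr_b)]

print("cointegrated pair:    return corr =",
      round(pearson(diffs(coint_a), diffs(coint_b)), 2),
      " spread std =", round(statistics.pstdev(spread_coint), 2))
print("correlated-only pair: return corr =",
      round(pearson(diffs(corr_a), diffs(corr_b)), 2),
      " spread std =", round(statistics.pstdev(spread_corr), 2))
```

In this setup the cointegrated pair has the weaker return correlation but the tightly bounded spread, while the pair with strongly correlated returns has a spread that wanders like a random walk. Only the first is a candidate for the strategy.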

The Spread and Mean Reversion

Once you've identified a cointegrated pair, the trading signal comes from the spread's deviation from its long-run mean. The z-score normalizes this deviation:

Spread z-score
spread = P_A - β * P_B
z = (spread - mean(spread)) / std(spread)

Entry long spread: z < -2.0 (spread too low, expect recovery)
Entry short spread: z > 2.0 (spread too high, expect decline)
Exit position: |z| < 0.5 (spread returned to mean)

Why AI Agents Excel at This Strategy

  • Always on: Agents never miss an entry signal due to sleep or distraction
  • Simultaneous execution: Long/short legs placed in milliseconds, minimizing leg risk
  • Continuous monitoring: Cointegration can break down — agents check this continuously
  • Portfolio of pairs: Agents can monitor dozens of pairs simultaneously, diversifying specific breakdown risk
  • Parameter adaptation: Kalman filter updates hedge ratio in real time as the relationship evolves

2. Finding Cointegrated Pairs: Engle-Granger Test

The Engle-Granger (1987) two-step procedure is the most commonly used test for pairwise cointegration. It is simple to implement and interpret, making it ideal for automated pair screening.

Step 1: Regress One Series on the Other

For each candidate pair (A, B), run an OLS regression to find the hedge ratio:

OLS regression (step 1)
P_A = β₀ + β₁ * P_B + ε

β₁ = hedge ratio (use as beta in spread calculation)
ε = residuals (the spread series)

Step 2: Test Residuals for Stationarity

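Apply the Augmented Dickey-Fuller (ADF) test to the residuals. If the p-value is below 0.05, reject the null of a unit root: the residuals are stationary, meaning the pair is cointegrated. Treat this p-value as approximate, because the residuals come from an estimated regression rather than being observed directly.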

import numpy as np
from statsmodels.tsa.stattools import coint, adfuller
from statsmodels.regression.linear_model import OLS
from statsmodels.tools.tools import add_constant
import itertools

def engle_granger_test(
    price_a: np.ndarray,
    price_b: np.ndarray
) -> dict:
    """
    Run Engle-Granger cointegration test on two price series.
    Returns dict with hedge_ratio, p_value, adf_stat, is_cointegrated.
    """
    # Step 1: OLS regression to get hedge ratio
    X = add_constant(price_b)
    model = OLS(price_a, X).fit()
    hedge_ratio = model.params[1]
    intercept   = model.params[0]

    # Step 2: ADF test on residuals
    residuals = price_a - hedge_ratio * price_b - intercept
    adf_result = adfuller(residuals, autolag='AIC')
    adf_stat = adf_result[0]
    p_value  = adf_result[1]

    return {
        "hedge_ratio": hedge_ratio,
        "intercept": intercept,
        "adf_stat": adf_stat,
        "p_value": p_value,
        "is_cointegrated": p_value < 0.05,
        "spread_mean": float(np.mean(residuals)),
        "spread_std": float(np.std(residuals))
    }


def find_pairs(
    symbols: list[str],
    price_data: dict[str, np.ndarray],
    max_p_value: float = 0.05
) -> list[dict]:
    """
    Screen all pairs from a universe of symbols.
    Returns list of cointegrated pairs sorted by p-value.
    """
    candidates = []

    for sym_a, sym_b in itertools.combinations(symbols, 2):
        prices_a = price_data[sym_a]
        prices_b = price_data[sym_b]

        # Ensure equal length
        min_len = min(len(prices_a), len(prices_b))
        prices_a = prices_a[-min_len:]
        prices_b = prices_b[-min_len:]

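        try:
            result = engle_granger_test(prices_a, prices_b)
        except Exception:
            # Skip pairs the test cannot handle (e.g. constant or too-short series)
            continue

        # Apply the caller's cutoff instead of the hard-coded 0.05 flag
        if result["p_value"] < max_p_value:
            result["sym_a"] = sym_a
            result["sym_b"] = sym_b
            candidates.append(result)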

    # Sort by p-value (strongest cointegration first)
    candidates.sort(key=lambda x: x["p_value"])
    return candidates


# Example: screen crypto universe for pairs
universe = ["BTC-USDC", "ETH-USDC", "SOL-USDC", "BNB-USDC", "AVAX-USDC"]
# price_data: {symbol: np.array of closing prices, last 90 days}

pairs = find_pairs(universe, price_data, max_p_value=0.05)
for p in pairs:
    print(f"{p['sym_a']}/{p['sym_b']}: p={p['p_value']:.4f} "
          f"beta={p['hedge_ratio']:.4f}")
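
Statsmodels Built-in Test

The statsmodels.tsa.stattools.coint() function runs the full Engle-Granger test in one call and returns the test statistic, a p-value, and critical values. The manual implementation above is shown for educational clarity. In production, prefer coint(price_a, price_b): besides being cleaner, its p-value accounts for the residuals being estimated, which the plain adfuller() p-value does not, so the manual version is more lenient than it should be.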

Pair Selection Criteria

A low p-value alone is not sufficient for trading. Apply these additional filters:

  • Economic rationale: The pair should make intuitive sense (e.g., two layer-1 blockchains, two DeFi tokens, BTC and ETH)
  • Half-life in range: The spread's mean-reversion half-life should be 1-30 days. Too fast is noise; too slow is capital-inefficient
  • Reasonable hedge ratio: Ratios outside 0.1-10x suggest the pair relationship may be spurious
  • Out-of-sample validation: Test found pairs on a held-out data period before trading
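
def half_life(spread: np.ndarray) -> float:
    """
    Estimate mean-reversion half-life via AR(1) regression.
    Regress the change in the spread on its lagged level:
        delta_t = c + lam * spread_{t-1} + e_t,  with lam = phi - 1
    where phi is the AR(1) coefficient. Then
        half_life = -ln(2) / ln(phi) = -ln(2) / ln(1 + lam)
    """
    lagged = spread[:-1]
    delta  = np.diff(spread)
    X = add_constant(lagged)
    lam = OLS(delta, X).fit().params[1]

    if lam >= 0:
        return float('inf')  # non-mean-reverting
    if lam <= -1:
        return 0.0           # reverts (or overshoots) within one step; log undefined
    return -np.log(2) / np.log(1 + lam)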
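
As a sanity check on the half-life formula, it helps to simulate a spread with a known AR(1) coefficient and confirm the estimate lands close to the theoretical value. The sketch below is dependency-free; `half_life_from_phi` and `estimate_phi` are names introduced for this example, and the coefficient 0.9 is an arbitrary choice.

```python
import math
import random


def half_life_from_phi(phi: float) -> float:
    """Half-life in periods for an AR(1) coefficient 0 < phi < 1."""
    if not 0.0 < phi < 1.0:
        return float("inf")  # no monotone mean reversion to measure
    return -math.log(2) / math.log(phi)


def estimate_phi(spread: list[float]) -> float:
    """OLS slope of s_t on s_{t-1}, with an intercept."""
    x, y = spread[:-1], spread[1:]
    n = len(x)
    mx, my = sum(x) / n, sum(y) / n
    sxy = sum((a - mx) * (b - my) for a, b in zip(x, y))
    sxx = sum((a - mx) ** 2 for a in x)
    return sxy / sxx


# Simulate s_t = phi * s_{t-1} + noise with a known coefficient.
rng = random.Random(42)
true_phi = 0.9
spread = [0.0]
for _ in range(5000):
    spread.append(true_phi * spread[-1] + rng.gauss(0, 1))

phi_hat = estimate_phi(spread)
print(f"true half-life:      {half_life_from_phi(true_phi):.2f} periods")
print(f"estimated half-life: {half_life_from_phi(phi_hat):.2f} periods")
```

A coefficient of 0.9 corresponds to a half-life of roughly 6.6 periods. With daily bars that sits inside the 1-30 day band above; with minute bars the same coefficient would describe a spread that reverts far too quickly to trade.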

3. Z-Score Signal Generation and Entry/Exit Rules

Computing the Spread

The spread at each time step uses the estimated hedge ratio and intercept from the Engle-Granger regression:

Spread calculation
spread(t) = P_A(t) - β * P_B(t) - μ

z(t) = (spread(t) - rolling_mean(spread, window)) / rolling_std(spread, window)

Entry and Exit Signal Logic

The signal is simple: the further the z-score from zero, the more stretched the pair's relationship. You enter when the spread is stretched enough to expect reversion, and exit when it returns close to the mean.

| Z-Score Condition | Signal | Action | Expectation |
|---|---|---|---|
| z < -2.0 | Long spread | Buy A, Sell B | Spread will rise back to mean |
| z > +2.0 | Short spread | Sell A, Buy B | Spread will fall back to mean |
| \|z\| < 0.5 | Exit | Close both legs | Mean reversion complete |
| \|z\| > 3.5 | Stop loss | Close both legs | Possible breakdown: cut loss |

def calculate_spread(
    price_a: float,
    price_b: float,
    hedge_ratio: float,
    intercept: float
) -> float:
    return price_a - hedge_ratio * price_b - intercept


def calculate_zscore(
    spread_history: list[float],
    window: int = 60
) -> float:
    """
    Compute rolling z-score of the spread.
    window: number of data points in the rolling window.
    """
    if len(spread_history) < window:
        return 0.0
    recent = spread_history[-window:]
    mean = sum(recent) / len(recent)
    variance = sum((x - mean)**2 for x in recent) / len(recent)
    std = variance ** 0.5
    if std == 0:
        return 0.0
    return (spread_history[-1] - mean) / std


def generate_signal(
    z: float,
    current_position: int,   # -1 = short spread, 0 = flat, +1 = long spread
    entry_threshold: float = 2.0,
    exit_threshold: float = 0.5,
    stop_threshold: float = 3.5
) -> str:
    """
    Returns: 'long_spread' | 'short_spread' | 'exit' | 'hold' | 'stop'
    """
    # Stop loss
    if abs(z) > stop_threshold:
        return "stop"

    # Exit existing position
    if current_position != 0 and abs(z) < exit_threshold:
        return "exit"

    # New entry signals (only when flat)
    if current_position == 0:
        if z < -entry_threshold:
            return "long_spread"   # buy A, sell B
        if z > entry_threshold:
            return "short_spread"  # sell A, buy B

    return "hold"
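
To see how the thresholds interact, the rules can be replayed over a short hand-written path. This is a sketch under stated assumptions: `rule` is a compact restatement of the table above, `replay` is a hypothetical helper, and the z-scores and spread values are made up for illustration.

```python
def rule(z: float, position: int, entry: float = 2.0,
         exit_: float = 0.5, stop: float = 3.5) -> str:
    """Compact restatement of the entry/exit/stop table."""
    if abs(z) > stop:
        return "stop"
    if position != 0 and abs(z) < exit_:
        return "exit"
    if position == 0 and z < -entry:
        return "long_spread"
    if position == 0 and z > entry:
        return "short_spread"
    return "hold"


def replay(z_path: list[float], spread_path: list[float]):
    """Replay a path; return (signals, per-trade PnL in spread units)."""
    position, entry_spread = 0, None
    signals, trades = [], []
    for z, s in zip(z_path, spread_path):
        sig = rule(z, position)
        signals.append(sig)
        if sig == "long_spread":
            position, entry_spread = 1, s
        elif sig == "short_spread":
            position, entry_spread = -1, s
        elif sig in ("exit", "stop") and position != 0:
            # Long spread gains when the spread rises, short spread when it falls
            trades.append(position * (s - entry_spread))
            position, entry_spread = 0, None
    return signals, trades


# Made-up path: one clean reversion, then one entry that blows through the stop.
z_path      = [0.1, -2.3, -1.2, -0.3, 0.2, 2.6, 3.8, 1.0]
spread_path = [1.0, -23.0, -12.0, -3.0, 2.0, 26.0, 38.0, 10.0]

signals, trades = replay(z_path, spread_path)
print(signals)
print(trades)
```

The first trade enters long at a spread of -23 and exits at -3 for +20 spread units. The second enters short at 26 and is stopped at 38 for -12. The gap between the entry and exit thresholds is what keeps the agent from flipping in and out on every small wiggle around the mean.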

4. Python: PairsTradingAgent Class

The following PairsTradingAgent class integrates pair selection, spread monitoring, signal generation, and order execution into a complete, running agent.

import asyncio
import time
from dataclasses import dataclass, field
from typing import Optional
import httpx
import numpy as np

PF_API = "https://purpleflea.com/api/trading"


@dataclass
class PairConfig:
    sym_a: str
    sym_b: str
    hedge_ratio: float         # beta from cointegration regression
    intercept: float           # long-run mean offset
    spread_mean: float
    spread_std: float
    trade_size_a: float        # notional size in asset A units
    entry_z: float = 2.0
    exit_z: float = 0.5
    stop_z: float = 3.5
    lookback: int = 60         # rolling window for z-score


class PairsTradingAgent:
    """
    Live pairs trading agent for Purple Flea.
    Monitors a single pair, generates z-score signals,
    and executes simultaneous long/short orders.
    """

    def __init__(self, config: PairConfig, api_key: str):
        self.cfg = config
        self.api_key = api_key
        self.spread_history: list[float] = []
        self.position: int = 0  # -1, 0, +1
        self.entry_spread: Optional[float] = None
        self.entry_time: Optional[float] = None
        self.pnl_history: list[dict] = []
        self.active_orders: dict = {}
        self.client = httpx.AsyncClient(
            base_url=PF_API,
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=8.0
        )

    def find_pairs(
        self,
        symbols: list[str],
        price_data: dict[str, np.ndarray]
    ) -> list[dict]:
        """
        Screen all symbol combinations for cointegrated pairs.
        Returns sorted list of cointegrated pairs with metadata.
        Delegates to engle_granger_test() for each combination.
        """
        return find_pairs(symbols, price_data)  # uses module-level function

    def calculate_spread(self, price_a: float, price_b: float) -> float:
        """Compute current spread using stored hedge ratio and intercept."""
        return price_a - self.cfg.hedge_ratio * price_b - self.cfg.intercept

    def generate_signal(self, price_a: float, price_b: float) -> tuple[str, float, float]:
        """
        Update spread history and generate trading signal.
        Returns (signal, z, spread), where signal is one of
        'long_spread' | 'short_spread' | 'exit' | 'hold' | 'stop'.
        """
        spread = self.calculate_spread(price_a, price_b)
        self.spread_history.append(spread)
        if len(self.spread_history) > 500:
            self.spread_history = self.spread_history[-500:]

        z = calculate_zscore(self.spread_history, window=self.cfg.lookback)
        signal = generate_signal(
            z, self.position,
            self.cfg.entry_z, self.cfg.exit_z, self.cfg.stop_z
        )
        return signal, z, spread

    async def execute_pair(self, signal: str, price_a: float, price_b: float):
        """
        Execute a pairs trade: place simultaneous orders for both legs.
        Long spread = buy A + sell B.
        Short spread = sell A + buy B.
        Exit = close both legs at market.
        """
        size_a = self.cfg.trade_size_a
        # Hedge ratio = units of B per unit of A, so the B leg is beta * size_a
        size_b = size_a * self.cfg.hedge_ratio

        if signal == "long_spread":
            orders = [
                {"symbol": self.cfg.sym_a, "side": "buy",  "type": "market", "size": size_a},
                {"symbol": self.cfg.sym_b, "side": "sell", "type": "market", "size": size_b},
            ]
            self.position = 1
        elif signal == "short_spread":
            orders = [
                {"symbol": self.cfg.sym_a, "side": "sell", "type": "market", "size": size_a},
                {"symbol": self.cfg.sym_b, "side": "buy",  "type": "market", "size": size_b},
            ]
            self.position = -1
        elif signal in ("exit", "stop"):
            await self.close_position(price_a, price_b)
            return
        else:
            return

        # Submit both legs concurrently
        results = await asyncio.gather(*[
            self.client.post("/orders", json=order)
            for order in orders
        ])
        self.entry_spread = self.calculate_spread(price_a, price_b)
        self.entry_time = time.time()
        print(f"[{signal.upper()}] {self.cfg.sym_a}/{self.cfg.sym_b} "
              f"spread={self.entry_spread:.4f}")
        return [r.json() for r in results]

    async def close_position(self, price_a: float, price_b: float):
        """Close both legs of an open pairs trade."""
        if self.position == 0:
            return

        size_a = self.cfg.trade_size_a
        # Hedge ratio = units of B per unit of A, so the B leg is beta * size_a
        size_b = size_a * self.cfg.hedge_ratio

        # Reverse the legs to close
        close_side_a = "sell" if self.position == 1 else "buy"
        close_side_b = "buy"  if self.position == 1 else "sell"

        await asyncio.gather(
            self.client.post("/orders", json={
                "symbol": self.cfg.sym_a, "side": close_side_a,
                "type": "market", "size": size_a
            }),
            self.client.post("/orders", json={
                "symbol": self.cfg.sym_b, "side": close_side_b,
                "type": "market", "size": size_b
            })
        )

        exit_spread = self.calculate_spread(price_a, price_b)
        # The spread is quoted per unit of A, so scale by size_a for quote-currency PnL
        trade_pnl = self.position * (exit_spread - self.entry_spread) * size_a
        self.pnl_history.append({
            "entry": self.entry_spread, "exit": exit_spread,
            "pnl": trade_pnl, "position": self.position,
            "duration": time.time() - (self.entry_time or 0)
        })
        self.position = 0
        self.entry_spread = None
        print(f"[CLOSE] {self.cfg.sym_a}/{self.cfg.sym_b} PnL={trade_pnl:.4f}")

5. Kalman Filter for Dynamic Hedge Ratio Estimation

The Engle-Granger regression gives you a static hedge ratio estimated over a historical window. In practice, the true hedge ratio drifts over time as the structural relationship between the two assets evolves. The Kalman filter provides an optimal, real-time estimate of this changing ratio without requiring full re-estimation at each step.

The Kalman Filter as a Dynamic Regression

Treating the hedge ratio as a state variable that evolves as a random walk, the Kalman filter updates the estimate with each new data point, weighting recent observations more heavily:

Kalman filter state equations
State:        β_t = β_{t-1} + w_t,            w_t ~ N(0, Q)
Observation:  P_A(t) = β_t * P_B(t) + v_t,    v_t ~ N(0, R)

Predict:      β_{t|t-1} = β_{t-1|t-1}
              P_{t|t-1} = P_{t-1|t-1} + Q

Update:       K_t = P_{t|t-1} * P_B(t) / (P_B(t)² * P_{t|t-1} + R)
              β_{t|t} = β_{t|t-1} + K_t * (P_A(t) - β_{t|t-1} * P_B(t))
              P_{t|t} = (1 - K_t * P_B(t)) * P_{t|t-1}

Where:
P_{t|t-1}, P_{t|t} = error variance of the hedge ratio estimate (not a price)
K_t = Kalman gain
P_A(t), P_B(t) = prices of assets A and B

class KalmanHedgeRatio:
    """
    Online Kalman filter for dynamic hedge ratio estimation.
    Adapts the hedge ratio in real-time as prices arrive.
    Q: state noise — higher = faster adaptation but noisier
    R: observation noise — higher = smoother but slower
    """

    def __init__(self, Q: float = 1e-5, R: float = 1e-2):
        self.Q = Q           # state transition noise
        self.R = R           # observation noise
        self.beta = 1.0      # initial hedge ratio guess
        self.P = 1.0        # error covariance
        self.e_history: list[float] = []

    def update(self, price_a: float, price_b: float) -> tuple[float, float]:
        """
        Process one observation and return (hedge_ratio, forecast_error).
        Call this every time a new price tick arrives.
        """
        # Predict step
        P_pred = self.P + self.Q

        # Observation: what P_A should be given current beta
        forecast = self.beta * price_b
        error = price_a - forecast  # innovation

        # Kalman gain
        S = price_b**2 * P_pred + self.R
        K = P_pred * price_b / S

        # Update step
        self.beta += K * error
        self.P = (1 - K * price_b) * P_pred

        self.e_history.append(error)
        return self.beta, error

    def spread(self, price_a: float, price_b: float) -> float:
        """Current spread using dynamic hedge ratio."""
        return price_a - self.beta * price_b

    def zscore(self, window: int = 60) -> float:
        """Z-score of recent forecast errors (Kalman innovations)."""
        errors = self.e_history[-window:]
        if len(errors) < 10:
            return 0.0
        mean = sum(errors) / len(errors)
        std  = (sum((e - mean)**2 for e in errors) / len(errors)) ** 0.5
        if std == 0:
            return 0.0
        return (errors[-1] - mean) / std


# Example: dynamic pair monitoring with Kalman filter
kf = KalmanHedgeRatio(Q=1e-5, R=1e-2)

for p_a, p_b in zip(prices_btc, prices_eth):
    beta, error = kf.update(p_a, p_b)
    z = kf.zscore(window=60)

    if abs(z) > 2.0:
        print(f"Signal: z={z:.2f} beta={beta:.4f} spread={kf.spread(p_a,p_b):.4f}")
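
Tuning Q and R

The ratio Q/R controls how quickly the filter adapts. High Q/R = fast adaptation (tracks short-term ratio changes, more noise). Low Q/R = slow adaptation (smoother estimates, slower to react). The gain also depends on the scale of P_B, since P_B(t)² multiplies the state variance in the update, so suitable values differ between raw prices and normalized or log prices. Treat Q=1e-5, R=1e-2 as a starting point for daily crypto data, then run a grid search over historical data to choose values for each pair.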
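
A grid search of this kind can be sketched in a few lines. The version below is an assumption-laden illustration, not a tuned procedure: it scores each Q by the mean squared one-step-ahead forecast error of the same scalar filter, holds R fixed, and runs on synthetic data with a slowly drifting hedge ratio. The function names and the grid values are hypothetical.

```python
import random


def kalman_forecast_mse(prices_a, prices_b, Q: float, R: float) -> float:
    """Mean squared one-step-ahead forecast error of the scalar hedge-ratio filter."""
    beta, P = 1.0, 1.0
    total = 0.0
    for p_a, p_b in zip(prices_a, prices_b):
        P_pred = P + Q
        err = p_a - beta * p_b            # innovation, computed before updating beta
        S = p_b ** 2 * P_pred + R
        K = P_pred * p_b / S
        beta += K * err
        P = (1 - K * p_b) * P_pred
        total += err ** 2
    return total / len(prices_a)


def grid_search_q(prices_a, prices_b, q_grid, R: float = 1e-2):
    """Return (best_Q, {Q: mse}) over the candidate grid."""
    scores = {Q: kalman_forecast_mse(prices_a, prices_b, Q, R) for Q in q_grid}
    best_q = min(scores, key=scores.get)
    return best_q, scores


# Synthetic pair: the true hedge ratio follows a slow random walk.
rng = random.Random(7)
true_beta = 1.0
prices_a, prices_b = [], []
for _ in range(2000):
    true_beta += rng.gauss(0, 0.01)
    p_b = 10.0 + rng.gauss(0, 0.5)
    prices_b.append(p_b)
    prices_a.append(true_beta * p_b + rng.gauss(0, 0.1))

q_grid = [1e-7, 1e-5, 1e-3, 1e-1]
best_q, scores = grid_search_q(prices_a, prices_b, q_grid)
for q in q_grid:
    print(f"Q={q:g}  forecast MSE={scores[q]:.4f}")
print("best Q on this sample:", best_q)
```

Because the score is computed in-sample, the chosen Q should be confirmed on a later, held-out stretch of data before it is trusted for live trading.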

6. Risk Management: Spread Blowout and Correlation Breakdown

Spread Blowout Risk

The most dangerous event in pairs trading is a spread blowout — when the spread widens far beyond its historical range and does not revert. This happens when the cointegration relationship permanently breaks down due to a fundamental event: a company-specific shock, regulatory change, or structural market shift. In crypto, this can be triggered by a protocol hack, a major exchange insolvency, or an ecosystem collapse.

Spread Blowout Defense

Always set a hard stop at z=3.5 or beyond. Do not add to losing pairs positions. If a spread reaches 4+ standard deviations and stays there for more than 24 hours, the relationship may be broken permanently. Close the position regardless of unrealized loss.
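
The "4+ standard deviations for more than 24 hours" rule needs a little state, since it depends on how long the breach has lasted rather than on a single reading. One possible shape for that check is sketched below; `PersistentBreachMonitor` is a hypothetical helper introduced here, and timestamps are assumed to be Unix seconds.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PersistentBreachMonitor:
    """Flags a spread that stays at or beyond z_limit for longer than max_hours."""
    z_limit: float = 4.0
    max_hours: float = 24.0
    breach_start: Optional[float] = None   # timestamp when |z| first reached the limit

    def update(self, z: float, now: float) -> bool:
        """Feed one observation (z-score, Unix timestamp). True means force-close."""
        if abs(z) < self.z_limit:
            self.breach_start = None       # back inside the band: reset the clock
            return False
        if self.breach_start is None:
            self.breach_start = now        # first reading of a new breach
        hours_in_breach = (now - self.breach_start) / 3600.0
        return hours_in_breach > self.max_hours


monitor = PersistentBreachMonitor()
HOUR = 3600.0
readings = [(4.2, 0 * HOUR), (4.5, 12 * HOUR), (4.1, 25 * HOUR),
            (1.0, 26 * HOUR), (4.3, 27 * HOUR)]
flags = [monitor.update(z, t) for z, t in readings]
print(flags)
```

With a hard stop at z=3.5 the position will usually be closed before this monitor fires, so it is most useful as a second line of defense, for example to stop re-entering a pair whose spread has not come back.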

class PairsRiskManager:
    def __init__(
        self,
        max_holding_hours: float = 72,   # max time in any single trade
        stop_z: float = 3.5,             # hard z-score stop
        max_spread_multiple: float = 4.0, # max spread vs historical std
        max_drawdown_pct: float = 5.0     # max portfolio drawdown %
    ):
        self.max_holding_hours = max_holding_hours
        self.stop_z = stop_z
        self.max_spread_multiple = max_spread_multiple
        self.max_drawdown_pct = max_drawdown_pct
        self.portfolio_pnl: float = 0.0
        self.peak_pnl: float = 0.0
        self.active_positions: dict = {}

    def should_stop(
        self,
        pair_id: str,
        z: float,
        spread: float,
        spread_std: float,
        entry_time: float
    ) -> tuple[bool, str]:
        """
        Returns (should_close, reason) for a given open position.
        """
        # Hard z-score stop
        if abs(z) > self.stop_z:
            return True, f"z-score stop ({z:.2f})"

        # Spread blowout: beyond 4 historical std devs
        if abs(spread) > self.max_spread_multiple * spread_std:
            return True, f"spread blowout ({spread:.4f} vs {spread_std:.4f})"

        # Time stop: position held too long
        hours_held = (time.time() - entry_time) / 3600
        if hours_held > self.max_holding_hours:
            return True, f"time stop ({hours_held:.1f}h)"

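        # Portfolio drawdown check. Note: the drop is measured relative to peak PnL
        # (floored at 1), not account capital, and the caller must keep
        # self.portfolio_pnl and self.peak_pnl up to date.
        drawdown = (self.peak_pnl - self.portfolio_pnl) / max(1, abs(self.peak_pnl))
        if drawdown > self.max_drawdown_pct / 100:
            return True, f"portfolio drawdown ({drawdown*100:.1f}%)"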

        return False, ""

    def check_cointegration_health(
        self,
        spread_history: list[float],
        original_spread_std: float,
        window: int = 100
    ) -> bool:
        """
        Monitor whether the cointegration relationship is still valid.
        Returns False if spread volatility has expanded significantly.
        """
        if len(spread_history) < window:
            return True
        recent = spread_history[-window:]
        recent_std = np.std(recent)
        # If current spread std is 3x the original, relationship may be broken
        return recent_std < original_spread_std * 3.0

Correlation Breakdown Detection

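Beyond hard stops, monitor for gradual degradation of the pair relationship using a rolling correlation metric. As the 30-day rolling correlation between the two assets falls, scale position size down in steps (75% of full size at or below 0.8, 50% at or below 0.6), stop opening new trades once it reaches 0.4 or lower, and consider tightening stops along the way: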

def rolling_correlation(
    prices_a: list[float],
    prices_b: list[float],
    window: int = 30
) -> float:
    """30-day rolling correlation between two price series."""
    if len(prices_a) < window:
        return 1.0
    a = np.array(prices_a[-window:])
    b = np.array(prices_b[-window:])
    corr_matrix = np.corrcoef(a, b)
    return float(corr_matrix[0, 1])

def position_size_scale(correlation: float) -> float:
    """Scale position size down as correlation deteriorates."""
    if correlation > 0.8: return 1.0
    if correlation > 0.6: return 0.75
    if correlation > 0.4: return 0.5
    return 0.0  # relationship too weak: no trade

7. Portfolio of Pairs for Diversification

A single pair is vulnerable to breakdown. A portfolio of 10-20 cointegrated pairs spreads this idiosyncratic risk — when one pair's relationship breaks, the others continue generating returns. Portfolio construction for pairs trading follows different rules than traditional asset allocation because the positions are already market-neutral.

Pair Correlation Within the Portfolio

Avoid pairs whose spread movements are highly correlated with each other — this defeats the diversification goal. For example, BTC/ETH and BTC/SOL spreads will both react to Bitcoin-specific events, providing less diversification than BTC/ETH and two uncorrelated DeFi token pairs.

| Pair Category | Example Pairs | Correlation to BTC/ETH | Diversification Value |
|---|---|---|---|
| Layer-1 pairs | BTC/ETH, ETH/SOL | High (~0.7) | Low |
| DeFi pairs | UNI/AAVE, CRV/BAL | Medium (~0.4) | Medium |
| Cross-sector | BTC/LINK, ETH/DOT | Low (~0.2) | High |
| Stablecoin pairs | USDC/USDT (rate arb) | None | Highest |
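
One way to apply this in code is a greedy filter over candidate pairs: walk the list in priority order and keep a pair only if its spread changes are weakly correlated with every pair already kept. The sketch below assumes a 0.5 cutoff and uses tiny made-up spread series; `select_decorrelated` and `pearson` are hypothetical helpers, not part of the agent classes in this guide.

```python
import math


def pearson(x: list[float], y: list[float]) -> float:
    """Pearson correlation; returns 0.0 if either series is constant."""
    n = len(x)
    mx, my = sum(x) / n, sum(y) / n
    cov = sum((a - mx) * (b - my) for a, b in zip(x, y))
    vx = sum((a - mx) ** 2 for a in x)
    vy = sum((b - my) ** 2 for b in y)
    if vx == 0 or vy == 0:
        return 0.0
    return cov / math.sqrt(vx * vy)


def select_decorrelated(
    spreads: dict[str, list[float]],
    max_abs_corr: float = 0.5,
) -> list[str]:
    """
    Greedy filter over pairs. Dict order is the priority order
    (e.g. sorted by cointegration p-value before calling).
    """
    # Correlate spread *changes*, not levels, to avoid spurious level correlation
    changes = {
        name: [b - a for a, b in zip(s, s[1:])]
        for name, s in spreads.items()
    }
    kept: list[str] = []
    for name in spreads:
        if all(abs(pearson(changes[name], changes[k])) <= max_abs_corr for k in kept):
            kept.append(name)
    return kept


# Made-up spread histories, aligned in time. BTC/SOL is an exact multiple of
# BTC/ETH here, so its changes are perfectly correlated with it.
spreads = {
    "BTC/ETH":  [0.0, 1.0, 0.0, 2.0, 1.0, 3.0],
    "BTC/SOL":  [0.0, 2.0, 0.0, 4.0, 2.0, 6.0],
    "UNI/AAVE": [0.0, 1.0, 2.0, 1.0, 0.0, 1.0],
}
print(select_decorrelated(spreads))
```

On this toy input the filter keeps BTC/ETH and UNI/AAVE and drops BTC/SOL. A greedy pass is order-dependent, which is acceptable when the candidates are already ranked by quality.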

Equal Risk Allocation

Size each pair's position such that the expected loss on a 1-standard-deviation spread move is equal across all pairs. This prevents a single volatile pair from dominating portfolio risk:

def compute_position_sizes(
    pairs: list[dict],      # [{sym_a, sym_b, spread_std, ...}, ...]
    total_capital: float,
    risk_per_pair: float = 0.01  # 1% of capital at risk per pair per 1-std move
) -> list[float]:
    """
    Equal-risk position sizing across a portfolio of pairs.
    Returns a list of sizes, in units of asset A, for the A leg of each pair.
    """
    sizes = []
    for pair in pairs:
        # Max loss we accept per 1-std spread move
        dollar_risk = total_capital * risk_per_pair
        # size = dollar_risk / spread_std
        # (spread_std is in price units of asset A)
        spread_std = pair["spread_std"]
        size = dollar_risk / spread_std if spread_std > 0 else 0
        sizes.append(size)
    return sizes


class PairsPortfolio:
    """Manager for a portfolio of pairs trading agents."""

    def __init__(self, api_key: str, total_capital: float):
        self.api_key = api_key
        self.capital = total_capital
        self.agents: list[PairsTradingAgent] = []
        self.risk = PairsRiskManager()

    def add_pair(self, pair_config: dict):
        cfg = PairConfig(
            sym_a=pair_config["sym_a"],
            sym_b=pair_config["sym_b"],
            hedge_ratio=pair_config["hedge_ratio"],
            intercept=pair_config["intercept"],
            spread_mean=pair_config["spread_mean"],
            spread_std=pair_config["spread_std"],
            trade_size_a=pair_config["trade_size_a"]
        )
        agent = PairsTradingAgent(cfg, self.api_key)
        self.agents.append(agent)

    async def tick(self, prices: dict[str, float]):
        """Process one price update across all pairs in the portfolio."""
        for agent in self.agents:
            p_a = prices.get(agent.cfg.sym_a)
            p_b = prices.get(agent.cfg.sym_b)
            if p_a is None or p_b is None:
                continue
            signal, z, spread = agent.generate_signal(p_a, p_b)
            if signal != "hold":
                await agent.execute_pair(signal, p_a, p_b)

    def portfolio_pnl(self) -> float:
        return sum(
            sum(t["pnl"] for t in a.pnl_history)
            for a in self.agents
        )

8. Integration with Purple Flea Trading API

Purple Flea's API supports simultaneous order placement across multiple symbols, which is essential for pairs trading — the faster you can close the time gap between the two legs, the less leg risk you carry. Here is the complete integration using WebSocket price feeds and REST order placement:

import asyncio
import json
import websockets
import httpx

PF_WS  = "wss://purpleflea.com/ws/trading"
PF_API = "https://purpleflea.com/api/trading"


async def run_pairs_portfolio(api_key: str, pairs_config: list[dict]):
    """
    Main entry point for a live pairs trading portfolio.
    Streams prices via WebSocket, manages signals, places orders.
    """
    portfolio = PairsPortfolio(api_key=api_key, total_capital=10000.0)
    for cfg in pairs_config:
        portfolio.add_pair(cfg)

    prices: dict[str, float] = {}

    async with websockets.connect(
        f"{PF_WS}?token={api_key}"
    ) as ws:
        # Subscribe to all relevant symbols
        all_symbols = list({
            sym
            for cfg in pairs_config
            for sym in [cfg["sym_a"], cfg["sym_b"]]
        })
        for sym in all_symbols:
            await ws.send(json.dumps({
                "action": "subscribe",
                "channel": "ticker",
                "symbol": sym
            }))

        async for msg in ws:
            data = json.loads(msg)
            if data.get("channel") == "ticker":
                prices[data["symbol"]] = data["mid"]

                # Process portfolio on each price update
                await portfolio.tick(prices)

                pnl = portfolio.portfolio_pnl()
                print(f"Prices updated. Portfolio PnL: {pnl:.4f} USDC")


# Startup configuration
PAIRS = [
    {
        "sym_a": "BTC-USDC", "sym_b": "ETH-USDC",
        "hedge_ratio": 17.5, "intercept": -200.0,
        "spread_mean": 0.0, "spread_std": 150.0,
        "trade_size_a": 0.01
    },
    {
        "sym_a": "ETH-USDC", "sym_b": "SOL-USDC",
        "hedge_ratio": 11.2, "intercept": 50.0,
        "spread_mean": 0.0, "spread_std": 45.0,
        "trade_size_a": 0.1
    },
]

if __name__ == "__main__":
    asyncio.run(run_pairs_portfolio(
        api_key="pf_live_YOUR_KEY_HERE",
        pairs_config=PAIRS
    ))

Simultaneous Order Guarantees

Purple Flea supports batch order submission via the /orders/batch endpoint, which accepts multiple orders in a single API call and executes them as close to simultaneously as possible. Use this for pairs trading to minimize the time gap between legs:

async def place_pair_batch(
    client: httpx.AsyncClient,
    sym_a: str, side_a: str, size_a: float,
    sym_b: str, side_b: str, size_b: float
) -> dict:
    """
    Submit both pair legs in a single batch request.
    Minimizes execution gap between legs.
    """
    resp = await client.post("/orders/batch", json={
        "orders": [
            {"symbol": sym_a, "side": side_a, "type": "market", "size": size_a},
            {"symbol": sym_b, "side": side_b, "type": "market", "size": size_b},
        ],
        "atomic": False  # True = reject both if either fails
    })
    return resp.json()

Agent Registration & Capital

Before running pairs trades, register your agent at purpleflea.com/register and claim free starting capital via the Agent Faucet. The faucet provides enough capital to run a small pairs portfolio with 1% risk per pair.

Performance Considerations

Expected Returns and Sharpe

Well-implemented pairs trading in crypto typically achieves:

| Metric | Typical Range | Notes |
|---|---|---|
| Annual return | 15-40% | Depends on pair selection and volatility regime |
| Sharpe ratio | 1.5-3.5 | Higher during ranging markets, lower during trending |
| Win rate | 55-70% | Many small wins, occasional large losses on breakdown |
| Average holding period | 1-7 days | Calibrate entry z-score to control frequency |
| Max drawdown | 5-15% | With proper circuit breakers applied |

When Pairs Trading Fails

  • Trending markets: When all crypto assets move strongly in one direction, spreads widen on both sides without reverting. Reduce activity during periods of high crypto market volatility.
  • Low liquidity: Pairs require sufficient volume on both legs. Avoid pairs with thinly traded tokens where your own orders move the market.
  • Overfitting in pair selection: P-values from in-sample testing are biased. Always validate pairs out-of-sample before live trading.
  • Transaction costs: High taker fees eat spread income. Use limit orders where execution speed allows, or ensure the expected spread income exceeds 2x your total fees.
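
The 2x fee rule in the last bullet can be turned into a pre-trade check. The sketch below is a rough estimate under stated assumptions: the spread is expected to revert from the entry z-score to the exit z-score, every fill pays the taker fee, and the fee rates and prices in the example are hypothetical. `passes_cost_filter` is a name introduced here.

```python
def passes_cost_filter(
    spread_std: float,       # std of the spread, in quote currency per unit of A
    size_a: float,           # units of asset A traded
    hedge_ratio: float,      # units of B per unit of A
    price_a: float,
    price_b: float,
    taker_fee: float,        # fraction of notional, e.g. 0.0004 = 4 bps (hypothetical)
    entry_z: float = 2.0,
    exit_z: float = 0.5,
    min_ratio: float = 2.0,  # required multiple of fees
) -> tuple[bool, float, float]:
    """Return (ok, expected_gross, round_trip_fees) for one pairs trade."""
    # Expected gross income if the spread reverts from entry_z to exit_z
    expected_gross = (entry_z - exit_z) * spread_std * size_a

    # Four fills per round trip: open and close on each of the two legs
    notional_a = size_a * price_a
    notional_b = size_a * abs(hedge_ratio) * price_b
    fees = 2 * (notional_a + notional_b) * taker_fee

    return expected_gross >= min_ratio * fees, expected_gross, fees


# Hypothetical BTC/ETH trade: 0.01 BTC against 0.175 ETH
common = dict(spread_std=150.0, size_a=0.01, hedge_ratio=17.5,
              price_a=60_000.0, price_b=3_000.0)

ok_cheap, gross, fees_cheap = passes_cost_filter(taker_fee=0.0004, **common)
ok_dear, _, fees_dear = passes_cost_filter(taker_fee=0.0010, **common)
print(f"4 bps:  gross={gross:.2f} fees={fees_cheap:.2f} trade={ok_cheap}")
print(f"10 bps: gross={gross:.2f} fees={fees_dear:.2f} trade={ok_dear}")
```

The same trade clears the filter at 4 bps and fails at 10 bps, which shows how sensitive a small-edge strategy is to the fee tier. Slippage is not modeled here and would push the bar higher.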

Summary

Statistical pairs trading is one of the most robust algorithmic strategies available to AI agents. The market-neutral structure insulates returns from broad market moves, while the continuous monitoring capability of agents makes them ideal operators. The key steps are:

  1. Screen candidate pairs with the Engle-Granger test, filtering for p < 0.05 and economically rational relationships
  2. Validate pairs out-of-sample and compute half-life to ensure mean-reversion is tradeable
  3. Use the Kalman filter for dynamic hedge ratio estimation — static ratios degrade over time
  4. Trade z-score signals: enter when |z| > 2.0, exit when |z| < 0.5, stop when |z| > 3.5
  5. Monitor for cointegration breakdown via rolling correlation and spread volatility expansion
  6. Run a portfolio of 10-20 pairs from different correlation clusters
  7. Use batch order submission via the Purple Flea API to minimize leg risk

Ready to deploy? Get your API key at purpleflea.com/register and claim your starting capital via the Agent Faucet.