Guide

Building a Backtesting Framework for Agent Strategies

Purple Flea Research · March 6, 2026

Backtesting is how you separate strategies that look profitable from strategies that are profitable. Done wrong, it produces spectacular fiction. Done right, it gives you high-confidence estimates of live performance before risking a single satoshi. This guide builds a complete, production-quality backtesting framework from first principles, with every correctness pitfall documented and full integration with Purple Flea's live data.

Historical Data Sourcing

A backtest is only as good as its data. The most common data quality failures that invalidate backtests: survivorship bias in the symbol universe, missing bars from exchange downtime, bad ticks and misprinted prices, unadjusted splits or token redenominations, and inconsistent timestamps or timezones.
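Before cleaning anything, it helps to quantify how damaged a candle set actually is. A minimal audit sketch in pandas (the column names match the fetch helpers below; the synthetic example is illustrative):

```python
import pandas as pd

def audit_ohlcv(df: pd.DataFrame, expected_freq: str = "1h") -> dict:
    """Count basic data-quality problems in a timestamp-indexed OHLCV frame."""
    idx = df.index
    duplicates = int(idx.duplicated().sum())
    # Reindex onto a regular grid to count bars missing from the feed
    full_grid = pd.date_range(idx.min(), idx.max(), freq=expected_freq)
    missing_bars = len(full_grid.difference(idx))
    # Bars that violate the basic high >= low ordering
    bad_ohlc = int((df["high"] < df["low"]).sum())
    return {"duplicates": duplicates, "missing_bars": missing_bars, "bad_ohlc": bad_ohlc}

# Tiny synthetic example: five hourly bars with one dropped
idx = pd.date_range("2026-01-01", periods=5, freq="1h").delete(2)
df = pd.DataFrame({"open": 1.0, "high": 2.0, "low": 0.5, "close": 1.5}, index=idx)
print(audit_ohlcv(df))  # → {'duplicates': 0, 'missing_bars': 1, 'bad_ohlc': 0}
```

Run this before and after the cleaning pipeline below; a high missing-bar count on a liquid symbol usually means the fetch window or interval is wrong, not the exchange.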

Purple Flea Historical Data API

import requests
import pandas as pd
from datetime import datetime

API_KEY = "pf_live_"
BASE_URL = "https://purpleflea.com/api/v1"

def fetch_ohlcv(
    symbol: str,
    interval: str,
    start: datetime,
    end: datetime,
    include_volume_profile: bool = False
) -> pd.DataFrame:
    """
    Fetch OHLCV data from Purple Flea.
    interval: '1m' | '5m' | '15m' | '1h' | '4h' | '1d'
    """
    resp = requests.get(
        f"{BASE_URL}/market/history",
        headers={"Authorization": f"Bearer {API_KEY}"},
        params={
            "symbol": symbol,
            "interval": interval,
            "start": int(start.timestamp()),
            "end": int(end.timestamp()),
            "volume_profile": include_volume_profile
        }
    )
    resp.raise_for_status()
    data = resp.json()["candles"]
    df = pd.DataFrame(data, columns=["timestamp", "open", "high", "low", "close", "volume"])
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")
    df = df.set_index("timestamp").sort_index()
    return df

def fetch_trade_data(symbol: str, start: datetime, end: datetime) -> pd.DataFrame:
    """Fetch raw trade data (individual ticks) for high-resolution backtests."""
    resp = requests.get(
        f"{BASE_URL}/market/trades",
        headers={"Authorization": f"Bearer {API_KEY}"},
        params={
            "symbol": symbol,
            "start": int(start.timestamp()),
            "end": int(end.timestamp()),
        }
    )
    resp.raise_for_status()
    trades = resp.json()["trades"]
    df = pd.DataFrame(trades)
    df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
    return df

# Example: fetch 6 months of 1-hour BTC/USD data
start = datetime(2025, 9, 1)
end = datetime(2026, 3, 1)
# df = fetch_ohlcv("BTC-USD", "1h", start, end)

Data Cleaning Pipeline

def clean_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
    """
    Standard OHLCV data cleaning pipeline.
    Removes corrupt bars and fills minor gaps.
    """
    original_len = len(df)

    # Remove bars where OHLCV constraints are violated
    valid_mask = (
        (df["high"] >= df["low"]) &
        (df["high"] >= df["open"]) &
        (df["high"] >= df["close"]) &
        (df["low"] <= df["open"]) &
        (df["low"] <= df["close"]) &
        (df["volume"] >= 0) &
        (df["close"] > 0)
    )
    df = df[valid_mask].copy()

    # Flag and remove extreme outliers (>10 sigma moves are likely data errors)
    returns = df["close"].pct_change()
    sigma = returns.std()
    df = df[returns.abs().fillna(0) < 10 * sigma].copy()  # fillna keeps the first bar

    # Regularize the time grid, then forward-fill up to 3 consecutive
    # missing bars (exchange downtime)
    df = df.resample(df.index.freq or "1h").last()
    df = df.ffill(limit=3)

    # Drop remaining NaN rows
    df = df.dropna()

    removed = original_len - len(df)
    if removed > 0:
        print(f"Removed {removed} invalid bars ({removed/original_len*100:.1f}%)")
    return df

Event-Driven vs Vectorized Backtesting

There are two fundamental backtesting architectures, each with different accuracy vs speed tradeoffs:

Vectorized Backtesting

Apply trading rules as vectorized operations on the entire dataset simultaneously. Fast (milliseconds for years of data), simple to implement, but fundamentally incorrect for any strategy that uses feedback from previous trades, position sizing, or dynamic risk management.

import pandas as pd
import numpy as np

def vectorized_backtest_simple(df: pd.DataFrame, fast: int = 10, slow: int = 30) -> pd.DataFrame:
    """
    Vectorized backtest: fast/slow moving average crossover.
    WARNING: This is a simplified demo. Vectorized backtesting
    cannot correctly model realistic order execution, slippage,
    or position sizing. Use event-driven for production.
    """
    df = df.copy()
    df["fast_ma"] = df["close"].rolling(fast).mean()
    df["slow_ma"] = df["close"].rolling(slow).mean()

    # Signal: 1 = long, -1 = short, 0 = flat
    df["signal"] = np.where(df["fast_ma"] > df["slow_ma"], 1, -1)
    df.loc[df["slow_ma"].isna(), "signal"] = 0  # stay flat during MA warmup
    df["signal"] = df["signal"].shift(1)  # avoid lookahead: act on NEXT bar

    # Returns
    df["returns"] = df["close"].pct_change()
    df["strategy_returns"] = df["signal"] * df["returns"]
    df["cumulative"] = (1 + df["strategy_returns"]).cumprod()
    return df

Event-Driven Backtesting

Process the data chronologically as a stream of events, exactly as a live trading system does. Each bar or tick triggers event handlers that can place orders, manage positions, and update state. Slower but far more accurate: it correctly handles path-dependent position sizing, partial fills, stop-loss and take-profit triggers, order queuing and latency, and margin constraints.

Always Use Event-Driven

For any strategy you will deploy with real capital, event-driven backtesting is mandatory. Vectorized backtests consistently overestimate returns by 15-40% due to unrealistic fill assumptions. The extra implementation time pays for itself on the first live trade.
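To make the event-driven architecture concrete, here is a minimal bar-level loop. It is a sketch, not Purple Flea's engine: the `Bar` type and the strategy callback signature are illustrative assumptions. The key property is the timing rule: an order queued on bar T fills at bar T+1's open, never on the same bar.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional

@dataclass
class Bar:
    open: float
    high: float
    low: float
    close: float

@dataclass
class EventBacktester:
    strategy: Callable                      # (bar, position) -> target position in units
    cash: float = 100_000.0
    position: float = 0.0                   # units currently held
    pending_target: Optional[float] = None  # order queued for the next bar
    equity: List[float] = field(default_factory=list)

    def on_bar(self, bar: Bar) -> None:
        # 1. Fill the order queued on the PREVIOUS bar at this bar's open
        if self.pending_target is not None:
            delta = self.pending_target - self.position
            self.cash -= delta * bar.open
            self.position = self.pending_target
            self.pending_target = None
        # 2. Mark the book to market at the close
        self.equity.append(self.cash + self.position * bar.close)
        # 3. Let the strategy queue an order for the NEXT bar
        self.pending_target = self.strategy(bar, self.position)

bars = [Bar(100, 101, 99, 100), Bar(100, 103, 100, 102), Bar(102, 104, 101, 103)]
bt = EventBacktester(strategy=lambda bar, pos: 10.0)  # always target 10 units
for b in bars:
    bt.on_bar(b)
print(bt.equity)  # → [100000.0, 100020.0, 100030.0]
```

Extending this loop with partial fills, stops, and the cost model below is mechanical once the event boundary is in place.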

Transaction Costs Modeling

The single biggest reason backtests fail to predict live performance is inadequate cost modeling. Complete cost structure:

Cost Component       | Typical Range  | Scales With
Commission (taker)   | 3-25 bps       | Trade value
Commission (maker)   | -2 to +5 bps   | Trade value
Bid-ask spread       | 1-50 bps       | Asset liquidity
Market impact        | 0-200 bps      | Order size / ADV
Slippage (timing)    | 1-10 bps       | Market volatility
Funding rate (perps) | 0-50 bps/day   | Open interest
Borrow cost (short)  | 0-500 bps/yr   | Short demand

from dataclasses import dataclass
from typing import Optional

@dataclass
class TransactionCostModel:
    """
    Complete transaction cost model for backtesting.
    All rates in basis points (bps), where 100 bps = 1%.
    """
    taker_commission_bps: float = 10.0   # 0.10%
    maker_commission_bps: float = 3.0    # 0.03% (maker rebate on some exchanges)
    spread_bps: float = 5.0              # full quoted bid-ask spread
    impact_eta: float = 0.1              # market impact coefficient
    adv_usd: float = 2_000_000           # average daily volume
    daily_vol: float = 0.025             # daily price volatility
    funding_rate_daily_bps: float = 5.0  # for perpetual futures

    def compute_cost(
        self,
        order_size_usd: float,
        order_type: str = "taker",
        holding_period_days: float = 1.0,
        is_short: bool = False
    ) -> dict:
        # Commission
        if order_type == "taker":
            commission = self.taker_commission_bps
        else:
            commission = self.maker_commission_bps

        # Pay half the quoted spread on each leg (entry + exit = one full spread)
        spread_cost = self.spread_bps / 2

        # Market impact: square root law
        import math
        participation = order_size_usd / self.adv_usd
        impact = self.daily_vol * math.sqrt(participation) * self.impact_eta * 10000  # bps

        # Funding cost (for futures/perps on long positions)
        funding = self.funding_rate_daily_bps * holding_period_days

        # Borrow cost for short positions: 500 bps/yr (top of typical range), pro-rated daily
        borrow = 500.0 * holding_period_days / 365 if is_short else 0

        total_bps = commission + spread_cost + impact + funding + borrow
        total_usd = order_size_usd * total_bps / 10000

        return {
            "commission_bps": commission,
            "spread_bps": spread_cost,
            "impact_bps": impact,
            "funding_bps": funding,
            "borrow_bps": borrow,
            "total_bps": total_bps,
            "total_usd": total_usd
        }

cost_model = TransactionCostModel()
costs = cost_model.compute_cost(50_000, "taker", holding_period_days=1.0)
print(f"Total cost: {costs['total_bps']:.2f} bps = ${costs['total_usd']:.2f}")
# → Total cost: 21.45 bps = $107.26

Lookahead Bias Prevention

Lookahead bias (using future information in historical decisions) is the most insidious backtest failure mode because it is invisible and always inflates performance. A strategy that uses tomorrow's closing price to make today's decision looks brilliant in backtest and is worthless live.

Common Lookahead Bias Sources

1. Using today's OHLCV to enter at today's open (open is known at bar start; close is not).
2. Applying .fillna() or normalization across the full dataset before splitting train/test.
3. Indicators like rolling Z-score using forward-looking window edges.
4. Using the same bar's high/low to trigger stop-loss and re-enter on the same bar.

import pandas as pd
import numpy as np

def safe_signal_generation(df: pd.DataFrame) -> pd.DataFrame:
    """
    Demonstrates correct signal timing to prevent lookahead bias.
    Rule: signals computed at bar CLOSE, executed at NEXT bar OPEN.
    """
    df = df.copy()

    # WRONG: signal uses same bar's close, would execute at that bar's open
    # df["signal"] = (df["close"] > df["close"].shift(1)).astype(int)

    # CORRECT: shift signal by 1 so it acts on the FOLLOWING bar
    raw_signal = (df["close"] > df["close"].shift(1)).astype(int)
    df["signal"] = raw_signal.shift(1)  # execute at next bar open

    # WRONG: normalize using full series (uses future data)
    # df["normalized"] = (df["close"] - df["close"].mean()) / df["close"].std()

    # CORRECT: use expanding window (only past data at each point)
    df["normalized"] = (
        (df["close"] - df["close"].expanding().mean()) /
        df["close"].expanding().std()
    )

    # WRONG: RSI using full-window stats
    # CORRECT: use fixed lookback, ensure index alignment
    delta = df["close"].diff()
    gain = delta.where(delta > 0, 0).rolling(14).mean()
    loss = (-delta).where(delta < 0, 0).rolling(14).mean()
    rs = gain / loss.replace(0, np.nan)
    df["rsi"] = 100 - (100 / (1 + rs))
    # RSI is computed from data up to and including bar T: safe to use for a T+1 signal

    return df

def train_test_split_temporal(
    df: pd.DataFrame,
    train_frac: float = 0.7
) -> tuple:
    """
    Temporal train/test split: NEVER shuffle time series data.
    All normalization and fitting must happen only on training data.
    """
    split_idx = int(len(df) * train_frac)
    train = df.iloc[:split_idx].copy()
    test = df.iloc[split_idx:].copy()

    # Compute any statistics (mean, std, etc.) ONLY on train
    train_mean = train["close"].mean()
    train_std = train["close"].std()

    # Apply train statistics to both splits (never test statistics)
    train["z_score"] = (train["close"] - train_mean) / train_std
    test["z_score"] = (test["close"] - train_mean) / train_std  # use train stats!

    return train, test

Walk-Forward Optimization

Walk-forward optimization (WFO) is the gold standard for strategy parameter selection because it tests parameters on truly out-of-sample data at every step. It simulates exactly what a live trading system does: periodically re-optimize on recent history, then trade on unseen future data.

The procedure:

  1. Divide data into a sequence of windows: [train_1, test_1], [train_2, test_2], ...
  2. Each test window is immediately after the preceding train window (anchored or rolling)
  3. On each train window: grid search parameters, select the best
  4. Apply those parameters to the test window: record live-equivalent performance
  5. Concatenate all test-window results for final evaluation

import numpy as np
import pandas as pd
from itertools import product
from typing import Callable, Dict, List, Any

class WalkForwardOptimizer:
    def __init__(
        self,
        strategy_fn: Callable,
        param_grid: Dict[str, List[Any]],
        n_train_periods: int = 252,
        n_test_periods: int = 63,
        step_size: int = 63,  # re-optimize every quarter
        objective: str = "sharpe"
    ):
        self.strategy_fn = strategy_fn
        self.param_grid = param_grid
        self.n_train = n_train_periods
        self.n_test = n_test_periods
        self.step = step_size
        self.objective = objective

    def _score(self, returns: pd.Series) -> float:
        if len(returns) < 5 or returns.std() == 0:
            return -np.inf
        if self.objective == "sharpe":
            return returns.mean() / returns.std() * np.sqrt(252)
        elif self.objective == "calmar":
            ann_return = returns.mean() * 252
            max_dd = self._max_drawdown(returns)
            return ann_return / abs(max_dd) if max_dd != 0 else 0
        elif self.objective == "total_return":
            return (1 + returns).prod() - 1
        raise ValueError(f"unknown objective: {self.objective}")

    def _max_drawdown(self, returns: pd.Series) -> float:
        cum = (1 + returns).cumprod()
        roll_max = cum.expanding().max()
        drawdowns = cum / roll_max - 1
        return drawdowns.min()

    def run(self, df: pd.DataFrame) -> dict:
        all_test_returns = []
        optimization_log = []
        param_combos = list(product(*self.param_grid.values()))
        param_names = list(self.param_grid.keys())

        start = self.n_train
        while start + self.n_test <= len(df):
            train_data = df.iloc[start - self.n_train : start]
            test_data = df.iloc[start : start + self.n_test]

            # Find best params on training data
            best_score = -np.inf
            best_params = None
            for combo in param_combos:
                params = dict(zip(param_names, combo))
                try:
                    train_returns = self.strategy_fn(train_data, **params)
                    score = self._score(train_returns)
                    if score > best_score:
                        best_score = score
                        best_params = params
                except Exception:
                    continue

            if best_params is None:
                start += self.step
                continue

            # Evaluate on OOS test window
            test_returns = self.strategy_fn(test_data, **best_params)
            all_test_returns.append(test_returns)
            optimization_log.append({
                "window_start": test_data.index[0],
                "best_params": best_params,
                "is_score": best_score,
                "oos_score": self._score(test_returns)
            })
            start += self.step

        combined = pd.concat(all_test_returns) if all_test_returns else pd.Series(dtype=float)
        return {
            "oos_returns": combined,
            "optimization_log": optimization_log,
            "oos_sharpe": self._score(combined) if len(combined) > 0 else None,
            "oos_max_drawdown": self._max_drawdown(combined) if len(combined) > 0 else None
        }

Overfitting Detection

Overfitting in backtesting occurs when parameters are chosen that maximize historical performance but are specific to noise rather than signal. The result: exceptional backtest, dismal live performance.

In-Sample / Out-of-Sample Split

The basic defense: never touch the test set until the strategy is finalized. A contaminated test set provides no information about live performance.

Overfitting Ratio = OOS Sharpe / IS Sharpe
Ratio > 0.7: acceptable | 0.5-0.7: caution | < 0.5: likely overfit
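The ratio check is one small function once both return series exist. A sketch (thresholds taken from the scale above; note the ratio is only meaningful when the in-sample Sharpe is positive):

```python
import numpy as np
import pandas as pd

def overfitting_ratio(is_returns: pd.Series, oos_returns: pd.Series) -> tuple:
    """OOS Sharpe divided by IS Sharpe, with a verdict bucket."""
    def sharpe(r: pd.Series) -> float:
        return r.mean() / r.std() * np.sqrt(252)
    ratio = float(sharpe(oos_returns) / sharpe(is_returns))
    verdict = ("acceptable" if ratio > 0.7
               else "caution" if ratio > 0.5
               else "likely overfit")
    return ratio, verdict

# Identical series as a sanity check: the ratio is exactly 1.0
r = pd.Series([0.01, -0.005, 0.012, -0.004, 0.008] * 20)
print(overfitting_ratio(r, r))  # → (1.0, 'acceptable')
```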

Combinatorial Purged Cross-Validation (CPCV)

CPCV, from López de Prado's "Advances in Financial Machine Learning," is the state-of-the-art method for testing financial strategies. It addresses the serial autocorrelation in financial data that makes standard k-fold cross-validation invalid. The snippet below implements its core building block, purged k-fold with an embargo; full CPCV additionally evaluates every combination of test folds.

import numpy as np
import pandas as pd
from scipy.stats import norm

def purged_k_fold(
    df: pd.DataFrame,
    n_splits: int = 5,
    embargo_pct: float = 0.01
) -> list:
    """
    Purged K-Fold cross-validation for time series.
    Embargo: after each training fold, leave a gap equal to
    embargo_pct of total samples to prevent leakage from
    overlapping return windows.
    """
    n = len(df)
    fold_size = n // n_splits
    embargo_size = int(n * embargo_pct)
    folds = []

    for k in range(n_splits):
        test_start = k * fold_size
        test_end = min((k + 1) * fold_size, n)

        # Training data: everything except test + embargo buffer
        train_indices = list(range(0, max(0, test_start - embargo_size)))
        train_indices += list(range(min(n, test_end + embargo_size), n))

        test_indices = list(range(test_start, test_end))
        folds.append({
            "fold": k,
            "train": df.iloc[train_indices],
            "test": df.iloc[test_indices]
        })
    return folds

def deflated_sharpe_ratio(
    observed_sharpe: float,
    n_trials: int,
    n_observations: int,
    skewness: float = 0.0,
    kurtosis: float = 3.0
) -> float:
    """
    Bailey & Lopez de Prado Deflated Sharpe Ratio.
    Adjusts observed Sharpe for selection bias from trying multiple strategies.
    Returns the probability that the strategy has a true positive Sharpe.
    """
    # Expected maximum Sharpe among n_trials random strategies
    # (simplified: assumes unit variance across the trial Sharpe estimates)
    euler_mascheroni = 0.5772156649
    expected_max_sr = (
        (1 - euler_mascheroni) * norm.ppf(1 - 1/n_trials) +
        euler_mascheroni * norm.ppf(1 - 1/(n_trials * np.e))
    )

    # Variance of Sharpe estimator (accounting for non-normality)
    sr_variance = (
        (1 - skewness * observed_sharpe + (kurtosis - 1) / 4 * observed_sharpe**2)
        / (n_observations - 1)
    )

    # Deflated SR: probability of beating expected maximum by chance
    deflated = (observed_sharpe - expected_max_sr) / np.sqrt(sr_variance)
    psr = norm.cdf(deflated)
    return psr

# Example: we tried 50 parameter combinations, observed Sharpe = 2.1
psr = deflated_sharpe_ratio(
    observed_sharpe=2.1,
    n_trials=50,
    n_observations=500
)
print(f"Probability of genuine alpha: {psr:.1%}")
# Low PSR → strategy is likely overfit from parameter search

Performance Metrics

No single metric captures all aspects of strategy quality. Use a dashboard of metrics that together paint a complete picture.

Complete Metrics Implementation

import numpy as np
import pandas as pd
from scipy import stats

def compute_performance_metrics(
    returns: pd.Series,
    benchmark_returns: pd.Series = None,
    risk_free_rate: float = 0.05,
    periods_per_year: int = 252
) -> dict:
    """
    Comprehensive strategy performance metrics.
    returns: daily returns series
    risk_free_rate: annualized risk-free rate
    """
    rf_daily = risk_free_rate / periods_per_year
    excess = returns - rf_daily

    # Annualized return
    n = len(returns)
    total_return = (1 + returns).prod() - 1
    ann_return = (1 + total_return) ** (periods_per_year / n) - 1

    # Volatility
    ann_vol = returns.std() * np.sqrt(periods_per_year)

    # Sharpe Ratio
    sharpe = excess.mean() / returns.std() * np.sqrt(periods_per_year)

    # Sortino Ratio (penalizes only downside volatility)
    downside = returns[returns < rf_daily]
    downside_vol = downside.std() * np.sqrt(periods_per_year) if len(downside) > 0 else np.nan
    sortino = (ann_return - risk_free_rate) / downside_vol if downside_vol else np.nan

    # Maximum Drawdown
    cum = (1 + returns).cumprod()
    roll_max = cum.expanding().max()
    drawdowns = cum / roll_max - 1
    max_dd = drawdowns.min()

    # Calmar Ratio
    calmar = ann_return / abs(max_dd) if max_dd != 0 else np.nan

    # Average drawdown duration
    in_drawdown = drawdowns < 0
    prev_in_dd = in_drawdown.shift(1, fill_value=False)  # fill_value keeps bool dtype
    dd_starts = in_drawdown & ~prev_in_dd
    dd_ends = ~in_drawdown & prev_in_dd
    dd_durations = []
    start_idx = None
    for i, (s, e) in enumerate(zip(dd_starts, dd_ends)):
        if s: start_idx = i
        if e and start_idx is not None:
            dd_durations.append(i - start_idx)
    avg_dd_duration = np.mean(dd_durations) if dd_durations else 0

    # Win rate and profit factor
    wins = returns[returns > 0]
    losses = returns[returns < 0]
    win_rate = len(wins) / len(returns) if len(returns) > 0 else 0
    gross_profit = wins.sum()
    gross_loss = abs(losses.sum())
    profit_factor = gross_profit / gross_loss if gross_loss > 0 else np.inf

    # Tail ratio (95th percentile return / 5th percentile loss)
    tail_ratio = abs(np.percentile(returns, 95)) / abs(np.percentile(returns, 5))

    # Beta and Alpha vs benchmark
    beta, alpha, r_value, p_value, _ = (
        stats.linregress(benchmark_returns, returns)
        if benchmark_returns is not None and len(benchmark_returns) == len(returns)
        else (np.nan, np.nan, np.nan, np.nan, np.nan)
    )

    # Omega Ratio
    threshold = rf_daily
    omega_num = returns[returns > threshold].sum() - threshold * len(returns[returns > threshold])
    omega_den = abs(returns[returns <= threshold].sum() - threshold * len(returns[returns <= threshold]))
    omega = omega_num / omega_den if omega_den > 0 else np.inf

    return {
        "total_return": f"{total_return:.2%}",
        "ann_return": f"{ann_return:.2%}",
        "ann_volatility": f"{ann_vol:.2%}",
        "sharpe_ratio": round(sharpe, 3),
        "sortino_ratio": round(sortino, 3),
        "calmar_ratio": round(calmar, 3),
        "omega_ratio": round(omega, 3),
        "max_drawdown": f"{max_dd:.2%}",
        "avg_dd_duration_days": round(avg_dd_duration, 1),
        "win_rate": f"{win_rate:.2%}",
        "profit_factor": round(profit_factor, 3),
        "tail_ratio": round(tail_ratio, 3),
        "beta": round(beta, 3) if not np.isnan(beta) else None,
        "alpha_ann": f"{alpha * periods_per_year:.2%}" if not np.isnan(alpha) else None,
        "n_periods": n,
    }

# Example: evaluate a strategy
np.random.seed(42)
sample_returns = pd.Series(np.random.normal(0.0008, 0.015, 500))  # 500 daily returns
metrics = compute_performance_metrics(sample_returns)
for k, v in metrics.items():
    print(f"  {k:25s}: {v}")

Metric Interpretation Guide

Metric        | Poor   | Acceptable | Good      | Excellent
Sharpe Ratio  | < 0.5  | 0.5 - 1.0  | 1.0 - 2.0 | > 2.0
Sortino Ratio | < 0.7  | 0.7 - 1.5  | 1.5 - 3.0 | > 3.0
Calmar Ratio  | < 0.3  | 0.3 - 1.0  | 1.0 - 3.0 | > 3.0
Max Drawdown  | > 30%  | 15-30%     | 5-15%     | < 5%
Win Rate      | < 40%  | 40-50%     | 50-60%    | > 60%
Profit Factor | < 1.0  | 1.0-1.3    | 1.3-2.0   | > 2.0

Backtesting on Purple Flea Casino/Trading Data

Purple Flea's unique angle: you can backtest agent strategies against real historical casino game outcomes, trading order books, and escrow settlement data. This allows you to simulate realistic agent P&L including all fee structures.

import requests
import pandas as pd
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class CasinoBacktestResult:
    hands_played: int = 0
    total_wagered: float = 0.0
    total_returned: float = 0.0
    max_bankroll: float = 0.0
    min_bankroll: float = float('inf')
    bankroll_history: List[float] = field(default_factory=list)

def backtest_casino_strategy(
    api_key: str,
    strategy_fn,
    initial_bankroll: float = 1.0,
    game: str = "blackjack",
    n_hands: int = 1000,
    use_historical: bool = True
) -> CasinoBacktestResult:
    """
    Backtest a casino strategy against Purple Flea historical hand data.
    strategy_fn: function(hand_state) -> bet_size (fraction of bankroll)
    """
    result = CasinoBacktestResult()
    bankroll = initial_bankroll
    result.bankroll_history.append(bankroll)

    if use_historical:
        # Fetch historical hand outcomes from Purple Flea
        resp = requests.get(
            "https://purpleflea.com/api/v1/casino/history",
            headers={"Authorization": f"Bearer {api_key}"},
            params={"game": game, "limit": n_hands}
        )
        hands = resp.json()["hands"]
    else:
        # Simulate synthetic hands. RTP is expected return, NOT win probability:
        # for a 1:1 payout game, win_prob = RTP / 2.
        import random
        win_prob = 0.4975  # ~99.5% RTP blackjack, approximated as a 1:1 bet
        hands = [{"outcome": random.random() < win_prob, "multiplier": 1.0} for _ in range(n_hands)]

    for hand in hands:
        if bankroll <= 0:
            break
        bet_fraction = strategy_fn({"bankroll": bankroll, "hand": hand})
        bet_size = bankroll * max(0, min(1, bet_fraction))  # clamp to [0, bankroll]

        if hand.get("outcome"):
            multiplier = hand.get("multiplier", 1.0)
            bankroll += bet_size * multiplier
        else:
            bankroll -= bet_size

        result.hands_played += 1
        result.total_wagered += bet_size
        result.max_bankroll = max(result.max_bankroll, bankroll)
        result.min_bankroll = min(result.min_bankroll, bankroll)
        result.bankroll_history.append(bankroll)

    result.total_returned = bankroll
    return result

# Example: Kelly criterion strategy
def kelly_strategy(state: dict) -> float:
    """Kelly criterion bet sizing for a 49.5% win probability game."""
    WIN_PROB = 0.495
    LOSS_PROB = 1 - WIN_PROB
    PAYOFF_RATIO = 1.0  # 1:1 payout
    kelly_fraction = WIN_PROB - (LOSS_PROB / PAYOFF_RATIO)  # negative: house edge
    half_kelly = kelly_fraction / 2  # half-Kelly for risk reduction
    # With a negative edge, Kelly correctly says bet nothing
    return max(0, half_kelly)

# result = backtest_casino_strategy(
#     "pf_live_",
#     kelly_strategy,
#     initial_bankroll=1.0,
#     game="blackjack",
#     n_hands=5000
# )
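As a numeric sanity check on Kelly sizing: the expected log-growth per bet is g(f) = p*ln(1 + f*b) + (1-p)*ln(1 - f), which peaks at f* = p - (1-p)/b. For the 49.5% game above that optimum is negative, so the clamped bet of zero is correct. The sketch below verifies the peak for an illustrative positive-edge game (p = 0.55, not a Purple Flea game):

```python
import numpy as np

def kelly_log_growth(f: float, p: float, b: float = 1.0) -> float:
    """Expected log growth per bet when staking fraction f of bankroll."""
    return p * np.log(1 + f * b) + (1 - p) * np.log(1 - f)

p, b = 0.55, 1.0
f_star = p - (1 - p) / b  # analytic Kelly optimum: 0.10
grid = np.linspace(0.0, 0.5, 501)
f_best = grid[np.argmax([kelly_log_growth(f, p, b) for f in grid])]
print(round(f_star, 4), round(float(f_best), 4))  # → 0.1 0.1
```

The grid search and the closed form agree, which is a cheap way to catch sign errors before wiring a sizing rule into a live agent.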

Complete Backtesting Class

Putting it all together: a production-quality backtesting class that combines all the components above into a single unified interface.

from datetime import datetime
from typing import Callable

class AgentBacktester:
    """
    Complete event-driven backtesting framework for AI agent strategies.
    Integrates data fetching, cost modeling, lookahead prevention,
    walk-forward optimization, and performance reporting.
    """

    def __init__(
        self,
        api_key: str,
        cost_model: TransactionCostModel = None,
        initial_capital: float = 100_000.0
    ):
        self.api_key = api_key
        self.cost_model = cost_model or TransactionCostModel()
        self.capital = initial_capital
        self.positions = {}
        self.trades = []
        self.equity_curve = []

    def run(
        self,
        symbol: str,
        strategy_fn: Callable,
        start: datetime,
        end: datetime,
        interval: str = "1h",
        walk_forward: bool = True
    ) -> dict:
        # Fetch and clean data
        df = fetch_ohlcv(symbol, interval, start, end)
        df = clean_ohlcv(df)

        if walk_forward:
            # Use WFO to avoid parameter overfitting
            wfo = WalkForwardOptimizer(
                strategy_fn=strategy_fn,
                param_grid={"fast": [5, 10, 20], "slow": [20, 40, 60]},
                n_train_periods=500,
                n_test_periods=125
            )
            wfo_result = wfo.run(df)
            returns = wfo_result["oos_returns"]
        else:
            # Simple single-pass backtest (use only for research, not parameter selection)
            train_df, test_df = train_test_split_temporal(df, 0.7)
            returns = strategy_fn(test_df)

        # Apply realistic transaction costs, amortized per bar
        # (rough assumption: one round-trip roughly every 20 bars)
        n_trades = max(1, len(self.trades))
        avg_trade_size = self.capital / n_trades
        cost_per_trade = self.cost_model.compute_cost(avg_trade_size)
        cost_per_period = cost_per_trade["total_bps"] / 10000 / 20

        adjusted_returns = returns - cost_per_period

        # Compute metrics
        metrics = compute_performance_metrics(adjusted_returns)
        psr = deflated_sharpe_ratio(
            observed_sharpe=float(metrics["sharpe_ratio"]),
            n_trials=9,  # 3x3 param grid = 9 combinations
            n_observations=len(returns)
        )

        return {
            "metrics": metrics,
            "deflated_sharpe_psr": f"{psr:.1%}",
            "returns_series": adjusted_returns,
            "cost_model": cost_per_trade,
            "recommendation": "DEPLOY" if psr > 0.85 and float(metrics["sharpe_ratio"]) > 1.5 else "MORE_TESTING"
        }

# Usage
# backtester = AgentBacktester("pf_live_")
# result = backtester.run("BTC-USD", my_strategy, start, end, walk_forward=True)
# print(result["recommendation"])

Common Pitfalls Summary

A final checklist of the most common backtesting mistakes that cause live underperformance:

  1. No cost model: even a simple 10 bps round-trip cost can eliminate most strategies. Model it.
  2. Signal-to-execution timing mismatch: signals computed at bar close must execute at next bar open (or next bar close for day-end strategies).
  3. Data-snooping bias: every parameter you manually tweak uses up degrees of freedom. Use the deflated Sharpe ratio to measure true alpha.
  4. Ignoring capacity: a strategy great at $10k may fail at $1M due to market impact. Test at target capital size.
  5. Single backtest period: performance over one bull market means nothing. Test across multiple regimes.
  6. No regime analysis: strategies that work in trending markets fail in choppy ones. Classify regimes and test separately.
  7. Confusing simulated and live fills: limit orders in a backtest always fill at the limit price. Live orders may not fill at all.

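Pitfall 7 deserves a concrete rule. A conservative backtest fills a resting limit order only when the bar trades through the price, and treats an exact touch as no fill (you were last in the queue). A minimal sketch; the order/bar dict shapes are illustrative assumptions:

```python
from typing import Optional

def simulate_limit_fill(order: dict, bar: dict) -> Optional[float]:
    """Return the fill price for a resting limit order, or None if unfilled."""
    if order["side"] == "buy" and bar["low"] < order["limit_price"]:
        # Fill at the limit, never better: assume no price improvement
        return order["limit_price"]
    if order["side"] == "sell" and bar["high"] > order["limit_price"]:
        return order["limit_price"]
    return None

# A bar that only touches the buy limit does NOT fill
print(simulate_limit_fill({"side": "buy", "limit_price": 100.0},
                          {"low": 100.0, "high": 105.0}))  # → None
```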
Get Purple Flea Live Data for Backtesting

Access Purple Flea's full historical dataset (casino outcomes, trading OHLCV, order book snapshots) via the API at purpleflea.com/api/v1/market/history. Register for a free API key at purpleflea.com/register. Keys use the pf_live_ prefix. Historical data goes back to platform launch with minute-level granularity.