Guide

Backtesting Trading Strategies for AI Agents

How autonomous agents backtest and validate trading strategies before live deployment — vectorized backtesting, walk-forward optimization, Monte Carlo simulation, and overfitting prevention.

8+
Validation Methods
WFO
Walk-Forward Opt.
1000x
Monte Carlo Runs

Purple Flea · March 6, 2026 · 20 min read

1. Backtesting Methodology

Backtesting applies a trading strategy to historical data to evaluate how it would have performed. For autonomous AI agents, backtesting serves as the primary quality gate before live deployment — a strategy that cannot demonstrate profitability on historical data should never touch real capital.

The fundamental challenge is that backtesting is inherently retrospective. The agent knows what happened, which introduces look-ahead bias if not carefully controlled. A rigorous backtest must simulate the exact information state available at each decision point, including data latency, confirmation delays, and execution timing.

Core principle: A backtest is only as good as the assumptions it encodes about execution. Unrealistic execution assumptions (zero slippage, instant fills, no partial fills) inflate performance metrics and create false confidence in strategies that will underperform live.

What a Backtest Must Simulate

  • Market data availability: Only data available at decision time, not future bars
  • Order types and fills: Market orders vs limit orders, partial fills, order expiry
  • Transaction costs: Commissions, spread, funding rates, withdrawal fees
  • Slippage: Price impact of executing at market, especially for larger orders
  • Capital constraints: Margin requirements, leverage limits, minimum order sizes
  • Portfolio constraints: Max positions, concentration limits (see risk limits guide)

Backtest vs Forward Test vs Live

PhaseDataRiskPurpose
In-sample backtestHistorical (training)ZeroStrategy development
Out-of-sample backtestHistorical (holdout)ZeroInitial validation
Walk-forward testRolling historical windowsZeroRobustness validation
Paper tradeLive (simulated execution)ZeroPre-live validation
Live smallLive (real execution)MinimalReal-world calibration
Live fullLive (real execution)FullStrategy deployment

Agents should pass all phases before reaching live full. The Purple Flea faucet provides capital for the "live small" phase, bridging the gap between paper trading and full live deployment.

2. Vectorized vs Event-Driven Backtesting

Two architectures dominate backtesting implementations, each with different tradeoffs.

Vectorized Backtesting

Vectorized backtesting applies the entire strategy to the full dataset at once using array operations (NumPy/Pandas). It is extremely fast — a full year of minute-bar data can be processed in seconds. The tradeoff is that it cannot simulate complex execution logic, order book dynamics, or multi-asset interdependencies.

Best for: Simple strategies with fixed entry/exit rules, EMA crossovers, momentum signals, or binary outcome strategies where execution timing within a bar does not matter significantly.

import pandas as pd
import numpy as np
from dataclasses import dataclass

@dataclass
class BacktestResult:
    total_return: float
    annualized_return: float
    sharpe_ratio: float
    max_drawdown: float
    win_rate: float
    profit_factor: float
    total_trades: int
    equity_curve: pd.Series

class VectorizedBacktester:
    """
    Fast vectorized backtester for single-asset strategies.
    Processes entire dataset in a single pass using NumPy/Pandas.
    """

    def __init__(
        self,
        commission_rate: float = 0.001,  # 0.1% per trade
        slippage_rate: float = 0.0005,   # 0.05% slippage
        initial_capital: float = 10_000.0
    ):
        self.commission = commission_rate
        self.slippage = slippage_rate
        self.initial_capital = initial_capital

    def run(self, prices: pd.Series, signals: pd.Series) -> BacktestResult:
        """
        Run vectorized backtest.

        Args:
            prices: OHLCV close prices indexed by datetime
            signals: Position signals (-1, 0, 1) for each bar

        Returns:
            BacktestResult with performance metrics
        """
        # Position changes determine when trades occur
        position_changes = signals.diff().fillna(0)
        trades = position_changes[position_changes != 0]

        # Returns: price return * signal (shifted to avoid look-ahead)
        price_returns = prices.pct_change().shift(-1)  # Next bar returns
        strategy_returns = signals * price_returns

        # Apply transaction costs on position changes
        cost_per_change = (self.commission + self.slippage) * position_changes.abs()
        net_returns = strategy_returns - cost_per_change

        # Equity curve
        equity = (1 + net_returns).cumprod() * self.initial_capital
        equity = equity.fillna(method='ffill').fillna(self.initial_capital)

        # Metrics
        total_return = (equity.iloc[-1] / equity.iloc[0]) - 1
        n_years = len(equity) / 252
        annualized = (1 + total_return) ** (1 / max(n_years, 0.01)) - 1

        daily_returns = equity.pct_change().dropna()
        sharpe = (daily_returns.mean() / daily_returns.std()) * np.sqrt(252) if daily_returns.std() > 0 else 0

        rolling_max = equity.cummax()
        drawdown = (equity - rolling_max) / rolling_max
        max_dd = drawdown.min()

        # Trade-level stats
        trade_returns = []
        position = 0
        entry_price = 0.0
        for i, (ts, px) in enumerate(prices.items()):
            sig = signals.get(ts, 0)
            if sig != position:
                if position != 0 and entry_price > 0:
                    ret = (px - entry_price) / entry_price * position
                    trade_returns.append(ret)
                position = sig
                entry_price = px

        wins = [r for r in trade_returns if r > 0]
        losses = [r for r in trade_returns if r <= 0]
        win_rate = len(wins) / len(trade_returns) if trade_returns else 0
        gross_profit = sum(wins)
        gross_loss = abs(sum(losses))
        profit_factor = gross_profit / gross_loss if gross_loss > 0 else float('inf')

        return BacktestResult(
            total_return=total_return,
            annualized_return=annualized,
            sharpe_ratio=sharpe,
            max_drawdown=max_dd,
            win_rate=win_rate,
            profit_factor=profit_factor,
            total_trades=len(trade_returns),
            equity_curve=equity
        )

Event-Driven Backtesting

Event-driven backtesting processes data bar-by-bar, triggering event handlers as each new bar arrives. It is slower than vectorized but accurately simulates order queuing, partial fills, multi-asset rebalancing, and latency. For production agents, event-driven backtests provide more realistic performance estimates.

from enum import Enum
from dataclasses import dataclass, field
from typing import Callable
import heapq

class EventType(Enum):
    MARKET_DATA = 'market_data'
    SIGNAL = 'signal'
    ORDER = 'order'
    FILL = 'fill'

@dataclass
class Event:
    timestamp: pd.Timestamp
    event_type: EventType
    data: dict

    def __lt__(self, other):
        return self.timestamp < other.timestamp

class EventDrivenBacktester:
    """
    Event-driven backtester with realistic order simulation.
    Processes events in chronological order for accurate sequencing.
    """

    def __init__(self, initial_capital: float = 10_000):
        self.capital = initial_capital
        self.positions: dict[str, float] = {}
        self.events: list = []
        self.handlers: dict[EventType, list[Callable]] = {e: [] for e in EventType}
        self.equity_history: list[tuple] = []

    def subscribe(self, event_type: EventType, handler: Callable) -> None:
        """Register an event handler."""
        self.handlers[event_type].append(handler)

    def emit(self, event: Event) -> None:
        """Add event to the priority queue."""
        heapq.heappush(self.events, event)

    def run(self, market_data: pd.DataFrame) -> dict:
        """
        Process all market data events in sequence.
        market_data: DataFrame with columns [open, high, low, close, volume]
        """
        # Load all market data into event queue
        for ts, row in market_data.iterrows():
            self.emit(Event(
                timestamp=ts,
                event_type=EventType.MARKET_DATA,
                data=row.to_dict()
            ))

        # Process event loop
        while self.events:
            event = heapq.heappop(self.events)
            for handler in self.handlers[event.event_type]:
                new_events = handler(event, self)
                if new_events:
                    for ne in new_events:
                        self.emit(ne)

            # Record equity after each market data event
            if event.event_type == EventType.MARKET_DATA:
                position_value = sum(
                    qty * event.data.get('close', 0)
                    for sym, qty in self.positions.items()
                )
                self.equity_history.append((event.timestamp, self.capital + position_value))

        return {'equity': pd.Series(
            dict(self.equity_history), name='equity'
        )}

3. Data Quality and Survivorship Bias

Garbage data produces garbage backtests. Before running any strategy validation, the agent must audit data quality across several dimensions.

Common Data Quality Issues

  • Survivorship bias: Datasets containing only assets that survived (e.g., index constituents today) overstate historical returns because failed assets are excluded
  • Look-ahead bias: Using data in decisions that would not have been available at that time (e.g., end-of-day close for intraday decisions)
  • Adjusted vs unadjusted prices: Dividend and split adjustments retroactively change historical prices — correct for these consistently
  • Stale prices: Illiquid assets may have gaps or stale quotes that create phantom opportunities
  • Timezone inconsistencies: Mixing UTC and local timestamps creates incorrect sequencing

Survivorship bias warning: A simple mean-reversion strategy on S&P 500 constituents that uses today's constituent list to backtest 10 years will include companies that only joined the index recently — missing the ones that failed. This inflates backtest returns by 1-3% annually.

class DataQualityChecker:
    """Automated data quality validation before backtesting."""

    def __init__(self, max_gap_bars: int = 5, max_price_jump_pct: float = 0.20):
        self.max_gap = max_gap_bars
        self.max_jump = max_price_jump_pct

    def check(self, df: pd.DataFrame) -> dict:
        """
        Run full data quality audit.

        Returns:
            Dict with 'passed' bool and list of 'issues'
        """
        issues = []

        # 1. Missing values
        null_counts = df.isnull().sum()
        if null_counts.any():
            issues.append(f"Missing values: {null_counts[null_counts > 0].to_dict()}")

        # 2. Price continuity (detect gaps)
        if 'close' in df.columns:
            pct_changes = df['close'].pct_change().abs()
            spikes = pct_changes[pct_changes > self.max_jump]
            if not spikes.empty:
                issues.append(f"Price spikes > {self.max_jump:.0%} at: {spikes.index.tolist()[:5]}")

        # 3. Temporal gaps (missing bars)
        if isinstance(df.index, pd.DatetimeIndex) and len(df) > 1:
            expected_freq = df.index.to_series().diff().mode()[0]
            actual_diffs = df.index.to_series().diff()
            large_gaps = actual_diffs[actual_diffs > expected_freq * self.max_gap]
            if not large_gaps.empty:
                issues.append(f"Temporal gaps > {self.max_gap} bars at: {large_gaps.index.tolist()[:5]}")

        # 4. Zero/negative prices
        if 'close' in df.columns:
            invalid = df['close'][df['close'] <= 0]
            if not invalid.empty:
                issues.append(f"Zero/negative prices at: {invalid.index.tolist()[:5]}")

        # 5. Volume consistency
        if 'volume' in df.columns:
            zero_vol = df['volume'][df['volume'] == 0]
            if len(zero_vol) > len(df) * 0.05:  # > 5% zero volume days
                issues.append(f"Excessive zero-volume bars: {len(zero_vol)} ({len(zero_vol)/len(df):.1%})")

        return {
            'passed': len(issues) == 0,
            'issues': issues,
            'rows': len(df),
            'date_range': f"{df.index[0]} to {df.index[-1]}" if len(df) > 0 else 'empty'
        }

4. Transaction Cost Modeling

Transaction costs are the single biggest cause of the gap between backtest performance and live performance. A strategy showing 30% annual return in backtest with zero-cost assumptions may show 10% or even negative returns live when costs are properly accounted for.

Purple Flea Fee Structure

ProductFee TypeRateNotes
Perpetuals (maker)Per trade0.02%Limit orders that add liquidity
Perpetuals (taker)Per trade0.05%Market orders that remove liquidity
CasinoHouse edge1-3%Per game, varies by game type
EscrowSettlement fee1%On escrow completion
Funding ratePer 8hVariableLong pays short when positive

For high-frequency strategies that trade many times per day, even 0.05% per trade compounds to enormous costs. A strategy making 10 round trips per day incurs 1% per day in taker fees alone — roughly 250% annually just in costs. Only strategies with very high edge can survive this cost burden.

@dataclass
class TransactionCostModel:
    """Model all transaction costs for accurate backtesting."""

    maker_fee: float = 0.0002      # 0.02%
    taker_fee: float = 0.0005      # 0.05%
    slippage_rate: float = 0.0003  # 0.03% base slippage
    funding_rate_8h: float = 0.0001  # Variable, use historical average

    def cost_of_trade(
        self,
        notional: float,
        is_maker: bool = False,
        hold_bars_8h: int = 1,
        is_long: bool = True
    ) -> dict:
        """
        Compute total cost of entering and exiting a position.

        Args:
            notional: Position size in USD
            is_maker: True if using limit orders (maker fee)
            hold_bars_8h: Number of 8h funding periods held
            is_long: True for long (pays positive funding)

        Returns:
            Dict with itemized costs
        """
        entry_fee_rate = self.maker_fee if is_maker else self.taker_fee
        exit_fee_rate = self.maker_fee if is_maker else self.taker_fee

        entry_fee = notional * (entry_fee_rate + self.slippage_rate)
        exit_fee = notional * (exit_fee_rate + self.slippage_rate)

        # Funding only paid if long (or short when rate negative)
        funding_sign = 1 if is_long else -1
        funding_cost = notional * self.funding_rate_8h * hold_bars_8h * funding_sign

        total = entry_fee + exit_fee + max(0, funding_cost)

        return {
            'entry_fee': entry_fee,
            'exit_fee': exit_fee,
            'funding': funding_cost,
            'total': total,
            'total_pct': total / notional,
            'breakeven_return': total / notional  # Minimum return to cover costs
        }

    def min_edge_required(
        self,
        daily_trades: int,
        avg_hold_hours: float = 8,
        is_maker: bool = False
    ) -> float:
        """
        Compute minimum daily strategy edge required to be profitable.

        Returns:
            Required daily return as decimal (e.g., 0.01 = 1%)
        """
        cost_per_trade = (self.maker_fee if is_maker else self.taker_fee) + self.slippage_rate
        round_trip_cost = cost_per_trade * 2  # Enter + exit
        funding_per_trade = self.funding_rate_8h * (avg_hold_hours / 8)

        total_daily_cost = daily_trades * (round_trip_cost + funding_per_trade)
        return total_daily_cost


# Evaluate a high-frequency strategy
cm = TransactionCostModel()
edge_needed = cm.min_edge_required(daily_trades=20, avg_hold_hours=1, is_maker=False)
print(f"Minimum daily return to break even: {edge_needed:.2%}")
# With 20 trades/day at taker rates: need ~1.6% daily just to cover costs

5. Slippage Estimation

Slippage is the difference between the expected fill price and the actual fill price. For small orders on liquid instruments, slippage is negligible. For larger orders or illiquid instruments, slippage can be the dominant cost.

Slippage Models

  • Fixed slippage: Add a constant bps regardless of order size (simplest, least accurate)
  • Percentage slippage: Proportional to order size relative to average volume
  • Square-root impact model: Industry standard for estimating market impact
slippage = σ × √(Q / ADV)

Where σ is daily volatility, Q is order quantity, and ADV is average daily volume. This square-root model is consistent with empirical market microstructure research and used by institutional traders for cost estimation.

class SlippageModel:
    """
    Realistic slippage estimation using square-root market impact model.
    Used in backtesting to simulate execution costs.
    """

    def __init__(
        self,
        daily_vol: float = 0.02,     # Asset daily volatility
        adv_usd: float = 1_000_000,  # Average daily volume in USD
        participation_limit: float = 0.10  # Max 10% of ADV per order
    ):
        self.daily_vol = daily_vol
        self.adv = adv_usd
        self.participation_limit = participation_limit

    def estimate(self, order_size_usd: float) -> dict:
        """
        Estimate slippage for a given order size.

        Returns:
            Dict with slippage as fraction and USD amount
        """
        # Participation rate (what fraction of daily volume this order represents)
        participation = order_size_usd / self.adv

        if participation > self.participation_limit:
            # Order too large — will have severe market impact
            return {
                'feasible': False,
                'reason': f"Order ({participation:.1%} of ADV) exceeds {self.participation_limit:.0%} participation limit",
                'max_order_usd': self.adv * self.participation_limit
            }

        # Square-root impact model
        slippage_pct = self.daily_vol * np.sqrt(participation)

        # Add bid-ask spread component (assume half-spread = 0.5 * daily_vol * 0.1)
        half_spread = self.daily_vol * 0.05

        total_slippage = slippage_pct + half_spread

        return {
            'feasible': True,
            'participation': participation,
            'market_impact_pct': slippage_pct,
            'half_spread_pct': half_spread,
            'total_slippage_pct': total_slippage,
            'total_slippage_usd': total_slippage * order_size_usd
        }

    def adjust_fill_price(self, quoted_price: float, order_size_usd: float, is_buy: bool) -> float:
        """Apply slippage to get simulated fill price."""
        slip = self.estimate(order_size_usd)
        if not slip.get('feasible'):
            return quoted_price  # Can't size this order
        direction = 1 if is_buy else -1
        return quoted_price * (1 + direction * slip['total_slippage_pct'])

6. Walk-Forward Optimization

Walk-forward optimization (WFO) is the gold standard for validating that a strategy's parameters are robust and not overfitted to a specific historical period. It works by repeatedly optimizing parameters on an in-sample window, then testing the optimal parameters on the immediately following out-of-sample window.

WFO Procedure

  1. Select in-sample window size (e.g., 6 months) and out-of-sample size (e.g., 2 months)
  2. Optimize parameters on the first in-sample window
  3. Apply optimal parameters to the next out-of-sample window, record performance
  4. Advance both windows by the out-of-sample period
  5. Repeat until dataset is exhausted
  6. Concatenate all out-of-sample windows to form the WFO equity curve

A strategy that performs well in walk-forward testing demonstrates parameter stability — its optimal parameters do not radically change from window to window, indicating the strategy has genuine edge rather than curve-fit noise.

from itertools import product
from typing import Any

class WalkForwardOptimizer:
    """
    Walk-forward optimization for strategy parameter validation.
    Prevents overfitting by testing on truly out-of-sample data.
    """

    def __init__(
        self,
        in_sample_bars: int = 120,   # ~6 months of daily data
        out_of_sample_bars: int = 40, # ~2 months
        min_trades: int = 20         # Minimum trades to count a window
    ):
        self.is_bars = in_sample_bars
        self.oos_bars = out_of_sample_bars
        self.min_trades = min_trades

    def run(
        self,
        prices: pd.Series,
        param_grid: dict[str, list],
        strategy_fn: callable,
        objective: str = 'sharpe_ratio'
    ) -> dict:
        """
        Run walk-forward optimization.

        Args:
            prices: Historical price series
            param_grid: Dict mapping param names to candidate values
            strategy_fn: Function(prices, **params) -> BacktestResult
            objective: Metric to maximize in in-sample period

        Returns:
            WFO results with combined OOS performance
        """
        total_bars = len(prices)
        window_start = 0
        oos_results = []
        optimal_params_history = []

        while window_start + self.is_bars + self.oos_bars <= total_bars:
            is_end = window_start + self.is_bars
            oos_end = is_end + self.oos_bars

            is_prices = prices.iloc[window_start:is_end]
            oos_prices = prices.iloc[is_end:oos_end]

            # Grid search on in-sample data
            best_score = -float('inf')
            best_params = {}

            param_names = list(param_grid.keys())
            param_values = list(param_grid.values())

            for combo in product(*param_values):
                params = dict(zip(param_names, combo))
                try:
                    result = strategy_fn(is_prices, **params)
                    score = getattr(result, objective, -float('inf'))
                    if score > best_score and result.total_trades >= self.min_trades:
                        best_score = score
                        best_params = params
                except Exception:
                    continue

            if best_params:
                # Apply best params to OOS window
                oos_result = strategy_fn(oos_prices, **best_params)
                oos_results.append({
                    'window_start': prices.index[window_start],
                    'window_is_end': prices.index[is_end - 1],
                    'window_oos_end': prices.index[oos_end - 1],
                    'optimal_params': best_params,
                    'is_score': best_score,
                    'oos_sharpe': oos_result.sharpe_ratio,
                    'oos_return': oos_result.total_return,
                    'oos_max_dd': oos_result.max_drawdown
                })
                optimal_params_history.append(best_params)

            window_start += self.oos_bars

        if not oos_results:
            return {'error': 'No complete windows found'}

        # Aggregate OOS statistics
        oos_sharpes = [r['oos_sharpe'] for r in oos_results]
        oos_returns = [r['oos_return'] for r in oos_results]
        positive_windows = sum(1 for r in oos_returns if r > 0)

        return {
            'windows': oos_results,
            'avg_oos_sharpe': np.mean(oos_sharpes),
            'median_oos_sharpe': np.median(oos_sharpes),
            'avg_oos_return': np.mean(oos_returns),
            'pct_positive_windows': positive_windows / len(oos_results),
            'param_stability': self._param_stability(optimal_params_history),
            'recommendation': 'PASS' if np.mean(oos_sharpes) > 0.5 else 'FAIL'
        }

    def _param_stability(self, history: list[dict]) -> dict:
        """Measure how stable optimal parameters are across windows."""
        if not history:
            return {}
        stability = {}
        for param in history[0].keys():
            values = [h[param] for h in history]
            stability[param] = {
                'mean': np.mean(values),
                'std': np.std(values),
                'cv': np.std(values) / np.mean(values) if np.mean(values) != 0 else float('inf')
            }
        return stability

7. Monte Carlo Simulation for Robustness

Monte Carlo simulation tests strategy robustness by randomly shuffling the sequence of historical trades and computing the distribution of outcomes. If the strategy's edge is real, the performance should be roughly preserved across random orderings. If the original equity curve's performance largely disappears when randomized, the original order of trades was unusually lucky.

What Monte Carlo Tests

  • Luck vs edge: How likely is the backtest performance given random trade ordering?
  • Drawdown distribution: What is the 95th percentile maximum drawdown?
  • Time to recovery: How long does it typically take to recover from drawdowns?
  • Ruin probability: What fraction of simulations reach a 50% drawdown?
class MonteCarloSimulator:
    """
    Monte Carlo simulation for strategy robustness testing.
    Bootstraps trade returns to generate outcome distribution.
    """

    def __init__(self, n_simulations: int = 1000, confidence_level: float = 0.95):
        self.n_sims = n_simulations
        self.confidence = confidence_level

    def simulate(
        self,
        trade_returns: list[float],
        initial_capital: float = 10_000,
        ruin_threshold: float = 0.50
    ) -> dict:
        """
        Run Monte Carlo simulation by bootstrapping trade returns.

        Args:
            trade_returns: List of per-trade P&L as decimal returns
            initial_capital: Starting capital for each simulation
            ruin_threshold: Drawdown level considered "ruin"

        Returns:
            Statistical summary of simulated outcomes
        """
        if len(trade_returns) < 10:
            return {'error': 'Insufficient trade history (need >= 10 trades)'}

        r = np.array(trade_returns)
        final_values = []
        max_drawdowns = []
        ruin_count = 0

        for _ in range(self.n_sims):
            # Bootstrap: resample trades with replacement
            shuffled = np.random.choice(r, size=len(r), replace=True)

            # Compute equity curve
            equity = initial_capital * np.cumprod(1 + shuffled)

            # Track peak for drawdown
            peak = initial_capital
            max_dd = 0.0
            ruined = False

            for val in equity:
                if val > peak:
                    peak = val
                dd = (peak - val) / peak
                if dd > max_dd:
                    max_dd = dd
                if dd >= ruin_threshold:
                    ruined = True
                    break

            final_values.append(equity[-1] if not ruined else 0)
            max_drawdowns.append(max_dd)
            if ruined:
                ruin_count += 1

        fv = np.array(final_values)
        mdd = np.array(max_drawdowns)
        ci_lo = (1 - self.confidence) / 2
        ci_hi = 1 - ci_lo

        return {
            'n_simulations': self.n_sims,
            'original_trades': len(trade_returns),
            'final_value': {
                'mean': np.mean(fv),
                'median': np.median(fv),
                f'p{int(ci_lo*100)}': np.percentile(fv, ci_lo * 100),
                f'p{int(ci_hi*100)}': np.percentile(fv, ci_hi * 100),
                'pct_profitable': (fv > initial_capital).mean()
            },
            'max_drawdown': {
                'mean': np.mean(mdd),
                'median': np.median(mdd),
                f'p{int(self.confidence*100)}': np.percentile(mdd, self.confidence * 100),
                'worst_case': np.max(mdd)
            },
            'ruin_probability': ruin_count / self.n_sims,
            'recommendation': 'PASS' if ruin_count / self.n_sims < 0.10 else 'FAIL'
        }

Interpretation: A robust strategy should have a ruin probability below 10% and a median final value above initial capital in Monte Carlo simulations. Strategies that only barely pass under the original trade sequence should be treated with skepticism.

8. Overfitting Prevention

Overfitting is the silent killer of algorithmic trading strategies. A strategy with 20 parameters optimized on 2 years of daily data (500 bars) has more degrees of freedom than data points — it can almost certainly be fit to produce spectacular historical returns that will not persist out-of-sample.

Rules for Preventing Overfitting

  • Rule of thumb: Need at least 30-100 samples per free parameter optimized
  • Reserve holdout data: Never touch the test set until final strategy selection
  • Limit parameter count: Prefer strategies with 1-3 parameters over those with 10+
  • Penalize complexity: Use adjusted Sharpe or penalized metrics during optimization
  • Test on different regimes: Strategy should work in bull, bear, and sideways markets
  • Walk-forward validation: The best defense against overfitting (see Section 6)

Deflated Sharpe Ratio

The Deflated Sharpe Ratio (DSR) adjusts the observed Sharpe for the number of trials tested, providing a probability that the strategy has true positive Sharpe:

DSR = P[SR > SR_benchmark | N_trials, T, skewness, kurtosis]
from scipy import stats

def deflated_sharpe_ratio(
    observed_sharpe: float,
    n_trials: int,
    T: int,
    skewness: float = 0.0,
    excess_kurtosis: float = 0.0
) -> float:
    """
    Compute the Deflated Sharpe Ratio (Lopez de Prado, 2018).

    The DSR estimates the probability that a strategy's Sharpe Ratio
    is truly positive, accounting for selection bias from testing
    multiple parameter combinations.

    Args:
        observed_sharpe: Annualized Sharpe ratio of best strategy
        n_trials: Total number of parameter combinations tested
        T: Number of observations (bars) in backtest
        skewness: Return distribution skewness (0 = normal)
        excess_kurtosis: Return distribution excess kurtosis (0 = normal)

    Returns:
        Probability (0-1) that true Sharpe > 0
    """
    # Expected maximum Sharpe from n_trials random strategies
    # Using the formula from de Prado (2018)
    euler_mascheroni = 0.5772156649
    expected_max_sr = (
        (1 - euler_mascheroni) * stats.norm.ppf(1 - 1 / n_trials)
        + euler_mascheroni * stats.norm.ppf(1 - 1 / (n_trials * np.e))
    )

    # Variance correction for non-normal returns
    sr_std = np.sqrt(
        (1 + (0.5 * observed_sharpe**2) - (skewness * observed_sharpe)
         + ((excess_kurtosis - 1) / 4) * observed_sharpe**2) / T
    )

    # DSR: probability that observed_sharpe > expected_max under null
    z_score = (observed_sharpe - expected_max_sr) / sr_std
    return stats.norm.cdf(z_score)


# Example: 50 parameter combos tested, 250 bars, observed Sharpe = 1.5
dsr = deflated_sharpe_ratio(
    observed_sharpe=1.5,
    n_trials=50,
    T=250
)
print(f"Deflated Sharpe Probability: {dsr:.2%}")
# If this is below 0.95, the strategy likely overfits

9. Complete Python Backtesting Framework

The following integrates all components into a single pipeline that agents can use to validate strategies end-to-end before deployment on Purple Flea:

class AgentBacktestPipeline:
    """
    End-to-end backtesting pipeline for AI agents.
    Runs full validation: data quality, vectorized backtest,
    walk-forward optimization, and Monte Carlo stress test.
    """

    def __init__(self, initial_capital: float = 10_000):
        self.capital = initial_capital
        self.data_checker = DataQualityChecker()
        self.cost_model = TransactionCostModel()
        self.slippage_model = SlippageModel()
        self.wfo = WalkForwardOptimizer()
        self.mc = MonteCarloSimulator(n_simulations=1000)
        self.backtester = VectorizedBacktester(
            commission_rate=0.0005,
            slippage_rate=0.0003,
            initial_capital=initial_capital
        )

    def validate(
        self,
        prices: pd.Series,
        strategy_fn: callable,
        param_grid: dict,
        strategy_params: dict
    ) -> dict:
        """
        Full validation pipeline. Returns GO/NO-GO recommendation.

        Args:
            prices: Historical price series
            strategy_fn: Function(prices, **params) -> signals Series
            param_grid: Parameter search space for WFO
            strategy_params: Final parameters for full backtest

        Returns:
            Comprehensive validation report
        """
        report = {'stages': {}, 'recommendation': 'PENDING'}

        # Stage 1: Data Quality
        price_df = pd.DataFrame({'close': prices})
        dq = self.data_checker.check(price_df)
        report['stages']['data_quality'] = dq
        if not dq['passed']:
            report['recommendation'] = 'NO-GO: Data quality issues'
            return report

        # Stage 2: Full Backtest
        signals = strategy_fn(prices, **strategy_params)
        bt_result = self.backtester.run(prices, signals)
        report['stages']['backtest'] = {
            'total_return': bt_result.total_return,
            'sharpe_ratio': bt_result.sharpe_ratio,
            'max_drawdown': bt_result.max_drawdown,
            'win_rate': bt_result.win_rate,
            'profit_factor': bt_result.profit_factor,
            'total_trades': bt_result.total_trades
        }

        if bt_result.sharpe_ratio < 0.5 or bt_result.max_drawdown < -0.40:
            report['recommendation'] = 'NO-GO: Poor in-sample performance'
            return report

        # Stage 3: Walk-Forward Optimization
        def wfo_strategy(p, **params):
            sigs = strategy_fn(p, **params)
            return self.backtester.run(p, sigs)

        wfo_result = self.wfo.run(prices, param_grid, wfo_strategy)
        report['stages']['walk_forward'] = wfo_result
        if wfo_result.get('recommendation') == 'FAIL':
            report['recommendation'] = 'NO-GO: Failed walk-forward validation'
            return report

        # Stage 4: Monte Carlo
        # Use equity curve returns as trade proxy
        equity = bt_result.equity_curve
        trade_returns = equity.pct_change().dropna().tolist()
        mc_result = self.mc.simulate(trade_returns, self.capital)
        report['stages']['monte_carlo'] = mc_result
        if mc_result.get('ruin_probability', 1.0) > 0.10:
            report['recommendation'] = f"NO-GO: Ruin probability {mc_result['ruin_probability']:.0%} > 10%"
            return report

        # All stages passed
        report['recommendation'] = 'GO: Strategy validated for paper trading'
        report['suggested_kelly_fraction'] = 0.25  # Start conservative
        return report


# Example usage
if __name__ == '__main__':
    pipeline = AgentBacktestPipeline(initial_capital=10_000)

    # Synthetic prices for demonstration
    np.random.seed(42)
    dates = pd.date_range('2023-01-01', periods=500, freq='D')
    prices = pd.Series(
        100 * np.exp(np.cumsum(np.random.normal(0.0003, 0.02, 500))),
        index=dates, name='BTC'
    )

    def simple_momentum(prices, lookback=20, **kwargs):
        """Simple momentum: long when price > SMA, flat otherwise."""
        sma = prices.rolling(lookback).mean()
        return (prices > sma).astype(int)

    report = pipeline.validate(
        prices=prices,
        strategy_fn=simple_momentum,
        param_grid={'lookback': [10, 15, 20, 25, 30]},
        strategy_params={'lookback': 20}
    )

    print(f"Recommendation: {report['recommendation']}")
    print(f"Backtest Sharpe: {report['stages']['backtest']['sharpe_ratio']:.2f}")
    print(f"WFO Avg OOS Sharpe: {report['stages']['walk_forward'].get('avg_oos_sharpe', 0):.2f}")
    print(f"MC Ruin Probability: {report['stages']['monte_carlo'].get('ruin_probability', 1):.1%}")

Deploy Validated Strategies on Purple Flea

Once your strategy passes backtesting, deploy it live. Use the faucet for risk-free initial capital, then graduate to full live trading on our perpetuals and casino products.

Get Free Capital Trading API Docs