1. Backtesting Methodology
Backtesting applies a trading strategy to historical data to evaluate how it would have performed. For autonomous AI agents, backtesting serves as the primary quality gate before live deployment — a strategy that cannot demonstrate profitability on historical data should never touch real capital.
The fundamental challenge is that backtesting is inherently retrospective. The agent knows what happened, which introduces look-ahead bias if not carefully controlled. A rigorous backtest must simulate the exact information state available at each decision point, including data latency, confirmation delays, and execution timing.
Core principle: A backtest is only as good as the assumptions it encodes about execution. Unrealistic execution assumptions (zero slippage, instant fills, no partial fills) inflate performance metrics and create false confidence in strategies that will underperform live.
What a Backtest Must Simulate
- Market data availability: Only data available at decision time, not future bars
- Order types and fills: Market orders vs limit orders, partial fills, order expiry
- Transaction costs: Commissions, spread, funding rates, withdrawal fees
- Slippage: Price impact of executing at market, especially for larger orders
- Capital constraints: Margin requirements, leverage limits, minimum order sizes
- Portfolio constraints: Max positions, concentration limits (see risk limits guide)
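The first bullet is the one most often violated in practice. A signal computed from bar t's close can only earn the return from bar t to bar t+1, so it must be shifted before being multiplied against returns. A minimal pandas sketch (toy prices and an illustrative threshold, not tied to any data feed):

```python
import pandas as pd

# Toy close prices for five bars
prices = pd.Series([100.0, 102.0, 101.0, 103.0, 104.0])

# Signal computed from the CURRENT bar's close (e.g., price above 101)
signal = (prices > 101).astype(int)

# WRONG: books bar t's return for a signal that needed bar t's close (look-ahead)
biased = signal * prices.pct_change()

# RIGHT: the signal decided at bar t earns the NEXT bar's return
unbiased = signal.shift(1) * prices.pct_change()

print(f"biased: {biased.sum():+.4f}, unbiased: {unbiased.sum():+.4f}")
```

On this toy series the biased version looks profitable while the honest alignment is roughly flat, which is exactly how look-ahead bias flatters a backtest.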
Backtest vs Forward Test vs Live
| Phase | Data | Risk | Purpose |
|---|---|---|---|
| In-sample backtest | Historical (training) | Zero | Strategy development |
| Out-of-sample backtest | Historical (holdout) | Zero | Initial validation |
| Walk-forward test | Rolling historical windows | Zero | Robustness validation |
| Paper trade | Live (simulated execution) | Zero | Pre-live validation |
| Live small | Live (real execution) | Minimal | Real-world calibration |
| Live full | Live (real execution) | Full | Strategy deployment |
Agents should pass all phases before reaching live full. The Purple Flea faucet provides capital for the "live small" phase, bridging the gap between paper trading and full live deployment.
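That progression can be enforced mechanically so an agent cannot skip straight to live deployment. A minimal sketch (the `Phase` enum mirrors the table above; resetting to the start on any failure is an illustrative policy choice, not a platform requirement):

```python
from enum import IntEnum

class Phase(IntEnum):
    """Validation phases in required order (mirrors the table above)."""
    IN_SAMPLE = 1
    OUT_OF_SAMPLE = 2
    WALK_FORWARD = 3
    PAPER = 4
    LIVE_SMALL = 5
    LIVE_FULL = 6

def next_phase(current: Phase, passed: bool) -> Phase:
    """Advance one phase on a pass; restart validation on any failure."""
    if not passed:
        return Phase.IN_SAMPLE
    return Phase(min(current + 1, Phase.LIVE_FULL))

phase = Phase.IN_SAMPLE
for passed in [True, True, True, True]:  # four consecutive passes
    phase = next_phase(phase, passed)
print(phase.name)  # LIVE_SMALL: still one gate away from full deployment
```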
2. Vectorized vs Event-Driven Backtesting
Two architectures dominate backtesting implementations, each with different tradeoffs.
Vectorized Backtesting
Vectorized backtesting applies the entire strategy to the full dataset at once using array operations (NumPy/Pandas). It is extremely fast — a full year of minute-bar data can be processed in seconds. The tradeoff is that it cannot simulate complex execution logic, order book dynamics, or multi-asset interdependencies.
Best for: Simple strategies with fixed entry/exit rules, EMA crossovers, momentum signals, or binary outcome strategies where execution timing within a bar does not matter significantly.
import pandas as pd
import numpy as np
from dataclasses import dataclass
@dataclass
class BacktestResult:
total_return: float
annualized_return: float
sharpe_ratio: float
max_drawdown: float
win_rate: float
profit_factor: float
total_trades: int
equity_curve: pd.Series
class VectorizedBacktester:
"""
Fast vectorized backtester for single-asset strategies.
Processes entire dataset in a single pass using NumPy/Pandas.
"""
def __init__(
self,
commission_rate: float = 0.001, # 0.1% per trade
slippage_rate: float = 0.0005, # 0.05% slippage
initial_capital: float = 10_000.0
):
self.commission = commission_rate
self.slippage = slippage_rate
self.initial_capital = initial_capital
def run(self, prices: pd.Series, signals: pd.Series) -> BacktestResult:
"""
Run vectorized backtest.
Args:
prices: OHLCV close prices indexed by datetime
signals: Position signals (-1, 0, 1) for each bar
Returns:
BacktestResult with performance metrics
"""
        # Position changes determine when trades occur (costs charged on each change)
        position_changes = signals.diff().fillna(0)
# Returns: price return * signal (shifted to avoid look-ahead)
price_returns = prices.pct_change().shift(-1) # Next bar returns
strategy_returns = signals * price_returns
# Apply transaction costs on position changes
cost_per_change = (self.commission + self.slippage) * position_changes.abs()
net_returns = strategy_returns - cost_per_change
# Equity curve
equity = (1 + net_returns).cumprod() * self.initial_capital
        equity = equity.ffill().fillna(self.initial_capital)
# Metrics
total_return = (equity.iloc[-1] / equity.iloc[0]) - 1
n_years = len(equity) / 252
annualized = (1 + total_return) ** (1 / max(n_years, 0.01)) - 1
daily_returns = equity.pct_change().dropna()
sharpe = (daily_returns.mean() / daily_returns.std()) * np.sqrt(252) if daily_returns.std() > 0 else 0
rolling_max = equity.cummax()
drawdown = (equity - rolling_max) / rolling_max
max_dd = drawdown.min()
# Trade-level stats
trade_returns = []
position = 0
entry_price = 0.0
        for ts, px in prices.items():
sig = signals.get(ts, 0)
if sig != position:
if position != 0 and entry_price > 0:
ret = (px - entry_price) / entry_price * position
trade_returns.append(ret)
position = sig
entry_price = px
wins = [r for r in trade_returns if r > 0]
losses = [r for r in trade_returns if r <= 0]
win_rate = len(wins) / len(trade_returns) if trade_returns else 0
gross_profit = sum(wins)
gross_loss = abs(sum(losses))
profit_factor = gross_profit / gross_loss if gross_loss > 0 else float('inf')
return BacktestResult(
total_return=total_return,
annualized_return=annualized,
sharpe_ratio=sharpe,
max_drawdown=max_dd,
win_rate=win_rate,
profit_factor=profit_factor,
total_trades=len(trade_returns),
equity_curve=equity
)
Event-Driven Backtesting
Event-driven backtesting processes data bar-by-bar, triggering event handlers as each new bar arrives. It is slower than vectorized but accurately simulates order queuing, partial fills, multi-asset rebalancing, and latency. For production agents, event-driven backtests provide more realistic performance estimates.
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable
import heapq
class EventType(Enum):
MARKET_DATA = 'market_data'
SIGNAL = 'signal'
ORDER = 'order'
FILL = 'fill'
@dataclass
class Event:
timestamp: pd.Timestamp
event_type: EventType
data: dict
def __lt__(self, other):
return self.timestamp < other.timestamp
class EventDrivenBacktester:
"""
Event-driven backtester with realistic order simulation.
Processes events in chronological order for accurate sequencing.
"""
def __init__(self, initial_capital: float = 10_000):
self.capital = initial_capital
self.positions: dict[str, float] = {}
self.events: list = []
self.handlers: dict[EventType, list[Callable]] = {e: [] for e in EventType}
self.equity_history: list[tuple] = []
def subscribe(self, event_type: EventType, handler: Callable) -> None:
"""Register an event handler."""
self.handlers[event_type].append(handler)
def emit(self, event: Event) -> None:
"""Add event to the priority queue."""
heapq.heappush(self.events, event)
def run(self, market_data: pd.DataFrame) -> dict:
"""
Process all market data events in sequence.
market_data: DataFrame with columns [open, high, low, close, volume]
"""
# Load all market data into event queue
for ts, row in market_data.iterrows():
self.emit(Event(
timestamp=ts,
event_type=EventType.MARKET_DATA,
data=row.to_dict()
))
# Process event loop
while self.events:
event = heapq.heappop(self.events)
for handler in self.handlers[event.event_type]:
new_events = handler(event, self)
if new_events:
for ne in new_events:
self.emit(ne)
# Record equity after each market data event
if event.event_type == EventType.MARKET_DATA:
                position_value = sum(
                    qty * event.data.get('close', 0)  # single-asset: the latest close prices every position
                    for qty in self.positions.values()
                )
self.equity_history.append((event.timestamp, self.capital + position_value))
return {'equity': pd.Series(
dict(self.equity_history), name='equity'
)}
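The correctness of the loop above rests on the priority queue: events pushed in any order are always popped in timestamp order, because `Event.__lt__` compares timestamps. A stripped-down, self-contained illustration of that property (the `Tick` class is a stand-in for the `Event` dataclass):

```python
import heapq
from dataclasses import dataclass, field

@dataclass(order=True)
class Tick:
    """Minimal timestamped event; heap ordering uses the timestamp only."""
    timestamp: int
    payload: str = field(compare=False)

queue: list[Tick] = []
# Push events out of chronological order
for ts, payload in [(3, 'fill'), (1, 'market_data'), (2, 'order')]:
    heapq.heappush(queue, Tick(ts, payload))

# Pop always returns the earliest remaining event
order = [heapq.heappop(queue).payload for _ in range(len(queue))]
print(order)  # ['market_data', 'order', 'fill']
```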
3. Data Quality and Survivorship Bias
Garbage data produces garbage backtests. Before running any strategy validation, the agent must audit data quality across several dimensions.
Common Data Quality Issues
- Survivorship bias: Datasets containing only assets that survived (e.g., index constituents today) overstate historical returns because failed assets are excluded
- Look-ahead bias: Using data in decisions that would not have been available at that time (e.g., end-of-day close for intraday decisions)
- Adjusted vs unadjusted prices: Dividend and split adjustments retroactively change historical prices — correct for these consistently
- Stale prices: Illiquid assets may have gaps or stale quotes that create phantom opportunities
- Timezone inconsistencies: Mixing UTC and local timestamps creates incorrect sequencing
Survivorship bias warning: A mean-reversion strategy backtested over 10 years of S&P 500 data using today's constituent list includes companies that only joined the index recently and excludes those that failed or were delisted along the way. This inflates backtest returns by roughly 1-3% annually.
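The mechanics are easy to reproduce with a toy universe in which one asset delists after a large loss; averaging over today's survivors flips the sign of the result (asset names and numbers below are invented for illustration):

```python
# Per-asset total returns over the backtest window; FAIL delisted at -90%
returns = {'AAA': 0.50, 'BBB': 0.30, 'FAIL': -0.90}
survivors_today = ['AAA', 'BBB']  # FAIL no longer appears in today's index list

# Biased: equal-weight average over today's constituents only
biased = sum(returns[s] for s in survivors_today) / len(survivors_today)

# Unbiased: equal-weight average over the full point-in-time universe
unbiased = sum(returns.values()) / len(returns)

print(f"survivor-only: {biased:+.1%}, full universe: {unbiased:+.1%}")
```

The survivor-only average is +40% while the point-in-time universe averages about -3%; the fix is to source constituent lists as of each historical date.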
class DataQualityChecker:
"""Automated data quality validation before backtesting."""
def __init__(self, max_gap_bars: int = 5, max_price_jump_pct: float = 0.20):
self.max_gap = max_gap_bars
self.max_jump = max_price_jump_pct
def check(self, df: pd.DataFrame) -> dict:
"""
Run full data quality audit.
Returns:
Dict with 'passed' bool and list of 'issues'
"""
issues = []
# 1. Missing values
null_counts = df.isnull().sum()
if null_counts.any():
issues.append(f"Missing values: {null_counts[null_counts > 0].to_dict()}")
# 2. Price continuity (detect gaps)
if 'close' in df.columns:
pct_changes = df['close'].pct_change().abs()
spikes = pct_changes[pct_changes > self.max_jump]
if not spikes.empty:
issues.append(f"Price spikes > {self.max_jump:.0%} at: {spikes.index.tolist()[:5]}")
# 3. Temporal gaps (missing bars)
if isinstance(df.index, pd.DatetimeIndex) and len(df) > 1:
            expected_freq = df.index.to_series().diff().mode().iloc[0]
actual_diffs = df.index.to_series().diff()
large_gaps = actual_diffs[actual_diffs > expected_freq * self.max_gap]
if not large_gaps.empty:
issues.append(f"Temporal gaps > {self.max_gap} bars at: {large_gaps.index.tolist()[:5]}")
# 4. Zero/negative prices
if 'close' in df.columns:
invalid = df['close'][df['close'] <= 0]
if not invalid.empty:
issues.append(f"Zero/negative prices at: {invalid.index.tolist()[:5]}")
# 5. Volume consistency
if 'volume' in df.columns:
zero_vol = df['volume'][df['volume'] == 0]
if len(zero_vol) > len(df) * 0.05: # > 5% zero volume days
issues.append(f"Excessive zero-volume bars: {len(zero_vol)} ({len(zero_vol)/len(df):.1%})")
return {
'passed': len(issues) == 0,
'issues': issues,
'rows': len(df),
'date_range': f"{df.index[0]} to {df.index[-1]}" if len(df) > 0 else 'empty'
}
4. Transaction Cost Modeling
Transaction costs are the single biggest cause of the gap between backtest performance and live performance. A strategy showing 30% annual return in backtest with zero-cost assumptions may show 10% or even negative returns live when costs are properly accounted for.
Purple Flea Fee Structure
| Product | Fee Type | Rate | Notes |
|---|---|---|---|
| Perpetuals (maker) | Per trade | 0.02% | Limit orders that add liquidity |
| Perpetuals (taker) | Per trade | 0.05% | Market orders that remove liquidity |
| Casino | House edge | 1-3% | Per game, varies by game type |
| Escrow | Settlement fee | 1% | On escrow completion |
| Funding rate | Per 8h | Variable | Long pays short when positive |
For high-frequency strategies that trade many times per day, even 0.05% per trade compounds to enormous costs. A strategy making 10 round trips per day incurs 1% per day in taker fees alone — roughly 250% annually just in costs. Only strategies with very high edge can survive this cost burden.
@dataclass
class TransactionCostModel:
"""Model all transaction costs for accurate backtesting."""
maker_fee: float = 0.0002 # 0.02%
taker_fee: float = 0.0005 # 0.05%
slippage_rate: float = 0.0003 # 0.03% base slippage
funding_rate_8h: float = 0.0001 # Variable, use historical average
def cost_of_trade(
self,
notional: float,
is_maker: bool = False,
hold_bars_8h: int = 1,
is_long: bool = True
) -> dict:
"""
Compute total cost of entering and exiting a position.
Args:
notional: Position size in USD
is_maker: True if using limit orders (maker fee)
hold_bars_8h: Number of 8h funding periods held
is_long: True for long (pays positive funding)
Returns:
Dict with itemized costs
"""
entry_fee_rate = self.maker_fee if is_maker else self.taker_fee
exit_fee_rate = self.maker_fee if is_maker else self.taker_fee
entry_fee = notional * (entry_fee_rate + self.slippage_rate)
exit_fee = notional * (exit_fee_rate + self.slippage_rate)
        # Sign convention: longs pay positive funding, shorts receive it.
        # Funding credits are conservatively ignored via max(0, ...) below.
        funding_sign = 1 if is_long else -1
        funding_cost = notional * self.funding_rate_8h * hold_bars_8h * funding_sign
        total = entry_fee + exit_fee + max(0, funding_cost)
return {
'entry_fee': entry_fee,
'exit_fee': exit_fee,
'funding': funding_cost,
'total': total,
'total_pct': total / notional,
'breakeven_return': total / notional # Minimum return to cover costs
}
def min_edge_required(
self,
daily_trades: int,
avg_hold_hours: float = 8,
is_maker: bool = False
) -> float:
"""
        Compute the minimum daily strategy edge required to be profitable.
        daily_trades counts round trips (one entry plus one exit each).
Returns:
Required daily return as decimal (e.g., 0.01 = 1%)
"""
cost_per_trade = (self.maker_fee if is_maker else self.taker_fee) + self.slippage_rate
round_trip_cost = cost_per_trade * 2 # Enter + exit
funding_per_trade = self.funding_rate_8h * (avg_hold_hours / 8)
total_daily_cost = daily_trades * (round_trip_cost + funding_per_trade)
return total_daily_cost
# Evaluate a high-frequency strategy
cm = TransactionCostModel()
edge_needed = cm.min_edge_required(daily_trades=20, avg_hold_hours=1, is_maker=False)
print(f"Minimum daily return to break even: {edge_needed:.2%}")
# With 20 round trips/day at taker rates: need ~3.2% daily just to cover costs
5. Slippage Estimation
Slippage is the difference between the expected fill price and the actual fill price. For small orders on liquid instruments, slippage is negligible. For larger orders or illiquid instruments, slippage can be the dominant cost.
Slippage Models
- Fixed slippage: Add a constant bps regardless of order size (simplest, least accurate)
- Percentage slippage: Proportional to order size relative to average volume
- Square-root impact model: Industry standard for estimating market impact
The impact term is approximately σ·√(Q / ADV), where σ is daily volatility, Q is order quantity, and ADV is average daily volume. This square-root form is consistent with empirical market microstructure research and is used by institutional traders for cost estimation.
class SlippageModel:
"""
Realistic slippage estimation using square-root market impact model.
Used in backtesting to simulate execution costs.
"""
def __init__(
self,
daily_vol: float = 0.02, # Asset daily volatility
adv_usd: float = 1_000_000, # Average daily volume in USD
participation_limit: float = 0.10 # Max 10% of ADV per order
):
self.daily_vol = daily_vol
self.adv = adv_usd
self.participation_limit = participation_limit
def estimate(self, order_size_usd: float) -> dict:
"""
Estimate slippage for a given order size.
Returns:
Dict with slippage as fraction and USD amount
"""
# Participation rate (what fraction of daily volume this order represents)
participation = order_size_usd / self.adv
if participation > self.participation_limit:
# Order too large — will have severe market impact
return {
'feasible': False,
'reason': f"Order ({participation:.1%} of ADV) exceeds {self.participation_limit:.0%} participation limit",
'max_order_usd': self.adv * self.participation_limit
}
# Square-root impact model
slippage_pct = self.daily_vol * np.sqrt(participation)
        # Add bid-ask spread component (assume half-spread = 5% of daily vol)
        half_spread = self.daily_vol * 0.05
total_slippage = slippage_pct + half_spread
return {
'feasible': True,
'participation': participation,
'market_impact_pct': slippage_pct,
'half_spread_pct': half_spread,
'total_slippage_pct': total_slippage,
'total_slippage_usd': total_slippage * order_size_usd
}
def adjust_fill_price(self, quoted_price: float, order_size_usd: float, is_buy: bool) -> float:
"""Apply slippage to get simulated fill price."""
slip = self.estimate(order_size_usd)
        if not slip.get('feasible'):
            return quoted_price  # Order too large to fill; callers should check estimate() first
direction = 1 if is_buy else -1
return quoted_price * (1 + direction * slip['total_slippage_pct'])
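For intuition about the magnitudes this model produces, consider 2% daily volatility and $1M ADV: a $10k order is 1% of ADV, so the impact term is 0.02 × √0.01 = 0.2%, plus the 0.1% half-spread assumed above. The arithmetic, checked standalone:

```python
import math

daily_vol = 0.02       # 2% daily volatility
adv_usd = 1_000_000    # $1M average daily volume
order_usd = 10_000     # $10k order -> 1% participation

participation = order_usd / adv_usd
impact = daily_vol * math.sqrt(participation)  # square-root impact term
half_spread = daily_vol * 0.05                 # spread assumption from the model above
total = impact + half_spread

print(f"impact={impact:.2%}  half_spread={half_spread:.2%}  total={total:.2%}")
# impact=0.20%  half_spread=0.10%  total=0.30%
```

Note the sublinear scaling: quadrupling the order to $40k only doubles the impact term.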
6. Walk-Forward Optimization
Walk-forward optimization (WFO) is the gold standard for validating that a strategy's parameters are robust and not overfitted to a specific historical period. It works by repeatedly optimizing parameters on an in-sample window, then testing the optimal parameters on the immediately following out-of-sample window.
WFO Procedure
- Select in-sample window size (e.g., 6 months) and out-of-sample size (e.g., 2 months)
- Optimize parameters on the first in-sample window
- Apply optimal parameters to the next out-of-sample window, record performance
- Advance both windows by the out-of-sample period
- Repeat until dataset is exhausted
- Concatenate all out-of-sample windows to form the WFO equity curve
A strategy that performs well in walk-forward testing demonstrates parameter stability — its optimal parameters do not radically change from window to window, indicating the strategy has genuine edge rather than curve-fit noise.
from itertools import product
from typing import Any
class WalkForwardOptimizer:
"""
Walk-forward optimization for strategy parameter validation.
Prevents overfitting by testing on truly out-of-sample data.
"""
def __init__(
self,
in_sample_bars: int = 120, # ~6 months of daily data
out_of_sample_bars: int = 40, # ~2 months
min_trades: int = 20 # Minimum trades to count a window
):
self.is_bars = in_sample_bars
self.oos_bars = out_of_sample_bars
self.min_trades = min_trades
def run(
self,
prices: pd.Series,
param_grid: dict[str, list],
strategy_fn: callable,
objective: str = 'sharpe_ratio'
) -> dict:
"""
Run walk-forward optimization.
Args:
prices: Historical price series
param_grid: Dict mapping param names to candidate values
strategy_fn: Function(prices, **params) -> BacktestResult
objective: Metric to maximize in in-sample period
Returns:
WFO results with combined OOS performance
"""
total_bars = len(prices)
window_start = 0
oos_results = []
optimal_params_history = []
while window_start + self.is_bars + self.oos_bars <= total_bars:
is_end = window_start + self.is_bars
oos_end = is_end + self.oos_bars
is_prices = prices.iloc[window_start:is_end]
oos_prices = prices.iloc[is_end:oos_end]
# Grid search on in-sample data
best_score = -float('inf')
best_params = {}
param_names = list(param_grid.keys())
param_values = list(param_grid.values())
for combo in product(*param_values):
params = dict(zip(param_names, combo))
try:
result = strategy_fn(is_prices, **params)
score = getattr(result, objective, -float('inf'))
if score > best_score and result.total_trades >= self.min_trades:
best_score = score
best_params = params
except Exception:
continue
if best_params:
# Apply best params to OOS window
oos_result = strategy_fn(oos_prices, **best_params)
oos_results.append({
'window_start': prices.index[window_start],
'window_is_end': prices.index[is_end - 1],
'window_oos_end': prices.index[oos_end - 1],
'optimal_params': best_params,
'is_score': best_score,
'oos_sharpe': oos_result.sharpe_ratio,
'oos_return': oos_result.total_return,
'oos_max_dd': oos_result.max_drawdown
})
optimal_params_history.append(best_params)
window_start += self.oos_bars
if not oos_results:
return {'error': 'No complete windows found'}
# Aggregate OOS statistics
oos_sharpes = [r['oos_sharpe'] for r in oos_results]
oos_returns = [r['oos_return'] for r in oos_results]
positive_windows = sum(1 for r in oos_returns if r > 0)
return {
'windows': oos_results,
'avg_oos_sharpe': np.mean(oos_sharpes),
'median_oos_sharpe': np.median(oos_sharpes),
'avg_oos_return': np.mean(oos_returns),
'pct_positive_windows': positive_windows / len(oos_results),
'param_stability': self._param_stability(optimal_params_history),
'recommendation': 'PASS' if np.mean(oos_sharpes) > 0.5 else 'FAIL'
}
def _param_stability(self, history: list[dict]) -> dict:
"""Measure how stable optimal parameters are across windows."""
if not history:
return {}
stability = {}
for param in history[0].keys():
values = [h[param] for h in history]
stability[param] = {
'mean': np.mean(values),
'std': np.std(values),
'cv': np.std(values) / np.mean(values) if np.mean(values) != 0 else float('inf')
}
return stability
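The window bookkeeping in `run` is worth checking in isolation: because the windows advance by the out-of-sample size, the OOS segments tile the dataset without overlap, so each bar past the first in-sample window is tested out-of-sample exactly once. A standalone sketch of the index arithmetic (the `wfo_windows` helper is illustrative, not part of the class above):

```python
def wfo_windows(total_bars: int, is_bars: int = 120, oos_bars: int = 40):
    """Yield (is_start, is_end, oos_end) index triples for walk-forward windows."""
    start = 0
    while start + is_bars + oos_bars <= total_bars:
        yield (start, start + is_bars, start + is_bars + oos_bars)
        start += oos_bars

windows = list(wfo_windows(total_bars=280))
print(windows)
# [(0, 120, 160), (40, 160, 200), (80, 200, 240), (120, 240, 280)]
```

The OOS slices are [120:160], [160:200], [200:240], [240:280]: contiguous and non-overlapping.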
7. Monte Carlo Simulation for Robustness
Monte Carlo simulation tests strategy robustness by resampling the sequence of historical trades and computing the distribution of outcomes. If the strategy's edge is real, performance should be roughly preserved across resampled orderings. If the backtest's performance largely disappears under resampling, the original order of trades was unusually lucky.
What Monte Carlo Tests
- Luck vs edge: How likely is the backtest performance given random trade ordering?
- Drawdown distribution: What is the 95th percentile maximum drawdown?
- Time to recovery: How long does it typically take to recover from drawdowns?
- Ruin probability: What fraction of simulations reach a 50% drawdown?
class MonteCarloSimulator:
"""
Monte Carlo simulation for strategy robustness testing.
Bootstraps trade returns to generate outcome distribution.
"""
def __init__(self, n_simulations: int = 1000, confidence_level: float = 0.95):
self.n_sims = n_simulations
self.confidence = confidence_level
def simulate(
self,
trade_returns: list[float],
initial_capital: float = 10_000,
ruin_threshold: float = 0.50
) -> dict:
"""
Run Monte Carlo simulation by bootstrapping trade returns.
Args:
trade_returns: List of per-trade P&L as decimal returns
initial_capital: Starting capital for each simulation
ruin_threshold: Drawdown level considered "ruin"
Returns:
Statistical summary of simulated outcomes
"""
if len(trade_returns) < 10:
return {'error': 'Insufficient trade history (need >= 10 trades)'}
r = np.array(trade_returns)
final_values = []
max_drawdowns = []
ruin_count = 0
for _ in range(self.n_sims):
# Bootstrap: resample trades with replacement
shuffled = np.random.choice(r, size=len(r), replace=True)
# Compute equity curve
equity = initial_capital * np.cumprod(1 + shuffled)
# Track peak for drawdown
peak = initial_capital
max_dd = 0.0
ruined = False
for val in equity:
if val > peak:
peak = val
dd = (peak - val) / peak
if dd > max_dd:
max_dd = dd
if dd >= ruin_threshold:
ruined = True
break
final_values.append(equity[-1] if not ruined else 0)
max_drawdowns.append(max_dd)
if ruined:
ruin_count += 1
fv = np.array(final_values)
mdd = np.array(max_drawdowns)
ci_lo = (1 - self.confidence) / 2
ci_hi = 1 - ci_lo
return {
'n_simulations': self.n_sims,
'original_trades': len(trade_returns),
'final_value': {
'mean': np.mean(fv),
'median': np.median(fv),
                f'p{ci_lo * 100:g}': np.percentile(fv, ci_lo * 100),
                f'p{ci_hi * 100:g}': np.percentile(fv, ci_hi * 100),
'pct_profitable': (fv > initial_capital).mean()
},
'max_drawdown': {
'mean': np.mean(mdd),
'median': np.median(mdd),
f'p{int(self.confidence*100)}': np.percentile(mdd, self.confidence * 100),
'worst_case': np.max(mdd)
},
'ruin_probability': ruin_count / self.n_sims,
'recommendation': 'PASS' if ruin_count / self.n_sims < 0.10 else 'FAIL'
}
Interpretation: A robust strategy should have a ruin probability below 10% and a median final value above initial capital in Monte Carlo simulations. Strategies that only barely pass under the original trade sequence should be treated with skepticism.
8. Overfitting Prevention
Overfitting is the silent killer of algorithmic trading strategies. A strategy with 20 parameters optimized on two years of daily data (roughly 500 bars) has only 25 bars per parameter, far below the recommended minimum; it can almost certainly be tuned to produce spectacular historical returns that will not persist out-of-sample.
Rules for Preventing Overfitting
- Rule of thumb: Need at least 30-100 samples per free parameter optimized
- Reserve holdout data: Never touch the test set until final strategy selection
- Limit parameter count: Prefer strategies with 1-3 parameters over those with 10+
- Penalize complexity: Use adjusted Sharpe or penalized metrics during optimization
- Test on different regimes: Strategy should work in bull, bear, and sideways markets
- Walk-forward validation: The best defense against overfitting (see Section 6)
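The first rule translates directly into a pre-optimization gate. A hypothetical helper using the bounds of the 30-100 samples-per-parameter rule of thumb:

```python
def overfit_risk(n_bars: int, n_params: int, samples_per_param: int = 100) -> str:
    """Flag whether the dataset plausibly supports the number of free parameters."""
    if n_params == 0:
        return 'OK'
    ratio = n_bars / n_params
    if ratio >= samples_per_param:
        return 'OK'
    if ratio >= 30:  # lower bound of the 30-100 samples-per-parameter rule
        return 'MARGINAL'
    return 'HIGH RISK'

print(overfit_risk(n_bars=500, n_params=20))  # HIGH RISK (25 bars per parameter)
print(overfit_risk(n_bars=500, n_params=3))   # OK (~167 bars per parameter)
```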
Deflated Sharpe Ratio
The Deflated Sharpe Ratio (DSR) adjusts the observed Sharpe for the number of trials tested, providing a probability that the strategy has true positive Sharpe:
from scipy import stats
def deflated_sharpe_ratio(
observed_sharpe: float,
n_trials: int,
T: int,
skewness: float = 0.0,
excess_kurtosis: float = 0.0
) -> float:
"""
Compute the Deflated Sharpe Ratio (Lopez de Prado, 2018).
The DSR estimates the probability that a strategy's Sharpe Ratio
is truly positive, accounting for selection bias from testing
multiple parameter combinations.
Args:
observed_sharpe: Annualized Sharpe ratio of best strategy
n_trials: Total number of parameter combinations tested
T: Number of observations (bars) in backtest
skewness: Return distribution skewness (0 = normal)
excess_kurtosis: Return distribution excess kurtosis (0 = normal)
Returns:
Probability (0-1) that true Sharpe > 0
"""
    # Expected maximum Sharpe from n_trials independent trials (de Prado, 2018).
    # Simplified form: assumes unit variance across the trial Sharpe ratios.
    euler_mascheroni = 0.5772156649
    expected_max_sr = (
        (1 - euler_mascheroni) * stats.norm.ppf(1 - 1 / n_trials)
        + euler_mascheroni * stats.norm.ppf(1 - 1 / (n_trials * np.e))
    )
    # Standard error of the Sharpe estimator for non-normal returns (Mertens, 2002).
    # With skewness = 0 and excess_kurtosis = 0 this reduces to sqrt((1 + SR^2/2) / T).
    sr_std = np.sqrt(
        (1 - (skewness * observed_sharpe)
         + ((excess_kurtosis + 2) / 4) * observed_sharpe**2) / T
    )
# DSR: probability that observed_sharpe > expected_max under null
z_score = (observed_sharpe - expected_max_sr) / sr_std
return stats.norm.cdf(z_score)
# Example: 50 parameter combos tested, 250 bars, observed Sharpe = 1.5
dsr = deflated_sharpe_ratio(
observed_sharpe=1.5,
n_trials=50,
T=250
)
print(f"Deflated Sharpe Probability: {dsr:.2%}")
# If this is below 0.95, the strategy likely overfits
9. Complete Python Backtesting Framework
The following integrates all components into a single pipeline that agents can use to validate strategies end-to-end before deployment on Purple Flea:
class AgentBacktestPipeline:
"""
End-to-end backtesting pipeline for AI agents.
Runs full validation: data quality, vectorized backtest,
walk-forward optimization, and Monte Carlo stress test.
"""
def __init__(self, initial_capital: float = 10_000):
self.capital = initial_capital
self.data_checker = DataQualityChecker()
self.cost_model = TransactionCostModel()
self.slippage_model = SlippageModel()
self.wfo = WalkForwardOptimizer()
self.mc = MonteCarloSimulator(n_simulations=1000)
self.backtester = VectorizedBacktester(
commission_rate=0.0005,
slippage_rate=0.0003,
initial_capital=initial_capital
)
def validate(
self,
prices: pd.Series,
strategy_fn: callable,
param_grid: dict,
strategy_params: dict
) -> dict:
"""
Full validation pipeline. Returns GO/NO-GO recommendation.
Args:
prices: Historical price series
strategy_fn: Function(prices, **params) -> signals Series
param_grid: Parameter search space for WFO
strategy_params: Final parameters for full backtest
Returns:
Comprehensive validation report
"""
report = {'stages': {}, 'recommendation': 'PENDING'}
# Stage 1: Data Quality
price_df = pd.DataFrame({'close': prices})
dq = self.data_checker.check(price_df)
report['stages']['data_quality'] = dq
if not dq['passed']:
report['recommendation'] = 'NO-GO: Data quality issues'
return report
# Stage 2: Full Backtest
signals = strategy_fn(prices, **strategy_params)
bt_result = self.backtester.run(prices, signals)
report['stages']['backtest'] = {
'total_return': bt_result.total_return,
'sharpe_ratio': bt_result.sharpe_ratio,
'max_drawdown': bt_result.max_drawdown,
'win_rate': bt_result.win_rate,
'profit_factor': bt_result.profit_factor,
'total_trades': bt_result.total_trades
}
if bt_result.sharpe_ratio < 0.5 or bt_result.max_drawdown < -0.40:
report['recommendation'] = 'NO-GO: Poor in-sample performance'
return report
# Stage 3: Walk-Forward Optimization
def wfo_strategy(p, **params):
sigs = strategy_fn(p, **params)
return self.backtester.run(p, sigs)
wfo_result = self.wfo.run(prices, param_grid, wfo_strategy)
report['stages']['walk_forward'] = wfo_result
if wfo_result.get('recommendation') == 'FAIL':
report['recommendation'] = 'NO-GO: Failed walk-forward validation'
return report
# Stage 4: Monte Carlo
# Use equity curve returns as trade proxy
equity = bt_result.equity_curve
trade_returns = equity.pct_change().dropna().tolist()
mc_result = self.mc.simulate(trade_returns, self.capital)
report['stages']['monte_carlo'] = mc_result
if mc_result.get('ruin_probability', 1.0) > 0.10:
report['recommendation'] = f"NO-GO: Ruin probability {mc_result['ruin_probability']:.0%} > 10%"
return report
# All stages passed
report['recommendation'] = 'GO: Strategy validated for paper trading'
report['suggested_kelly_fraction'] = 0.25 # Start conservative
return report
# Example usage
if __name__ == '__main__':
pipeline = AgentBacktestPipeline(initial_capital=10_000)
# Synthetic prices for demonstration
np.random.seed(42)
dates = pd.date_range('2023-01-01', periods=500, freq='D')
prices = pd.Series(
100 * np.exp(np.cumsum(np.random.normal(0.0003, 0.02, 500))),
index=dates, name='BTC'
)
def simple_momentum(prices, lookback=20, **kwargs):
"""Simple momentum: long when price > SMA, flat otherwise."""
sma = prices.rolling(lookback).mean()
return (prices > sma).astype(int)
report = pipeline.validate(
prices=prices,
strategy_fn=simple_momentum,
param_grid={'lookback': [10, 15, 20, 25, 30]},
strategy_params={'lookback': 20}
)
print(f"Recommendation: {report['recommendation']}")
print(f"Backtest Sharpe: {report['stages']['backtest']['sharpe_ratio']:.2f}")
print(f"WFO Avg OOS Sharpe: {report['stages']['walk_forward'].get('avg_oos_sharpe', 0):.2f}")
print(f"MC Ruin Probability: {report['stages']['monte_carlo'].get('ruin_probability', 1):.1%}")
Deploy Validated Strategies on Purple Flea
Once your strategy passes backtesting, deploy it live. Use the faucet for risk-free initial capital, then graduate to full live trading on our perpetuals and casino products.