Building a Backtesting Framework for Agent Strategies
Backtesting is how you separate strategies that look profitable from strategies that are profitable. Done wrong, it produces spectacular fiction. Done right, it gives you realistic estimates of live performance before risking a single satoshi. This guide builds a complete, production-quality backtesting framework from first principles, with every correctness pitfall documented and full integration with Purple Flea's live data.
Historical Data Sourcing
A backtest is only as good as its data. The most common data quality failures that invalidate backtests:
- Survivorship bias: using only assets that still exist today inflates apparent returns. Cryptocurrencies that were delisted, hacked, or abandoned are absent from most databases.
- Missing point-in-time data: any input used at time T (e.g., the order book state at T) must reflect what was actually known at T, not revisions published later.
- OHLCV aggregation vs tick data: bar-level data loses the intrabar price path. Strategies that depend on stop-loss triggers need tick-level resolution.
- Wrong exchange: prices vary across exchanges. Always use data from the exchange where you will actually trade.
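A quick structural audit catches several of these problems before any strategy code runs. As a minimal sketch (the function name and toy timestamps below are illustrative, not part of the Purple Flea API), missing bars in a candle index often mark exchange downtime or a delisting:

```python
import pandas as pd

def find_gaps(index: pd.DatetimeIndex, freq: str = "1h") -> pd.DatetimeIndex:
    """Return the expected bar timestamps that are missing from an OHLCV index."""
    expected = pd.date_range(index.min(), index.max(), freq=freq)
    return expected.difference(index)

idx = pd.to_datetime(["2025-09-01 00:00", "2025-09-01 01:00", "2025-09-01 03:00"])
print(list(find_gaps(idx)))  # → [Timestamp('2025-09-01 02:00:00')]
```

A long run of consecutive gaps near the end of a series is a delisting signature; treating it as ordinary downtime quietly reintroduces survivorship bias.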
Purple Flea Historical Data API
import requests
import pandas as pd
from datetime import datetime
API_KEY = "pf_live_"
BASE_URL = "https://purpleflea.com/api/v1"
def fetch_ohlcv(
symbol: str,
interval: str,
start: datetime,
end: datetime,
include_volume_profile: bool = False
) -> pd.DataFrame:
"""
Fetch OHLCV data from Purple Flea.
interval: '1m' | '5m' | '15m' | '1h' | '4h' | '1d'
"""
resp = requests.get(
f"{BASE_URL}/market/history",
headers={"Authorization": f"Bearer {API_KEY}"},
params={
"symbol": symbol,
"interval": interval,
"start": int(start.timestamp()),
"end": int(end.timestamp()),
"volume_profile": include_volume_profile
}
)
    resp.raise_for_status()
    data = resp.json()["candles"]
df = pd.DataFrame(data, columns=["timestamp", "open", "high", "low", "close", "volume"])
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="s")
df = df.set_index("timestamp").sort_index()
return df
def fetch_trade_data(symbol: str, start: datetime, end: datetime) -> pd.DataFrame:
"""Fetch raw trade data (individual ticks) for high-resolution backtests."""
resp = requests.get(
f"{BASE_URL}/market/trades",
headers={"Authorization": f"Bearer {API_KEY}"},
params={
"symbol": symbol,
"start": int(start.timestamp()),
"end": int(end.timestamp()),
}
)
    resp.raise_for_status()
    trades = resp.json()["trades"]
df = pd.DataFrame(trades)
df["timestamp"] = pd.to_datetime(df["timestamp"], unit="ms")
return df
# Example: fetch 6 months of 1-hour BTC/USD data
start = datetime(2025, 9, 1)
end = datetime(2026, 3, 1)
# df = fetch_ohlcv("BTC-USD", "1h", start, end)
Data Cleaning Pipeline
def clean_ohlcv(df: pd.DataFrame) -> pd.DataFrame:
"""
Standard OHLCV data cleaning pipeline.
Removes corrupt bars and fills minor gaps.
"""
original_len = len(df)
# Remove bars where OHLCV constraints are violated
valid_mask = (
(df["high"] >= df["low"]) &
(df["high"] >= df["open"]) &
(df["high"] >= df["close"]) &
(df["low"] <= df["open"]) &
(df["low"] <= df["close"]) &
(df["volume"] >= 0) &
(df["close"] > 0)
)
df = df[valid_mask].copy()
    # Flag and remove extreme outliers (>10 sigma moves are likely data errors)
    returns = df["close"].pct_change().fillna(0)  # keep the first bar, whose return is NaN
    sigma = returns.std()
    df = df[returns.abs() < 10 * sigma].copy()
    # Forward-fill up to 3 consecutive missing bars (exchange downtime)
    df = df.resample(df.index.freq or "1h").last()
    df = df.ffill(limit=3)
# Drop remaining NaN rows
df = df.dropna()
removed = original_len - len(df)
if removed > 0:
print(f"Removed {removed} invalid bars ({removed/original_len*100:.1f}%)")
return df
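To see the OHLC consistency check in isolation, here is the same validity mask (written compactly with max/min across columns) applied to a two-bar toy frame where the second bar is deliberately corrupt:

```python
import pandas as pd

# One good bar and one corrupt bar (high < low)
df = pd.DataFrame({
    "open":   [100.0, 101.0],
    "high":   [102.0,  99.0],
    "low":    [ 99.0, 100.0],
    "close":  [101.0, 100.5],
    "volume": [ 10.0,  12.0],
})
valid = (
    (df["high"] >= df["low"]) &
    (df["high"] >= df[["open", "close"]].max(axis=1)) &
    (df["low"] <= df[["open", "close"]].min(axis=1)) &
    (df["volume"] >= 0) & (df["close"] > 0)
)
print(valid.tolist())  # → [True, False]
```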
Event-Driven vs Vectorized Backtesting
There are two fundamental backtesting architectures, each with different accuracy vs speed tradeoffs:
Vectorized Backtesting
Apply trading rules as vectorized operations on the entire dataset simultaneously. Fast (milliseconds for years of data), simple to implement, but fundamentally incorrect for any strategy that uses feedback from previous trades, position sizing, or dynamic risk management.
import pandas as pd
import numpy as np
def vectorized_backtest_simple(df: pd.DataFrame, fast: int = 10, slow: int = 30) -> pd.DataFrame:
"""
Vectorized backtest: fast/slow moving average crossover.
WARNING: This is a simplified demo. Vectorized backtesting
cannot correctly model realistic order execution, slippage,
or position sizing. Use event-driven for production.
"""
df = df.copy()
df["fast_ma"] = df["close"].rolling(fast).mean()
df["slow_ma"] = df["close"].rolling(slow).mean()
    # Signal: 1 = long, -1 = short, 0 = flat (during MA warmup)
    df["signal"] = np.where(df["fast_ma"] > df["slow_ma"], 1, -1)
    df.loc[df["slow_ma"].isna(), "signal"] = 0  # no position before both MAs exist
    df["signal"] = df["signal"].shift(1)  # avoid lookahead: act on NEXT bar
# Returns
df["returns"] = df["close"].pct_change()
df["strategy_returns"] = df["signal"] * df["returns"]
df["cumulative"] = (1 + df["strategy_returns"]).cumprod()
return df
Event-Driven Backtesting
Process the data chronologically as a stream of events, exactly as a live trading system does. Each bar or tick triggers event handlers that can place orders, manage positions, and update state. Slower, but far more accurate: it correctly handles
- Order fills that depend on the order book state at fill time
- Dynamic position sizing based on current capital
- Portfolio-level constraints (margin, correlation limits)
- Realistic order types (limit, stop, IOC) with proper fill logic
- Commission and slippage on a per-trade basis
For any strategy you will deploy with real capital, event-driven backtesting is mandatory. Vectorized backtests consistently overestimate returns by 15-40% due to unrealistic fill assumptions. The extra implementation time pays for itself on the first live trade.
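A minimal event loop makes the difference concrete. This is an illustrative sketch, not a production engine: a toy moving-average strategy where the signal is computed at each bar's close and filled at the next bar's open, with a flat taker fee (the fee level and data are assumptions):

```python
import pandas as pd

def event_driven_backtest(df: pd.DataFrame, fast: int = 2, slow: int = 3,
                          taker_fee: float = 0.001) -> pd.Series:
    """Minimal event loop: signals at bar close fill at the NEXT bar's open."""
    cash, units = 10_000.0, 0.0
    equity = []
    pending = None  # order queued at the previous bar's close
    for i in range(len(df)):
        bar = df.iloc[i]
        # 1) fill any queued order at this bar's open, paying the taker fee
        if pending == 1 and units == 0:
            units = cash * (1 - taker_fee) / bar["open"]
            cash = 0.0
        elif pending == -1 and units > 0:
            cash = units * bar["open"] * (1 - taker_fee)
            units = 0.0
        pending = None
        # 2) mark equity at this bar's close
        equity.append(cash + units * bar["close"])
        # 3) compute the signal from closes up to and including this bar
        if i + 1 >= slow:
            fast_ma = df["close"].iloc[i + 1 - fast : i + 1].mean()
            slow_ma = df["close"].iloc[i + 1 - slow : i + 1].mean()
            pending = 1 if fast_ma > slow_ma else -1
    return pd.Series(equity, index=df.index)

# Toy data: a steadily rising market; the strategy goes long once the fast MA crosses
df = pd.DataFrame(
    {"open": [100, 101, 102, 103, 104, 105],
     "close": [101, 102, 103, 104, 105, 106]},
    index=pd.date_range("2026-01-01", periods=6, freq="1h"),
)
curve = event_driven_backtest(df)
print(f"final equity: {curve.iloc[-1]:,.2f}")
```

Note how the fill in step 1 uses only the open of the bar currently being processed: the backtest can never touch a price that was not yet known when the order was placed.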
Transaction Cost Modeling
The single biggest reason backtests fail to predict live performance is inadequate cost modeling. Complete cost structure:
| Cost Component | Typical Range | Scales With |
|---|---|---|
| Commission (taker) | 3-25 bps | Trade value |
| Commission (maker) | -2 to +5 bps | Trade value |
| Bid-ask spread | 1-50 bps | Asset liquidity |
| Market impact | 0-200 bps | Order size / ADV |
| Slippage (timing) | 1-10 bps | Market volatility |
| Funding rate (perps) | 0-50 bps/day | Open interest |
| Borrow cost (short) | 0-500 bps/yr | Short demand |
from dataclasses import dataclass
from typing import Optional
@dataclass
class TransactionCostModel:
"""
Complete transaction cost model for backtesting.
All rates in basis points (bps), where 100 bps = 1%.
"""
taker_commission_bps: float = 10.0 # 0.10%
    maker_commission_bps: float = 3.0      # 0.03% (some venues instead pay maker rebates, i.e. negative bps)
    spread_bps: float = 5.0                # full quoted bid-ask spread; each leg pays half
impact_eta: float = 0.1 # market impact coefficient
adv_usd: float = 2_000_000 # average daily volume
daily_vol: float = 0.025 # daily price volatility
funding_rate_daily_bps: float = 5.0 # for perpetual futures
def compute_cost(
self,
order_size_usd: float,
order_type: str = "taker",
holding_period_days: float = 1.0,
is_short: bool = False
) -> dict:
# Commission
if order_type == "taker":
commission = self.taker_commission_bps
else:
commission = self.maker_commission_bps
        # Each leg pays half the quoted spread (entry plus exit together pay the full spread)
        spread_cost = self.spread_bps / 2
# Market impact: square root law
import math
participation = order_size_usd / self.adv_usd
impact = self.daily_vol * math.sqrt(participation) * self.impact_eta * 10000 # bps
# Funding cost (for futures/perps on long positions)
funding = self.funding_rate_daily_bps * holding_period_days
        # Borrow cost for short positions (~5% APR = 500 bps/yr, pro-rated per day)
        borrow = 500.0 * holding_period_days / 365 if is_short else 0.0
total_bps = commission + spread_cost + impact + funding + borrow
total_usd = order_size_usd * total_bps / 10000
return {
"commission_bps": commission,
"spread_bps": spread_cost,
"impact_bps": impact,
"funding_bps": funding,
"borrow_bps": borrow,
"total_bps": total_bps,
"total_usd": total_usd
}
cost_model = TransactionCostModel()
costs = cost_model.compute_cost(50_000, "taker", holding_period_days=1.0)
print(f"Total cost: {costs['total_bps']:.2f} bps = ${costs['total_usd']:.2f}")
# → Total cost: 21.45 bps = $107.26
Lookahead Bias Prevention
Lookahead bias, using future information in historical decisions, is the most insidious backtest failure mode because it is invisible and always inflates performance. A strategy that uses tomorrow's closing price to make today's decision looks brilliant in backtest and is worthless live. The most common sources:
1. Using today's close to size or direct a trade executed at today's open (the open is known at bar start; the close is not).
2. Applying .fillna() or normalization across the full dataset before splitting train/test.
3. Indicators computed over centered rolling windows (e.g., a rolling Z-score with center=True), which include future bars at every point.
4. Using the same bar's high/low to trigger stop-loss and re-enter on the same bar.
import pandas as pd
import numpy as np
def safe_signal_generation(df: pd.DataFrame) -> pd.DataFrame:
"""
Demonstrates correct signal timing to prevent lookahead bias.
Rule: signals computed at bar CLOSE, executed at NEXT bar OPEN.
"""
df = df.copy()
# WRONG: signal uses same bar's close, would execute at that bar's open
# df["signal"] = (df["close"] > df["close"].shift(1)).astype(int)
# CORRECT: shift signal by 1 so it acts on the FOLLOWING bar
raw_signal = (df["close"] > df["close"].shift(1)).astype(int)
df["signal"] = raw_signal.shift(1) # execute at next bar open
# WRONG: normalize using full series (uses future data)
# df["normalized"] = (df["close"] - df["close"].mean()) / df["close"].std()
# CORRECT: use expanding window (only past data at each point)
df["normalized"] = (
(df["close"] - df["close"].expanding().mean()) /
df["close"].expanding().std()
)
# WRONG: RSI using full-window stats
# CORRECT: use fixed lookback, ensure index alignment
delta = df["close"].diff()
gain = delta.where(delta > 0, 0).rolling(14).mean()
loss = (-delta).where(delta < 0, 0).rolling(14).mean()
rs = gain / loss.replace(0, np.nan)
df["rsi"] = 100 - (100 / (1 + rs))
    # RSI here uses data only up to and including bar T, so it is safe for a T+1 signal
return df
def train_test_split_temporal(
df: pd.DataFrame,
train_frac: float = 0.7
) -> tuple:
"""
    Temporal train/test split: NEVER shuffle time series data.
All normalization and fitting must happen only on training data.
"""
split_idx = int(len(df) * train_frac)
train = df.iloc[:split_idx].copy()
test = df.iloc[split_idx:].copy()
# Compute any statistics (mean, std, etc.) ONLY on train
train_mean = train["close"].mean()
train_std = train["close"].std()
# Apply train statistics to both splits (never test statistics)
train["z_score"] = (train["close"] - train_mean) / train_std
test["z_score"] = (test["close"] - train_mean) / train_std # use train stats!
return train, test
Walk-Forward Optimization
Walk-forward optimization (WFO) is the gold standard for strategy parameter selection because it tests parameters on truly out-of-sample data at every step. It simulates exactly what a live trading system does: periodically re-optimize on recent history, then trade on unseen future data.
The procedure:
- Divide data into a sequence of windows: [train_1, test_1], [train_2, test_2], ...
- Each test window is immediately after the preceding train window (anchored or rolling)
- On each train window: grid search parameters, select the best
- Apply those parameters to the test window: record live-equivalent performance
- Concatenate all test-window results for final evaluation
import numpy as np
import pandas as pd
from itertools import product
from typing import Callable, Dict, List, Any
class WalkForwardOptimizer:
def __init__(
self,
strategy_fn: Callable,
param_grid: Dict[str, List[Any]],
n_train_periods: int = 252,
n_test_periods: int = 63,
step_size: int = 63, # re-optimize every quarter
objective: str = "sharpe"
):
self.strategy_fn = strategy_fn
self.param_grid = param_grid
self.n_train = n_train_periods
self.n_test = n_test_periods
self.step = step_size
self.objective = objective
def _score(self, returns: pd.Series) -> float:
if len(returns) < 5 or returns.std() == 0:
return -np.inf
if self.objective == "sharpe":
return returns.mean() / returns.std() * np.sqrt(252)
elif self.objective == "calmar":
ann_return = returns.mean() * 252
max_dd = self._max_drawdown(returns)
return ann_return / abs(max_dd) if max_dd != 0 else 0
        elif self.objective == "total_return":
            return (1 + returns).prod() - 1
        raise ValueError(f"unknown objective: {self.objective}")
def _max_drawdown(self, returns: pd.Series) -> float:
cum = (1 + returns).cumprod()
roll_max = cum.expanding().max()
drawdowns = cum / roll_max - 1
return drawdowns.min()
def run(self, df: pd.DataFrame) -> dict:
all_test_returns = []
optimization_log = []
param_combos = list(product(*self.param_grid.values()))
param_names = list(self.param_grid.keys())
start = self.n_train
while start + self.n_test <= len(df):
train_data = df.iloc[start - self.n_train : start]
test_data = df.iloc[start : start + self.n_test]
# Find best params on training data
best_score = -np.inf
best_params = None
for combo in param_combos:
params = dict(zip(param_names, combo))
try:
train_returns = self.strategy_fn(train_data, **params)
score = self._score(train_returns)
if score > best_score:
best_score = score
best_params = params
except Exception:
continue
if best_params is None:
start += self.step
continue
# Evaluate on OOS test window
test_returns = self.strategy_fn(test_data, **best_params)
all_test_returns.append(test_returns)
optimization_log.append({
"window_start": test_data.index[0],
"best_params": best_params,
"is_score": best_score,
"oos_score": self._score(test_returns)
})
start += self.step
        combined = pd.concat(all_test_returns) if all_test_returns else pd.Series(dtype=float)
return {
"oos_returns": combined,
"optimization_log": optimization_log,
"oos_sharpe": self._score(combined) if len(combined) > 0 else None,
"oos_max_drawdown": self._max_drawdown(combined) if len(combined) > 0 else None
}
Overfitting Detection
Overfitting in backtesting occurs when parameters are chosen that maximize historical performance but are specific to noise rather than signal. The result: exceptional backtest, dismal live performance.
In-Sample / Out-of-Sample Split
The basic defense: never touch the test set until the strategy is finalized. A contaminated test set provides no information about live performance.
A useful summary number is the ratio of out-of-sample Sharpe to in-sample Sharpe. Ratio > 0.7: acceptable | 0.5-0.7: caution | < 0.5: likely overfit
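One way to operationalize this is walk-forward efficiency: compute the Sharpe on the untouched test set and divide by the in-sample Sharpe. A small sketch (the helper names and toy return series are illustrative):

```python
import numpy as np

def sharpe(returns: np.ndarray, periods_per_year: int = 252) -> float:
    """Annualized Sharpe from a per-period return series (zero risk-free rate)."""
    return returns.mean() / returns.std() * np.sqrt(periods_per_year)

def degradation_ratio(is_returns: np.ndarray, oos_returns: np.ndarray) -> float:
    """OOS Sharpe divided by IS Sharpe; close to 1.0 means the edge generalized."""
    is_sr = sharpe(is_returns)
    if is_sr <= 0:
        return float("nan")  # no in-sample edge to degrade from
    return sharpe(oos_returns) / is_sr

# Illustrative returns: the out-of-sample edge is weaker than in-sample
is_ret = np.array([0.01, -0.005] * 100)          # mean 0.25%, per-period SR ~0.33
oos_ret = np.array([0.01, -0.005] * 50) - 0.001  # same spread, smaller mean
ratio = degradation_ratio(is_ret, oos_ret)
print(f"degradation ratio: {ratio:.2f}")  # → degradation ratio: 0.60
```

By the thresholds above, a 0.60 ratio sits in the caution band: some edge survives out of sample, but a meaningful share of the in-sample performance was fit to noise.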
Combinatorial Purged Cross-Validation (CPCV)
CPCV, from Lopez de Prado's "Advances in Financial Machine Learning," addresses the serial correlation in financial data that makes standard k-fold cross-validation invalid. The snippet below implements its core building block, purged k-fold with an embargo; full CPCV additionally evaluates every combination of held-out folds to generate many backtest paths.
import numpy as np
import pandas as pd
from scipy.stats import norm
def purged_k_fold(
df: pd.DataFrame,
n_splits: int = 5,
embargo_pct: float = 0.01
) -> list:
"""
Purged K-Fold cross-validation for time series.
Embargo: after each training fold, leave a gap equal to
embargo_pct of total samples to prevent leakage from
overlapping return windows.
"""
n = len(df)
fold_size = n // n_splits
embargo_size = int(n * embargo_pct)
folds = []
for k in range(n_splits):
test_start = k * fold_size
test_end = min((k + 1) * fold_size, n)
# Training data: everything except test + embargo buffer
train_indices = list(range(0, max(0, test_start - embargo_size)))
train_indices += list(range(min(n, test_end + embargo_size), n))
test_indices = list(range(test_start, test_end))
folds.append({
"fold": k,
"train": df.iloc[train_indices],
"test": df.iloc[test_indices]
})
return folds
def deflated_sharpe_ratio(
observed_sharpe: float,
n_trials: int,
n_observations: int,
skewness: float = 0.0,
kurtosis: float = 3.0
) -> float:
"""
Bailey & Lopez de Prado Deflated Sharpe Ratio.
Adjusts observed Sharpe for selection bias from trying multiple strategies.
Returns the probability that the strategy has a true positive Sharpe.
"""
    # Expected maximum Sharpe among n_trials zero-skill strategies
    # (assumes the trial Sharpe estimates have unit variance)
    euler_mascheroni = 0.5772156649
    expected_max_sr = (
        (1 - euler_mascheroni) * norm.ppf(1 - 1 / n_trials) +
        euler_mascheroni * norm.ppf(1 - 1 / (n_trials * np.e))
    )
# Variance of Sharpe estimator (accounting for non-normality)
sr_variance = (
(1 - skewness * observed_sharpe + (kurtosis - 1) / 4 * observed_sharpe**2)
/ (n_observations - 1)
)
# Deflated SR: probability of beating expected maximum by chance
deflated = (observed_sharpe - expected_max_sr) / np.sqrt(sr_variance)
psr = norm.cdf(deflated)
return psr
# Example: we tried 50 parameter combinations, observed Sharpe = 2.1
psr = deflated_sharpe_ratio(
observed_sharpe=2.1,
n_trials=50,
n_observations=500
)
print(f"Probability of genuine alpha: {psr:.1%}")
# A low PSR means the strategy is likely overfit from the parameter search
Performance Metrics
No single metric captures all aspects of strategy quality. Use a dashboard of metrics that together paint a complete picture.
Complete Metrics Implementation
import numpy as np
import pandas as pd
from scipy import stats
def compute_performance_metrics(
returns: pd.Series,
benchmark_returns: pd.Series = None,
risk_free_rate: float = 0.05,
periods_per_year: int = 252
) -> dict:
"""
Comprehensive strategy performance metrics.
returns: daily returns series
risk_free_rate: annualized risk-free rate
"""
rf_daily = risk_free_rate / periods_per_year
excess = returns - rf_daily
# Annualized return
n = len(returns)
total_return = (1 + returns).prod() - 1
ann_return = (1 + total_return) ** (periods_per_year / n) - 1
# Volatility
ann_vol = returns.std() * np.sqrt(periods_per_year)
# Sharpe Ratio
sharpe = excess.mean() / returns.std() * np.sqrt(periods_per_year)
# Sortino Ratio (penalizes only downside volatility)
downside = returns[returns < rf_daily]
downside_vol = downside.std() * np.sqrt(periods_per_year) if len(downside) > 0 else np.nan
sortino = (ann_return - risk_free_rate) / downside_vol if downside_vol else np.nan
# Maximum Drawdown
cum = (1 + returns).cumprod()
roll_max = cum.expanding().max()
drawdowns = cum / roll_max - 1
max_dd = drawdowns.min()
# Calmar Ratio
calmar = ann_return / abs(max_dd) if max_dd != 0 else np.nan
# Average drawdown duration
in_drawdown = drawdowns < 0
    dd_starts = in_drawdown & (~in_drawdown.shift(1, fill_value=False))
    dd_ends = (~in_drawdown) & (in_drawdown.shift(1, fill_value=False))
dd_durations = []
start_idx = None
for i, (s, e) in enumerate(zip(dd_starts, dd_ends)):
if s: start_idx = i
if e and start_idx is not None:
dd_durations.append(i - start_idx)
avg_dd_duration = np.mean(dd_durations) if dd_durations else 0
# Win rate and profit factor
wins = returns[returns > 0]
losses = returns[returns < 0]
win_rate = len(wins) / len(returns) if len(returns) > 0 else 0
gross_profit = wins.sum()
gross_loss = abs(losses.sum())
profit_factor = gross_profit / gross_loss if gross_loss > 0 else np.inf
# Tail ratio (95th percentile return / 5th percentile loss)
tail_ratio = abs(np.percentile(returns, 95)) / abs(np.percentile(returns, 5))
# Beta and Alpha vs benchmark
beta, alpha, r_value, p_value, _ = (
stats.linregress(benchmark_returns, returns)
if benchmark_returns is not None and len(benchmark_returns) == len(returns)
else (np.nan, np.nan, np.nan, np.nan, np.nan)
)
# Omega Ratio
threshold = rf_daily
omega_num = returns[returns > threshold].sum() - threshold * len(returns[returns > threshold])
omega_den = abs(returns[returns <= threshold].sum() - threshold * len(returns[returns <= threshold]))
omega = omega_num / omega_den if omega_den > 0 else np.inf
return {
"total_return": f"{total_return:.2%}",
"ann_return": f"{ann_return:.2%}",
"ann_volatility": f"{ann_vol:.2%}",
"sharpe_ratio": round(sharpe, 3),
"sortino_ratio": round(sortino, 3),
"calmar_ratio": round(calmar, 3),
"omega_ratio": round(omega, 3),
"max_drawdown": f"{max_dd:.2%}",
"avg_dd_duration_days": round(avg_dd_duration, 1),
"win_rate": f"{win_rate:.2%}",
"profit_factor": round(profit_factor, 3),
"tail_ratio": round(tail_ratio, 3),
"beta": round(beta, 3) if not np.isnan(beta) else None,
"alpha_ann": f"{alpha * periods_per_year:.2%}" if not np.isnan(alpha) else None,
"n_periods": n,
}
# Example: evaluate a strategy
np.random.seed(42)
sample_returns = pd.Series(np.random.normal(0.0008, 0.015, 500)) # 500 daily returns
metrics = compute_performance_metrics(sample_returns)
for k, v in metrics.items():
print(f" {k:25s}: {v}")
Metric Interpretation Guide
| Metric | Poor | Acceptable | Good | Excellent |
|---|---|---|---|---|
| Sharpe Ratio | < 0.5 | 0.5 - 1.0 | 1.0 - 2.0 | > 2.0 |
| Sortino Ratio | < 0.7 | 0.7 - 1.5 | 1.5 - 3.0 | > 3.0 |
| Calmar Ratio | < 0.3 | 0.3 - 1.0 | 1.0 - 3.0 | > 3.0 |
| Max Drawdown | > 30% | 15-30% | 5-15% | < 5% |
| Win Rate | < 40% | 40-50% | 50-60% | > 60% |
| Profit Factor | < 1.0 | 1.0-1.3 | 1.3-2.0 | > 2.0 |
Backtesting on Purple Flea Casino/Trading Data
Purple Flea's unique angle: you can backtest agent strategies against real historical casino game outcomes, trading order books, and escrow settlement data. This allows you to simulate realistic agent P&L including all fee structures.
import requests
import pandas as pd
from dataclasses import dataclass, field
from typing import List, Optional
@dataclass
class CasinoBacktestResult:
hands_played: int = 0
total_wagered: float = 0.0
total_returned: float = 0.0
max_bankroll: float = 0.0
min_bankroll: float = float('inf')
bankroll_history: List[float] = field(default_factory=list)
def backtest_casino_strategy(
api_key: str,
strategy_fn,
initial_bankroll: float = 1.0,
game: str = "blackjack",
n_hands: int = 1000,
use_historical: bool = True
) -> CasinoBacktestResult:
"""
Backtest a casino strategy against Purple Flea historical hand data.
strategy_fn: function(hand_state) -> bet_size (fraction of bankroll)
"""
result = CasinoBacktestResult()
bankroll = initial_bankroll
result.bankroll_history.append(bankroll)
if use_historical:
# Fetch historical hand outcomes from Purple Flea
resp = requests.get(
"https://purpleflea.com/api/v1/casino/history",
headers={"Authorization": f"Bearer {api_key}"},
params={"game": game, "limit": n_hands}
)
hands = resp.json()["hands"]
else:
        # Simulate with the game's RTP. RTP is NOT the win probability:
        # for an even-money game, RTP = 2 * win probability.
        import random
        win_prob = 0.995 / 2  # 99.5% RTP blackjack -> 49.75% per-hand win chance
        hands = [{"outcome": random.random() < win_prob, "multiplier": 1.0} for _ in range(n_hands)]
for hand in hands:
if bankroll <= 0:
break
bet_fraction = strategy_fn({"bankroll": bankroll, "hand": hand})
bet_size = bankroll * max(0, min(1, bet_fraction)) # clamp to [0, bankroll]
if hand.get("outcome"):
multiplier = hand.get("multiplier", 1.0)
bankroll += bet_size * multiplier
else:
bankroll -= bet_size
result.hands_played += 1
result.total_wagered += bet_size
result.max_bankroll = max(result.max_bankroll, bankroll)
result.min_bankroll = min(result.min_bankroll, bankroll)
result.bankroll_history.append(bankroll)
result.total_returned = bankroll
return result
# Example: Kelly criterion strategy
def kelly_strategy(state: dict) -> float:
    """Kelly criterion bet sizing for a 49.5% win probability, even-money game."""
    WIN_PROB = 0.495
    LOSS_PROB = 1 - WIN_PROB
    PAYOFF_RATIO = 1.0  # 1:1 payout
    kelly_fraction = WIN_PROB - (LOSS_PROB / PAYOFF_RATIO)  # f* = p - q/b = -0.01
    half_kelly = kelly_fraction / 2  # half-Kelly for risk reduction
    # The fraction is negative for any house-edge game, so the clamp bets zero:
    # exactly what Kelly prescribes when there is no edge.
    return max(0, half_kelly)
# result = backtest_casino_strategy(
# "pf_live_",
# kelly_strategy,
# initial_bankroll=1.0,
# game="blackjack",
# n_hands=5000
# )
Complete Backtesting Class
Putting it all together: a production-quality backtesting class that combines all the components above into a single unified interface.
class AgentBacktester:
"""
Complete event-driven backtesting framework for AI agent strategies.
Integrates data fetching, cost modeling, lookahead prevention,
walk-forward optimization, and performance reporting.
"""
def __init__(
self,
api_key: str,
cost_model: TransactionCostModel = None,
initial_capital: float = 100_000.0
):
self.api_key = api_key
self.cost_model = cost_model or TransactionCostModel()
self.capital = initial_capital
self.positions = {}
self.trades = []
self.equity_curve = []
def run(
self,
symbol: str,
strategy_fn: Callable,
start: datetime,
end: datetime,
interval: str = "1h",
walk_forward: bool = True
) -> dict:
# Fetch and clean data
df = fetch_ohlcv(symbol, interval, start, end)
df = clean_ohlcv(df)
if walk_forward:
# Use WFO to avoid parameter overfitting
wfo = WalkForwardOptimizer(
strategy_fn=strategy_fn,
param_grid={"fast": [5, 10, 20], "slow": [20, 40, 60]},
n_train_periods=500,
n_test_periods=125
)
wfo_result = wfo.run(df)
returns = wfo_result["oos_returns"]
else:
# Simple single-pass backtest (use only for research, not parameter selection)
train_df, test_df = train_test_split_temporal(df, 0.7)
returns = strategy_fn(test_df)
        # Apply realistic transaction costs
        n_trades = max(1, len(self.trades))
        avg_trade_size = self.capital / n_trades
        cost_per_trade = self.cost_model.compute_cost(avg_trade_size)
        # Amortize per-trade cost over bars, assuming roughly one round trip per 20 bars
        cost_per_period = cost_per_trade["total_bps"] / 10000 / 20
        adjusted_returns = returns - cost_per_period
# Compute metrics
metrics = compute_performance_metrics(adjusted_returns)
psr = deflated_sharpe_ratio(
observed_sharpe=float(metrics["sharpe_ratio"]),
n_trials=9, # 3x3 param grid = 9 combinations
n_observations=len(returns)
)
return {
"metrics": metrics,
"deflated_sharpe_psr": f"{psr:.1%}",
"returns_series": adjusted_returns,
"cost_model": cost_per_trade,
"recommendation": "DEPLOY" if psr > 0.85 and float(metrics["sharpe_ratio"]) > 1.5 else "MORE_TESTING"
}
# Usage
# backtester = AgentBacktester("pf_live_")
# result = backtester.run("BTC-USD", my_strategy, start, end, walk_forward=True)
# print(result["recommendation"])
Common Pitfalls Summary
A final checklist of the most common backtesting mistakes that cause live underperformance:
- No cost model: even a simple 10 bps round-trip cost can eliminate most strategies. Model it.
- Signal-to-execution timing mismatch: signals computed at bar close must execute at the next bar open (or next bar close for day-end strategies).
- Data-snooping bias: every parameter you manually tweak uses up degrees of freedom. Use the deflated Sharpe ratio to estimate how much alpha survives.
- Ignoring capacity: a strategy that works at $10k may fail at $1M due to market impact. Test at your target capital size.
- Single backtest period: performance over one bull market means nothing. Test across multiple regimes.
- No regime analysis: strategies that work in trending markets fail in choppy ones. Classify regimes and test separately.
- Confusing simulated and live fills: limit orders in a backtest always fill at the limit price. Live orders may not fill at all.
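The capacity point is easy to quantify with the square-root impact model from the cost section. A small sketch using the same illustrative parameters (ADV $2M, 2.5% daily vol, eta 0.1):

```python
import math

def impact_bps(order_usd: float, adv_usd: float = 2_000_000,
               daily_vol: float = 0.025, eta: float = 0.1) -> float:
    """Square-root market impact in bps (same functional form as the cost model above)."""
    return daily_vol * math.sqrt(order_usd / adv_usd) * eta * 10_000

for size in (10_000, 100_000, 1_000_000):
    print(f"${size:>9,}: {impact_bps(size):5.2f} bps of impact")
```

Impact grows with the square root of size, so a 100x larger order pays only 10x the impact; even so, at $1M against $2M ADV the impact alone exceeds the taker commission plus half-spread combined.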
Access Purple Flea's full historical dataset (casino outcomes, trading OHLCV, order book snapshots) via the API at purpleflea.com/api/v1/market/history. Register for a free API key at purpleflea.com/register. Keys use the pf_live_ prefix. Historical data goes back to platform launch with minute-level granularity.