Risk Management for AI Trading Agents: Don't Blow Up Your Wallet
1. Why Agents Blow Up
Most AI trading agents fail the same way. Not because of bad signal generation, not because of poor market timing โ they fail because they have no risk controls whatsoever. A capable agent given access to a trading account with no guardrails will, with near certainty, eventually find a sequence of losing trades that wipes the entire balance.
This isn't speculation. It's math. Without position sizing limits, a string of 6 consecutive losses at 20% per trade leaves you with 26 cents on the dollar. With volatility spikes, correlated positions, and latency issues compounding the problem, real-world blowups happen faster than simulations predict.
All-in positions on high-conviction signals that reverse. No stop-loss on trades left running overnight. Correlated positions treating independent bets as diversification. Chasing losses by doubling position size. No daily drawdown limit to halt trading after a bad run.
This guide covers every risk primitive you need to implement before deploying real capital: Kelly sizing, stop-losses, drawdown circuit breakers, concentration limits, volatility-adjusted sizing, risk budgets, and emergency shutdown. All with Python code using the Purple Flea Trading API.
2. The Kelly Criterion for AI Agents
The Kelly Criterion is the mathematically optimal formula for position sizing when you know your win rate and average win/loss ratio. It maximizes long-term capital growth while avoiding ruin. For AI agents with historical backtests, this is the right starting point.
f* = fraction of capital to bet | p = win probability | q = 1 - p | b = win/loss ratio
In practice, agents should use half-Kelly (f*/2) or quarter-Kelly to account for estimation error in win rates and the asymmetric pain of drawdowns vs gains. Full Kelly optimizes for expected log growth but tolerates 50%+ drawdowns that are psychologically (and operationally) catastrophic.
from dataclasses import dataclass
from typing import Optional
import statistics
@dataclass
class KellyConfig:
fraction: float = 0.25 # 0.25 = quarter-Kelly (recommended)
max_position_pct: float = 0.20 # hard cap: never exceed 20% per trade
min_trades_required: int = 30 # need 30+ trades before trusting estimates
class KellyPositionSizer:
def __init__(self, config: KellyConfig = KellyConfig()):
self.config = config
self.trade_history = [] # list of (profit_usdc, position_size_usdc)
def record_trade(self, profit: float, size: float) -> None:
self.trade_history.append({'profit': profit, 'size': size,
'return': profit / size if size > 0 else 0})
def compute_kelly(self) -> float:
"""Compute Kelly fraction from trade history."""
if len(self.trade_history) < self.config.min_trades_required:
# Insufficient data โ use conservative 2% flat sizing
return 0.02
wins = [t for t in self.trade_history if t['profit'] > 0]
losses = [t for t in self.trade_history if t['profit'] <= 0]
if not wins or not losses:
return 0.02 # edge case: all wins or all losses
p = len(wins) / len(self.trade_history)
q = 1 - p
avg_win = statistics.mean([t['return'] for t in wins])
avg_loss = abs(statistics.mean([t['return'] for t in losses]))
b = avg_win / avg_loss # win/loss ratio
f_full = (p * b - q) / b # full Kelly
f_applied = max(0, f_full * self.config.fraction) # fractional Kelly
return f_applied
def get_position_size(self, capital_usdc: float, signal_strength: float = 1.0) -> float:
"""Get recommended position size in USDC."""
kelly_f = self.compute_kelly()
# Scale by signal strength (0.0 to 1.0)
adjusted = kelly_f * signal_strength
# Apply hard cap
capped = min(adjusted, self.config.max_position_pct)
position_usdc = capital_usdc * capped
print(f" Kelly: {kelly_f:.3f} | Adjusted: {adjusted:.3f} | Capped: {capped:.3f}")
print(f" Position size: ${position_usdc:.2f} ({capped*100:.1f}% of ${capital_usdc:.0f})")
return position_usdc
# Example usage
sizer = KellyPositionSizer()
# Simulate 40 historical trades: 60% win rate, 1.5:1 win/loss ratio
import random
random.seed(42)
for _ in range(40):
won = random.random() < 0.60
profit = random.uniform(0.10, 0.20) if won else -random.uniform(0.05, 0.15)
sizer.record_trade(profit=profit, size=1.0)
size = sizer.get_position_size(capital_usdc=100.0)
# Output: Position size: $5.23 (5.2% of $100)
3. Stop-Loss Implementation
A stop-loss closes a position automatically when the unrealized loss exceeds a threshold. The trailing stop-loss variant follows the price upward and only triggers on reversal โ locking in profits while still cutting losers.
import asyncio
import requests
from dataclasses import dataclass, field
from typing import Dict, Optional
TRADING_API = "https://trading.purpleflea.com/api"
API_KEY = "pf_live_trading_m4r9k"
@dataclass
class Position:
position_id: str
pair: str
entry_price: float
size_usdc: float
stop_pct: float = 0.05 # initial stop: 5% below entry
trailing_pct: float = 0.03 # trail: 3% below high water mark
high_water_mark: float = field(init=False)
stop_price: float = field(init=False)
def __post_init__(self):
self.high_water_mark = self.entry_price
self.stop_price = self.entry_price * (1 - self.stop_pct)
class StopLossManager:
def __init__(self, api_key: str):
self.api_key = api_key
self.positions: Dict[str, Position] = {}
self.headers = {"Authorization": f"Bearer {api_key}"}
def add_position(self, pos: Position) -> None:
self.positions[pos.position_id] = pos
print(f"[SL] Watching {pos.pair} | stop: ${pos.stop_price:.4f}")
async def monitor_loop(self, interval_seconds: int = 10) -> None:
"""Continuously monitor positions and trigger stops."""
while self.positions:
for pos_id, pos in list(self.positions.items()):
current = self.get_current_price(pos.pair)
self.update_trailing_stop(pos, current)
if current <= pos.stop_price:
print(f"[SL] STOP TRIGGERED: {pos.pair} @ {current:.4f} (stop: {pos.stop_price:.4f})")
await self.close_position(pos)
del self.positions[pos_id]
else:
unrealized_pnl = (current - pos.entry_price) / pos.entry_price * 100
print(f"[SL] {pos.pair}: ${current:.4f} | PnL: {unrealized_pnl:+.2f}% | stop: ${pos.stop_price:.4f}")
await asyncio.sleep(interval_seconds)
def update_trailing_stop(self, pos: Position, current_price: float) -> None:
"""Raise the stop-loss as price climbs (trailing stop)."""
if current_price > pos.high_water_mark:
pos.high_water_mark = current_price
new_stop = current_price * (1 - pos.trailing_pct)
if new_stop > pos.stop_price:
pos.stop_price = new_stop
print(f"[SL] Trailing stop raised: {pos.pair} new stop = ${new_stop:.4f}")
async def close_position(self, pos: Position) -> dict:
r = requests.post(
f"{TRADING_API}/positions/{pos.position_id}/close",
headers=self.headers,
json={"reason": "stop_loss_triggered", "urgency": "immediate"}
)
result = r.json()
print(f"[SL] Position closed: {result.get('status')} | realized_pnl: {result.get('pnl_usdc')}")
return result
def get_current_price(self, pair: str) -> float:
# Real implementation: call price feed API
r = requests.get(f"{TRADING_API}/prices/{pair}", headers=self.headers)
return r.json()['price']
# Usage
slm = StopLossManager(api_key=API_KEY)
slm.add_position(Position(
position_id="pos_eth_001",
pair="ETH/USDC",
entry_price=3240.00,
size_usdc=50.00,
stop_pct=0.05, # initial stop: $3078
trailing_pct=0.03 # trail: 3% below high
))
asyncio.run(slm.monitor_loop())
4. Drawdown Circuit Breaker
A daily drawdown circuit breaker halts all trading when losses exceed a threshold for the day. The standard setting: halt at -10% daily drawdown. Once halted, the agent stops opening new positions until the next trading session.
This single control prevents the most common catastrophic failure mode: an agent in a losing streak that keeps trading, compounding losses with each new position.
import time
from datetime import datetime, date
class DrawdownCircuitBreaker:
def __init__(self, max_daily_drawdown_pct: float = 0.10):
self.max_drawdown_pct = max_daily_drawdown_pct
self.session_start_capital = None
self.session_date = None
self.halted = False
self.halt_reason = None
def start_session(self, capital_usdc: float) -> None:
if self.session_date != date.today():
# New day โ reset circuit breaker
self.session_start_capital = capital_usdc
self.session_date = date.today()
self.halted = False
self.halt_reason = None
print(f"[CB] New session started | capital: ${capital_usdc:.2f}")
def check_and_halt(self, current_capital: float) -> bool:
"""Returns True if trading should halt."""
if self.halted:
print(f"[CB] HALTED: {self.halt_reason}")
return True
if self.session_start_capital is None:
raise RuntimeError("Call start_session() before checking drawdown")
drawdown = (self.session_start_capital - current_capital) / self.session_start_capital
drawdown_pct = drawdown * 100
if drawdown >= self.max_drawdown_pct:
self.halted = True
self.halt_reason = (
f"Daily drawdown limit hit: {drawdown_pct:.1f}% "
f"(max: {self.max_drawdown_pct*100:.0f}%) | "
f"Loss: ${self.session_start_capital - current_capital:.2f}"
)
print(f"\n[CB] *** CIRCUIT BREAKER TRIGGERED ***")
print(f"[CB] {self.halt_reason}")
print(f"[CB] All trading halted until next session.\n")
return True
# Log current status
status = "SAFE" if drawdown < 0.05 else "WARNING"
print(f"[CB] {status} | Drawdown: {drawdown_pct:.2f}% | Capital: ${current_capital:.2f}")
return False
def can_trade(self, current_capital: float) -> bool:
return not self.check_and_halt(current_capital)
# Integrate into trading loop
cb = DrawdownCircuitBreaker(max_daily_drawdown_pct=0.10) # halt at -10%
cb.start_session(capital_usdc=500.00)
# Simulated trading loop
capital = 500.00
for trade_num in range(20):
if not cb.can_trade(capital):
print("Trading loop exiting โ circuit breaker active")
break
# ... execute trade logic ...
capital -= 8.00 # simulate losing trades
5. Portfolio Concentration Limits
No single position should exceed 20% of portfolio value. This is not a suggestion โ it is a hard constraint your agent must enforce before every trade execution. Concentration risk is the mechanism by which a single bad trade wipes a large fraction of capital.
class ConcentrationGuard:
def __init__(self, max_single_pct: float = 0.20, max_sector_pct: float = 0.40):
self.max_single = max_single_pct # max 20% in one position
self.max_sector = max_sector_pct # max 40% in one sector (e.g. all L1s)
self.positions = {} # {pair: size_usdc}
self.sectors = {} # {sector: [pair1, pair2, ...]}
def can_open(self, pair: str, size_usdc: float,
total_capital: float, sector: str = "default") -> tuple[bool, str]:
"""Returns (allowed, reason) for a proposed position."""
# Check single position limit
existing = self.positions.get(pair, 0)
total_in_pair = existing + size_usdc
pct_of_portfolio = total_in_pair / total_capital
if pct_of_portfolio > self.max_single:
return False, (
f"Position concentration too high: {pct_of_portfolio*100:.1f}% "
f"exceeds {self.max_single*100:.0f}% limit"
)
# Check sector concentration
sector_pairs = self.sectors.get(sector, [])
sector_total = sum(self.positions.get(p, 0) for p in sector_pairs) + size_usdc
sector_pct = sector_total / total_capital
if sector_pct > self.max_sector:
return False, (
f"Sector '{sector}' concentration: {sector_pct*100:.1f}% "
f"exceeds {self.max_sector*100:.0f}% limit"
)
return True, "ok"
def record_open(self, pair: str, size_usdc: float, sector: str = "default") -> None:
self.positions[pair] = self.positions.get(pair, 0) + size_usdc
if sector not in self.sectors: self.sectors[sector] = []
if pair not in self.sectors[sector]: self.sectors[sector].append(pair)
def record_close(self, pair: str, size_usdc: float) -> None:
self.positions[pair] = max(0, self.positions.get(pair, 0) - size_usdc)
# Usage in trading agent
guard = ConcentrationGuard(max_single_pct=0.20, max_sector_pct=0.40)
allowed, reason = guard.can_open("ETH/USDC", 22.00, total_capital=100.00, sector="L1")
print(f"Trade allowed: {allowed} | {reason}")
# Trade allowed: False | Position concentration too high: 22.0% exceeds 20% limit
6. Volatility-Adjusted Position Sizing
Kelly gives you a sizing fraction, but it doesn't account for current market volatility. A position that makes sense in a low-volatility regime is reckless in a high-volatility one. ATR (Average True Range) normalization scales position size inversely with volatility.
import statistics
def compute_atr(highs: list, lows: list, closes: list, period: int = 14) -> float:
"""Compute Average True Range over 'period' candles."""
true_ranges = []
for i in range(1, len(closes)):
tr = max(
highs[i] - lows[i],
abs(highs[i] - closes[i-1]),
abs(lows[i] - closes[i-1])
)
true_ranges.append(tr)
return statistics.mean(true_ranges[-period:])
def volatility_adjusted_size(
base_size_usdc: float,
current_atr: float,
baseline_atr: float,
max_scale: float = 2.0,
min_scale: float = 0.25
) -> float:
"""Scale position size inversely with volatility."""
if current_atr <= 0: return base_size_usdc
vol_ratio = baseline_atr / current_atr # >1 = lower vol than baseline
scale = max(min_scale, min(max_scale, vol_ratio))
adjusted = base_size_usdc * scale
print(f" ATR ratio: {vol_ratio:.2f} | scale: {scale:.2f}x | size: ${adjusted:.2f}")
return adjusted
# Example: baseline ATR is $80, current ATR spiked to $160 (2x volatility)
# Position should shrink by half
size = volatility_adjusted_size(
base_size_usdc=50.00,
current_atr=160.00, # high vol day
baseline_atr=80.00 # normal vol baseline
)
# ATR ratio: 0.50 | scale: 0.50x | size: $25.00
7. Risk Budget Framework
A risk budget defines how much capital can be "at risk" simultaneously across all open positions. Total open risk must never exceed the daily budget. Combined with Kelly sizing and ATR adjustment, this creates a three-layer defense against blowup.
class RiskBudget:
def __init__(self, total_capital: float, daily_risk_pct: float = 0.05):
self.total_capital = total_capital
self.daily_budget_usdc = total_capital * daily_risk_pct # e.g. 5% = $50 on $1000
self.consumed_usdc = 0.0
self.open_positions = {} # {pos_id: risk_usdc}
def available_risk(self) -> float:
return max(0, self.daily_budget_usdc - self.consumed_usdc)
def allocate(self, pos_id: str, size_usdc: float, stop_pct: float) -> bool:
"""Request risk allocation for a new position."""
risk_usdc = size_usdc * stop_pct # max loss if stop triggered
available = self.available_risk()
if risk_usdc > available:
print(f"[RB] Denied: risk ${risk_usdc:.2f} > available ${available:.2f}")
return False
self.open_positions[pos_id] = risk_usdc
self.consumed_usdc += risk_usdc
utilization = self.consumed_usdc / self.daily_budget_usdc * 100
print(f"[RB] Allocated: ${risk_usdc:.2f} | Budget {utilization:.0f}% used")
return True
def release(self, pos_id: str) -> None:
if pos_id in self.open_positions:
released = self.open_positions.pop(pos_id)
self.consumed_usdc -= released
print(f"[RB] Released: ${released:.2f} back to budget")
def status(self) -> dict:
return {
'budget': self.daily_budget_usdc,
'consumed': self.consumed_usdc,
'available': self.available_risk(),
'utilization_pct': self.consumed_usdc / self.daily_budget_usdc * 100
}
# $1000 capital, 5% daily risk budget = $50 max at-risk per day
rb = RiskBudget(total_capital=1000.0, daily_risk_pct=0.05)
# Try to open 3 positions
rb.allocate("pos1", size_usdc=100, stop_pct=0.05) # $5 risk โ OK
rb.allocate("pos2", size_usdc=200, stop_pct=0.05) # $10 risk โ OK
rb.allocate("pos3", size_usdc=700, stop_pct=0.05) # $35 risk โ denied (over budget)
print(rb.status())
8. Emergency Shutdown Procedure
When all else fails, you need a way to close every open position immediately. This is your last line of defense โ triggered by external alert, human intervention, or an automated system detecting unusual behavior.
Emergency shutdown must use market orders, not limit orders. Speed takes priority over slippage. A 0.5% worse fill is acceptable. Being stuck in a position during a flash crash is not.
import asyncio
import requests
async def emergency_shutdown(api_key: str, reason: str = "manual_trigger") -> dict:
"""Close ALL open positions immediately using market orders."""
print(f"\n[EMERGENCY] *** SHUTDOWN INITIATED ***")
print(f"[EMERGENCY] Reason: {reason}")
headers = {"Authorization": f"Bearer {api_key}"}
# Step 1: Get all open positions
r = requests.get(f"{TRADING_API}/positions?status=open", headers=headers)
positions = r.json().get('positions', [])
print(f"[EMERGENCY] Found {len(positions)} open positions to close")
# Step 2: Cancel all pending orders first (avoid partial fills)
requests.delete(f"{TRADING_API}/orders/open", headers=headers)
print(f"[EMERGENCY] Cancelled all pending orders")
# Step 3: Close all positions in parallel (market orders)
close_tasks = [
close_position_market(pos['position_id'], api_key)
for pos in positions
]
results = await asyncio.gather(*close_tasks, return_exceptions=True)
success_count = sum(1 for r in results if not isinstance(r, Exception))
print(f"[EMERGENCY] Closed {success_count}/{len(positions)} positions")
print(f"[EMERGENCY] Shutdown complete.\n")
return {'positions_closed': success_count, 'total': len(positions), 'reason': reason}
async def close_position_market(pos_id: str, api_key: str) -> dict:
r = requests.post(
f"{TRADING_API}/positions/{pos_id}/close",
headers={"Authorization": f"Bearer {api_key}"},
json={"order_type": "market", "urgency": "immediate"}
)
return r.json()
# Trigger manually or via webhook
asyncio.run(emergency_shutdown(api_key=API_KEY, reason="drawdown_limit_exceeded"))
9. Monitoring and Alerts
Proactive alerts keep you (or your orchestrator agent) informed before limits are breached. Send webhook notifications when drawdown approaches 5%, 7%, and 10% โ not just at the final halt.
import requests
from dataclasses import dataclass
from typing import List
ALERT_WEBHOOK = "https://your-alert-endpoint.com/webhook"
@dataclass
class AlertThreshold:
drawdown_pct: float
severity: str # "info" | "warning" | "critical"
triggered: bool = False
class RiskMonitor:
DEFAULT_THRESHOLDS = [
AlertThreshold(0.03, "info"), # -3%: heads up
AlertThreshold(0.05, "warning"), # -5%: tighten sizes
AlertThreshold(0.07, "warning"), # -7%: consider pausing
AlertThreshold(0.10, "critical"), # -10%: halt trading
]
def __init__(self, session_capital: float, agent_id: str):
self.session_capital = session_capital
self.agent_id = agent_id
self.thresholds = [AlertThreshold(t.drawdown_pct, t.severity)
for t in self.DEFAULT_THRESHOLDS]
def check(self, current_capital: float) -> List[AlertThreshold]:
drawdown = (self.session_capital - current_capital) / self.session_capital
triggered_now = []
for threshold in self.thresholds:
if drawdown >= threshold.drawdown_pct and not threshold.triggered:
threshold.triggered = True
triggered_now.append(threshold)
self.send_alert(threshold, drawdown, current_capital)
return triggered_now
def send_alert(self, t: AlertThreshold, drawdown: float, capital: float) -> None:
payload = {
"agent_id": self.agent_id,
"severity": t.severity,
"drawdown_pct": round(drawdown * 100, 2),
"current_capital_usdc": round(capital, 2),
"session_start_usdc": self.session_capital,
"message": f"Drawdown alert: {drawdown*100:.1f}% | Action required: {t.severity}"
}
try:
requests.post(ALERT_WEBHOOK, json=payload, timeout=5)
print(f"[ALERT] {t.severity.upper()}: {drawdown*100:.1f}% drawdown โ webhook sent")
except Exception as e:
print(f"[ALERT] Failed to send webhook: {e}")
# Usage in main trading loop
monitor = RiskMonitor(session_capital=500.0, agent_id="trading-agent-001")
monitor.check(current_capital=460.0) # 8% drawdown โ warning alert fires
10. Conclusion
Risk management is not optional for autonomous trading agents. It is the difference between a system that compounds capital over time and one that burns through it in a single bad session. The controls in this guide, applied together, provide defense in depth:
- Kelly sizing โ never bet more than the math supports
- Trailing stop-loss โ exit automatically on reversal
- Drawdown circuit breaker โ halt at -10% daily loss
- Concentration limits โ cap at 20% per position
- ATR-adjusted sizing โ trade smaller in volatile regimes
- Risk budget โ cap total at-risk across all open positions
- Emergency shutdown โ close everything instantly when needed
- Webhook alerts โ know about problems before limits are breached
Test all these risk controls against the Purple Flea Trading API with free funds from the faucet. Build and validate your full risk stack before deploying real capital.
Ready to Build a Risk-Managed Trading Agent?
Get free USDC from the faucet, set up your agent on Purple Flea, and test your risk controls in a live environment.
Claim Free $1 USDC Trading API Docs Trading Bot GuideRelated: Building an Autonomous Trading Bot in 2026 ยท Agent Bankroll Management ยท Multi-Agent Coordination