
Risk Management for AI Trading Agents: Don't Blow Up Your Wallet

1. Why Agents Blow Up

Most AI trading agents fail the same way. Not because of bad signal generation, and not because of poor market timing: they fail because they have no risk controls whatsoever. A capable agent given access to a trading account with no guardrails will, with near certainty, eventually find a sequence of losing trades that wipes the entire balance.

This isn't speculation. It's math. Without position sizing limits, a string of 6 consecutive losses at 20% per trade leaves you with 26 cents on the dollar. With volatility spikes, correlated positions, and latency issues compounding the problem, real-world blowups happen faster than simulations predict.
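The arithmetic behind that figure is plain compounding. Here is a minimal sketch; the function name `capital_after_losses` is illustrative and not part of any API:

```python
def capital_after_losses(start: float, loss_pct: float, n_losses: int) -> float:
    """Capital remaining after n consecutive losses of loss_pct each."""
    return start * (1 - loss_pct) ** n_losses

# Each loss takes 20% of whatever capital is left at that point
for n in (1, 3, 6, 10):
    remaining = capital_after_losses(1.00, 0.20, n)
    print(f"{n:>2} losses at 20%: ${remaining:.2f} left per $1")
```

For comparison, the same six-loss streak at 2% per trade leaves about 89 cents on the dollar, which is why the size of each bet matters more than the quality of any single signal.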

The most common agent blowups

All-in positions on high-conviction signals that reverse. No stop-loss on trades left running overnight. Correlated positions mistaken for diversification, as if they were independent bets. Chasing losses by doubling position size. No daily drawdown limit to halt trading after a bad run.

This guide covers every risk primitive you need to implement before deploying real capital: Kelly sizing, stop-losses, drawdown circuit breakers, concentration limits, volatility-adjusted sizing, risk budgets, and emergency shutdown. All with Python code using the Purple Flea Trading API.

2. The Kelly Criterion for AI Agents

The Kelly Criterion is the mathematically optimal formula for position sizing when you know your win rate and average win/loss ratio. It maximizes long-term capital growth while avoiding ruin. For AI agents with historical backtests, this is the right starting point.

Kelly Criterion
f* = (p * b - q) / b

f* = fraction of capital to bet | p = win probability | q = 1 - p | b = win/loss ratio

In practice, agents should use half-Kelly (f*/2) or quarter-Kelly to account for estimation error in win rates and the asymmetric pain of drawdowns vs gains. Full Kelly optimizes for expected log growth but tolerates 50%+ drawdowns that are psychologically (and operationally) catastrophic.
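To make the fractions concrete, here is a small worked example of the formula above. The 60% win rate and 1.5 win/loss ratio are hypothetical inputs chosen for illustration, and `kelly_fraction` is just a helper name for this sketch:

```python
def kelly_fraction(p: float, b: float) -> float:
    """Full Kelly fraction f* = (p*b - q) / b, floored at zero (no edge, no bet)."""
    q = 1 - p
    return max(0.0, (p * b - q) / b)

p, b = 0.60, 1.5  # hypothetical win probability and win/loss ratio
full = kelly_fraction(p, b)
print(f"full Kelly:    {full:.3f}")      # fraction of capital per trade
print(f"half Kelly:    {full / 2:.3f}")
print(f"quarter Kelly: {full / 4:.3f}")
```

With these inputs full Kelly asks for roughly a third of capital on every trade, while quarter-Kelly comes out near 8%. A strategy with no edge (for example p = 0.40, b = 1.0) produces a negative raw value, which the floor turns into "do not trade".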

kelly.py: position sizing with Kelly Criterion
from dataclasses import dataclass
from typing import Optional
import statistics

@dataclass
class KellyConfig:
    fraction: float = 0.25      # 0.25 = quarter-Kelly (recommended)
    max_position_pct: float = 0.20  # hard cap: never exceed 20% per trade
    min_trades_required: int = 30   # need 30+ trades before trusting estimates

class KellyPositionSizer:
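    def __init__(self, config: Optional[KellyConfig] = None):
        # Create a fresh config per instance rather than sharing one default object
        self.config = config if config is not None else KellyConfig()
        self.trade_history = []  # list of dicts with keys: profit, size, return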

    def record_trade(self, profit: float, size: float) -> None:
        self.trade_history.append({'profit': profit, 'size': size,
                                    'return': profit / size if size > 0 else 0})

    def compute_kelly(self) -> float:
        """Compute Kelly fraction from trade history."""
        if len(self.trade_history) < self.config.min_trades_required:
            # Insufficient data: use conservative 2% flat sizing
            return 0.02

        wins = [t for t in self.trade_history if t['profit'] > 0]
        losses = [t for t in self.trade_history if t['profit'] <= 0]

        if not wins or not losses:
            return 0.02  # edge case: all wins or all losses

        p = len(wins) / len(self.trade_history)
        q = 1 - p

        avg_win = statistics.mean([t['return'] for t in wins])
        avg_loss = abs(statistics.mean([t['return'] for t in losses]))
        if avg_loss == 0:
            return 0.02  # only break-even trades on the losing side: ratio undefined

        b = avg_win / avg_loss  # win/loss ratio
        f_full = (p * b - q) / b  # full Kelly
        f_applied = max(0, f_full * self.config.fraction)  # fractional Kelly

        return f_applied

    def get_position_size(self, capital_usdc: float, signal_strength: float = 1.0) -> float:
        """Get recommended position size in USDC."""
        kelly_f = self.compute_kelly()

        # Scale by signal strength (0.0 to 1.0)
        adjusted = kelly_f * signal_strength

        # Apply hard cap
        capped = min(adjusted, self.config.max_position_pct)

        position_usdc = capital_usdc * capped
        print(f"  Kelly: {kelly_f:.3f} | Adjusted: {adjusted:.3f} | Capped: {capped:.3f}")
        print(f"  Position size: ${position_usdc:.2f} ({capped*100:.1f}% of ${capital_usdc:.0f})")
        return position_usdc

# Example usage
sizer = KellyPositionSizer()

# Simulate 40 historical trades: 60% win rate, 1.5:1 win/loss ratio
import random
random.seed(42)
for _ in range(40):
    won = random.random() < 0.60
    profit = random.uniform(0.10, 0.20) if won else -random.uniform(0.05, 0.15)
    sizer.record_trade(profit=profit, size=1.0)

size = sizer.get_position_size(capital_usdc=100.0)
# Prints the Kelly fraction and the resulting position size; exact values depend on the simulated trades

3. Stop-Loss Implementation

A stop-loss closes a position automatically when the unrealized loss exceeds a threshold. The trailing stop-loss variant follows the price upward and only triggers on reversal, locking in profits while still cutting losers.

stop_loss.py: trailing stop-loss via Purple Flea Trading API
import asyncio
import requests
from dataclasses import dataclass, field
from typing import Dict, Optional

TRADING_API = "https://trading.purpleflea.com/api"
API_KEY = "pf_live_trading_m4r9k"

@dataclass
class Position:
    position_id: str
    pair: str
    entry_price: float
    size_usdc: float
    stop_pct: float = 0.05         # initial stop: 5% below entry
    trailing_pct: float = 0.03      # trail: 3% below high water mark
    high_water_mark: float = field(init=False)
    stop_price: float = field(init=False)

    def __post_init__(self):
        self.high_water_mark = self.entry_price
        self.stop_price = self.entry_price * (1 - self.stop_pct)

class StopLossManager:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.positions: Dict[str, Position] = {}
        self.headers = {"Authorization": f"Bearer {api_key}"}

    def add_position(self, pos: Position) -> None:
        self.positions[pos.position_id] = pos
        print(f"[SL] Watching {pos.pair} | stop: ${pos.stop_price:.4f}")

    async def monitor_loop(self, interval_seconds: int = 10) -> None:
        """Continuously monitor positions and trigger stops."""
        while self.positions:
            for pos_id, pos in list(self.positions.items()):
                current = self.get_current_price(pos.pair)
                self.update_trailing_stop(pos, current)

                if current <= pos.stop_price:
                    print(f"[SL] STOP TRIGGERED: {pos.pair} @ {current:.4f} (stop: {pos.stop_price:.4f})")
                    await self.close_position(pos)
                    del self.positions[pos_id]
                else:
                    unrealized_pnl = (current - pos.entry_price) / pos.entry_price * 100
                    print(f"[SL] {pos.pair}: ${current:.4f} | PnL: {unrealized_pnl:+.2f}% | stop: ${pos.stop_price:.4f}")

            await asyncio.sleep(interval_seconds)

    def update_trailing_stop(self, pos: Position, current_price: float) -> None:
        """Raise the stop-loss as price climbs (trailing stop)."""
        if current_price > pos.high_water_mark:
            pos.high_water_mark = current_price
            new_stop = current_price * (1 - pos.trailing_pct)
            if new_stop > pos.stop_price:
                pos.stop_price = new_stop
                print(f"[SL] Trailing stop raised: {pos.pair} new stop = ${new_stop:.4f}")

    async def close_position(self, pos: Position) -> dict:
        # requests is blocking, so run it in a worker thread to keep the event loop responsive
        r = await asyncio.to_thread(
            requests.post,
            f"{TRADING_API}/positions/{pos.position_id}/close",
            headers=self.headers,
            json={"reason": "stop_loss_triggered", "urgency": "immediate"},
            timeout=10,  # never hang forever on a close request
        )
        result = r.json()
        print(f"[SL] Position closed: {result.get('status')} | realized_pnl: {result.get('pnl_usdc')}")
        return result

    def get_current_price(self, pair: str) -> float:
        # Real implementation: call price feed API
        r = requests.get(f"{TRADING_API}/prices/{pair}", headers=self.headers, timeout=10)
        return r.json()['price']

# Usage
slm = StopLossManager(api_key=API_KEY)
slm.add_position(Position(
    position_id="pos_eth_001",
    pair="ETH/USDC",
    entry_price=3240.00,
    size_usdc=50.00,
    stop_pct=0.05,     # initial stop: $3078
    trailing_pct=0.03  # trail: 3% below high
))
asyncio.run(slm.monitor_loop())
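Before pointing the monitor at a live feed, it helps to check the ratchet logic offline. The sketch below replays the same rules (initial stop below entry, stop raised on each new high, never lowered) over a list of prices. The function `simulate_trailing_stop` and the price path are hypothetical and make no API calls:

```python
from typing import List, Optional, Tuple

def simulate_trailing_stop(
    prices: List[float],
    entry_price: float,
    stop_pct: float = 0.05,
    trailing_pct: float = 0.03,
) -> Tuple[Optional[int], float]:
    """Return (index of the price that triggered the stop, final stop price).

    The index is None if the stop never triggers over the series.
    """
    high_water_mark = entry_price
    stop_price = entry_price * (1 - stop_pct)
    for i, price in enumerate(prices):
        # Ratchet the stop upward on new highs; it never moves down
        if price > high_water_mark:
            high_water_mark = price
            stop_price = max(stop_price, price * (1 - trailing_pct))
        if price <= stop_price:
            return i, stop_price
    return None, stop_price

# Hypothetical price path: rally from 100 to 110, then a reversal
path = [101.0, 104.0, 110.0, 108.0, 106.0, 103.0]
hit_index, stop = simulate_trailing_stop(path, entry_price=100.0)
print(hit_index, round(stop, 2))
```

On this path the stop climbs to 3% below the 110 high and triggers on the drop to 106, so the position exits with a gain even though the initial stop sat below the entry price.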

4. Drawdown Circuit Breaker

A daily drawdown circuit breaker halts all trading when losses exceed a threshold for the day. The setting used in this guide: halt at -10% daily drawdown. Once halted, the agent stops opening new positions until the next trading session.

This single control prevents the most common catastrophic failure mode: an agent in a losing streak that keeps trading, compounding losses with each new position.
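One reason to halt early is that losses and recoveries are not symmetric: after losing a fraction d of capital, you need a gain of d / (1 - d) on what is left to get back to even. A quick sketch of that relationship, with `gain_to_recover` as an illustrative helper name:

```python
def gain_to_recover(drawdown: float) -> float:
    """Fractional gain needed to return to the starting balance after a drawdown."""
    if not 0 <= drawdown < 1:
        raise ValueError("drawdown must be in [0, 1)")
    return drawdown / (1 - drawdown)

for dd in (0.05, 0.10, 0.25, 0.50):
    print(f"{dd:.0%} drawdown -> {gain_to_recover(dd):.1%} gain to break even")
```

A 10% drawdown needs about an 11% gain to recover, while a 50% drawdown needs a full 100%. Halting at a modest daily loss keeps the agent in the range where recovery is still realistic.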

circuit_breaker.py: daily drawdown halt
from datetime import date

class DrawdownCircuitBreaker:
    def __init__(self, max_daily_drawdown_pct: float = 0.10):
        self.max_drawdown_pct = max_daily_drawdown_pct
        self.session_start_capital = None
        self.session_date = None
        self.halted = False
        self.halt_reason = None

    def start_session(self, capital_usdc: float) -> None:
        if self.session_date != date.today():
            # New day: reset circuit breaker
            self.session_start_capital = capital_usdc
            self.session_date = date.today()
            self.halted = False
            self.halt_reason = None
            print(f"[CB] New session started | capital: ${capital_usdc:.2f}")

    def check_and_halt(self, current_capital: float) -> bool:
        """Returns True if trading should halt."""
        if self.halted:
            print(f"[CB] HALTED: {self.halt_reason}")
            return True

        if self.session_start_capital is None:
            raise RuntimeError("Call start_session() before checking drawdown")

        drawdown = (self.session_start_capital - current_capital) / self.session_start_capital
        drawdown_pct = drawdown * 100

        if drawdown >= self.max_drawdown_pct:
            self.halted = True
            self.halt_reason = (
                f"Daily drawdown limit hit: {drawdown_pct:.1f}% "
                f"(max: {self.max_drawdown_pct*100:.0f}%) | "
                f"Loss: ${self.session_start_capital - current_capital:.2f}"
            )
            print(f"\n[CB] *** CIRCUIT BREAKER TRIGGERED ***")
            print(f"[CB] {self.halt_reason}")
            print(f"[CB] All trading halted until next session.\n")
            return True

        # Log current status
        status = "SAFE" if drawdown < 0.05 else "WARNING"
        print(f"[CB] {status} | Drawdown: {drawdown_pct:.2f}% | Capital: ${current_capital:.2f}")
        return False

    def can_trade(self, current_capital: float) -> bool:
        return not self.check_and_halt(current_capital)

# Integrate into trading loop
cb = DrawdownCircuitBreaker(max_daily_drawdown_pct=0.10)  # halt at -10%
cb.start_session(capital_usdc=500.00)

# Simulated trading loop
capital = 500.00
for trade_num in range(20):
    if not cb.can_trade(capital):
        print("Trading loop exiting: circuit breaker active")
        break
    # ... execute trade logic ...
    capital -= 8.00  # simulate losing trades

5. Portfolio Concentration Limits

No single position should exceed 20% of portfolio value. This is not a suggestion; it is a hard constraint your agent must enforce before every trade execution. Concentration risk is the mechanism by which a single bad trade wipes a large fraction of capital.

concentration.py: enforce max position concentration
class ConcentrationGuard:
    def __init__(self, max_single_pct: float = 0.20, max_sector_pct: float = 0.40):
        self.max_single = max_single_pct   # max 20% in one position
        self.max_sector = max_sector_pct   # max 40% in one sector (e.g. all L1s)
        self.positions = {}  # {pair: size_usdc}
        self.sectors = {}    # {sector: [pair1, pair2, ...]}

    def can_open(self, pair: str, size_usdc: float,
                  total_capital: float, sector: str = "default") -> tuple[bool, str]:
        """Returns (allowed, reason) for a proposed position."""
        # Check single position limit
        existing = self.positions.get(pair, 0)
        total_in_pair = existing + size_usdc
        pct_of_portfolio = total_in_pair / total_capital

        if pct_of_portfolio > self.max_single:
            return False, (
                f"Position concentration too high: {pct_of_portfolio*100:.1f}% "
                f"exceeds {self.max_single*100:.0f}% limit"
            )

        # Check sector concentration
        sector_pairs = self.sectors.get(sector, [])
        sector_total = sum(self.positions.get(p, 0) for p in sector_pairs) + size_usdc
        sector_pct = sector_total / total_capital

        if sector_pct > self.max_sector:
            return False, (
                f"Sector '{sector}' concentration: {sector_pct*100:.1f}% "
                f"exceeds {self.max_sector*100:.0f}% limit"
            )

        return True, "ok"

    def record_open(self, pair: str, size_usdc: float, sector: str = "default") -> None:
        self.positions[pair] = self.positions.get(pair, 0) + size_usdc
        if sector not in self.sectors: self.sectors[sector] = []
        if pair not in self.sectors[sector]: self.sectors[sector].append(pair)

    def record_close(self, pair: str, size_usdc: float) -> None:
        self.positions[pair] = max(0, self.positions.get(pair, 0) - size_usdc)

# Usage in trading agent
guard = ConcentrationGuard(max_single_pct=0.20, max_sector_pct=0.40)

allowed, reason = guard.can_open("ETH/USDC", 22.00, total_capital=100.00, sector="L1")
print(f"Trade allowed: {allowed} | {reason}")
# Trade allowed: False | Position concentration too high: 22.0% exceeds 20% limit

6. Volatility-Adjusted Position Sizing

Kelly gives you a sizing fraction, but it doesn't account for current market volatility. A position that makes sense in a low-volatility regime is reckless in a high-volatility one. ATR (Average True Range) normalization scales position size inversely with volatility.

atr_sizing.py: ATR-based volatility-adjusted sizing
import statistics

def compute_atr(highs: list, lows: list, closes: list, period: int = 14) -> float:
    """Compute Average True Range over 'period' candles."""
    true_ranges = []
    for i in range(1, len(closes)):
        tr = max(
            highs[i] - lows[i],
            abs(highs[i] - closes[i-1]),
            abs(lows[i] - closes[i-1])
        )
        true_ranges.append(tr)
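    if not true_ranges:
        raise ValueError("need at least two candles to compute ATR")
    return statistics.mean(true_ranges[-period:])  # simple average of the last 'period' true ranges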

def volatility_adjusted_size(
    base_size_usdc: float,
    current_atr: float,
    baseline_atr: float,
    max_scale: float = 2.0,
    min_scale: float = 0.25
) -> float:
    """Scale position size inversely with volatility."""
    if current_atr <= 0: return base_size_usdc

    vol_ratio = baseline_atr / current_atr  # >1 = lower vol than baseline
    scale = max(min_scale, min(max_scale, vol_ratio))
    adjusted = base_size_usdc * scale

    print(f"  ATR ratio: {vol_ratio:.2f} | scale: {scale:.2f}x | size: ${adjusted:.2f}")
    return adjusted

# Example: baseline ATR is $80, current ATR spiked to $160 (2x volatility)
# Position should shrink by half
size = volatility_adjusted_size(
    base_size_usdc=50.00,
    current_atr=160.00,   # high vol day
    baseline_atr=80.00    # normal vol baseline
)
# ATR ratio: 0.50 | scale: 0.50x | size: $25.00
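The sizing example takes the ATR values as given, so here is a small worked example of where such a number comes from. It uses the same true-range definition and simple average as compute_atr above, applied to four made-up candles; the helper name `true_ranges` and the data are hypothetical:

```python
import statistics
from typing import List

def true_ranges(highs: List[float], lows: List[float], closes: List[float]) -> List[float]:
    """True range per candle, starting from the second candle."""
    return [
        max(
            highs[i] - lows[i],              # intrabar range
            abs(highs[i] - closes[i - 1]),   # gap up from previous close
            abs(lows[i] - closes[i - 1]),    # gap down from previous close
        )
        for i in range(1, len(closes))
    ]

# Hypothetical candles; the third one gaps up from the previous close
highs = [102.0, 103.0, 110.0, 111.0]
lows = [99.0, 100.0, 107.0, 108.0]
closes = [101.0, 102.0, 109.0, 110.0]

trs = true_ranges(highs, lows, closes)
atr = statistics.mean(trs)  # simple average over the available candles
print(trs, round(atr, 2))
```

Every candle here has a high-low range of 3, yet the gap candle has a true range of 8 because it is measured from the previous close. That is the point of using true range rather than high minus low. Note that many charting tools smooth ATR with Wilder's moving average instead of a simple mean, so their values can differ slightly from this calculation.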

7. Risk Budget Framework

A risk budget defines how much capital can be "at risk" simultaneously across all open positions. Total open risk must never exceed the daily budget. Combined with Kelly sizing and ATR adjustment, this creates a three-layer defense against blowup.

risk_budget.py: total portfolio risk budget
class RiskBudget:
    def __init__(self, total_capital: float, daily_risk_pct: float = 0.05):
        self.total_capital = total_capital
        self.daily_budget_usdc = total_capital * daily_risk_pct  # e.g. 5% = $50 on $1000
        self.consumed_usdc = 0.0
        self.open_positions = {}  # {pos_id: risk_usdc}

    def available_risk(self) -> float:
        return max(0, self.daily_budget_usdc - self.consumed_usdc)

    def allocate(self, pos_id: str, size_usdc: float, stop_pct: float) -> bool:
        """Request risk allocation for a new position."""
        risk_usdc = size_usdc * stop_pct  # max loss if stop triggered
        available = self.available_risk()

        if risk_usdc > available:
            print(f"[RB] Denied: risk ${risk_usdc:.2f} > available ${available:.2f}")
            return False

        self.open_positions[pos_id] = risk_usdc
        self.consumed_usdc += risk_usdc
        utilization = self.consumed_usdc / self.daily_budget_usdc * 100
        print(f"[RB] Allocated: ${risk_usdc:.2f} | Budget {utilization:.0f}% used")
        return True

    def release(self, pos_id: str) -> None:
        if pos_id in self.open_positions:
            released = self.open_positions.pop(pos_id)
            self.consumed_usdc -= released
            print(f"[RB] Released: ${released:.2f} back to budget")

    def status(self) -> dict:
        return {
            'budget': self.daily_budget_usdc,
            'consumed': self.consumed_usdc,
            'available': self.available_risk(),
            'utilization_pct': self.consumed_usdc / self.daily_budget_usdc * 100
        }

# $1000 capital, 5% daily risk budget = $50 max at-risk per day
rb = RiskBudget(total_capital=1000.0, daily_risk_pct=0.05)

# Try to open 3 positions
rb.allocate("pos1", size_usdc=100, stop_pct=0.05)  # $5 risk: OK
rb.allocate("pos2", size_usdc=200, stop_pct=0.05)  # $10 risk: OK
rb.allocate("pos3", size_usdc=800, stop_pct=0.05)  # $40 risk: denied (only $35 of budget left)
print(rb.status())
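One way to picture the three layers working together is a single sizing function that applies them in order. This is a sketch under assumptions, not the guide's classes wired together: `size_position` and its parameters are hypothetical, and it shrinks a position to fit the remaining budget, whereas RiskBudget.allocate above denies the request outright.

```python
def size_position(
    capital: float,
    kelly_f: float,         # fractional Kelly, e.g. 0.08
    vol_scale: float,       # baseline_atr / current_atr, already clamped
    stop_pct: float,        # stop distance as a fraction of position size
    available_risk: float,  # remaining risk budget in USDC
    max_position_pct: float = 0.20,
) -> float:
    """Apply Kelly sizing, then volatility scaling, then the risk budget."""
    if stop_pct <= 0:
        raise ValueError("stop_pct must be positive")
    size = capital * min(kelly_f, max_position_pct)  # layer 1: Kelly with hard cap
    size *= vol_scale                                # layer 2: ATR adjustment
    size = min(size, capital * max_position_pct)     # re-apply the cap after scaling up
    max_size_for_budget = available_risk / stop_pct  # layer 3: largest size the budget allows
    return min(size, max_size_for_budget)

# High-volatility day with plenty of budget: volatility is the binding layer
print(size_position(1000.0, 0.08, 0.5, 0.05, available_risk=35.0))
# Calm day with the budget nearly spent: the budget is the binding layer
print(size_position(1000.0, 0.15, 1.0, 0.05, available_risk=5.0))
```

In the first call Kelly proposes $80 and the volatility scale halves it to $40. In the second, Kelly proposes $150 but only $5 of risk remains, which at a 5% stop supports a $100 position. Whichever layer is most restrictive on a given day sets the size.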

8. Emergency Shutdown Procedure

When all else fails, you need a way to close every open position immediately. This is your last line of defense, triggered by external alert, human intervention, or an automated system detecting unusual behavior.

Critical: market orders only

Emergency shutdown must use market orders, not limit orders. Speed takes priority over slippage. A 0.5% worse fill is acceptable. Being stuck in a position during a flash crash is not.

emergency_shutdown.py: close all positions immediately
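import asyncio
import requests

TRADING_API = "https://trading.purpleflea.com/api"  # same base URL as stop_loss.py
API_KEY = "pf_live_trading_m4r9k"  # same example key as stop_loss.py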

async def emergency_shutdown(api_key: str, reason: str = "manual_trigger") -> dict:
    """Close ALL open positions immediately using market orders."""
    print(f"\n[EMERGENCY] *** SHUTDOWN INITIATED ***")
    print(f"[EMERGENCY] Reason: {reason}")

    headers = {"Authorization": f"Bearer {api_key}"}

    # Step 1: Get all open positions
    r = requests.get(f"{TRADING_API}/positions?status=open", headers=headers)
    positions = r.json().get('positions', [])
    print(f"[EMERGENCY] Found {len(positions)} open positions to close")

    # Step 2: Cancel all pending orders first (avoid partial fills)
    requests.delete(f"{TRADING_API}/orders/open", headers=headers)
    print(f"[EMERGENCY] Cancelled all pending orders")

    # Step 3: Close all positions in parallel (market orders)
    close_tasks = [
        close_position_market(pos['position_id'], api_key)
        for pos in positions
    ]
    results = await asyncio.gather(*close_tasks, return_exceptions=True)

    success_count = sum(1 for r in results if not isinstance(r, Exception))
    print(f"[EMERGENCY] Closed {success_count}/{len(positions)} positions")
    print(f"[EMERGENCY] Shutdown complete.\n")

    return {'positions_closed': success_count, 'total': len(positions), 'reason': reason}

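async def close_position_market(pos_id: str, api_key: str) -> dict:
    # requests is blocking; a worker thread lets asyncio.gather run the closes concurrently
    r = await asyncio.to_thread(
        requests.post,
        f"{TRADING_API}/positions/{pos_id}/close",
        headers={"Authorization": f"Bearer {api_key}"},
        json={"order_type": "market", "urgency": "immediate"},
        timeout=10,
    )
    r.raise_for_status()  # HTTP errors become exceptions, so failed closes are not counted as closed
    return r.json()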

# Trigger manually or via webhook
asyncio.run(emergency_shutdown(api_key=API_KEY, reason="drawdown_limit_exceeded"))

9. Monitoring and Alerts

Proactive alerts keep you (or your orchestrator agent) informed before limits are breached. Send webhook notifications as drawdown crosses 3%, 5%, 7%, and 10%, not just at the final halt.

monitoring.py: webhook alerts for risk thresholds
import requests
from dataclasses import dataclass
from typing import List

ALERT_WEBHOOK = "https://your-alert-endpoint.com/webhook"

@dataclass
class AlertThreshold:
    drawdown_pct: float
    severity: str       # "info" | "warning" | "critical"
    triggered: bool = False

class RiskMonitor:
    DEFAULT_THRESHOLDS = [
        AlertThreshold(0.03, "info"),       # -3%: heads up
        AlertThreshold(0.05, "warning"),    # -5%: tighten sizes
        AlertThreshold(0.07, "warning"),    # -7%: consider pausing
        AlertThreshold(0.10, "critical"),   # -10%: halt trading
    ]

    def __init__(self, session_capital: float, agent_id: str):
        self.session_capital = session_capital
        self.agent_id = agent_id
        self.thresholds = [AlertThreshold(t.drawdown_pct, t.severity)
                           for t in self.DEFAULT_THRESHOLDS]

    def check(self, current_capital: float) -> List[AlertThreshold]:
        drawdown = (self.session_capital - current_capital) / self.session_capital
        triggered_now = []

        for threshold in self.thresholds:
            if drawdown >= threshold.drawdown_pct and not threshold.triggered:
                threshold.triggered = True
                triggered_now.append(threshold)
                self.send_alert(threshold, drawdown, current_capital)

        return triggered_now

    def send_alert(self, t: AlertThreshold, drawdown: float, capital: float) -> None:
        payload = {
            "agent_id": self.agent_id,
            "severity": t.severity,
            "drawdown_pct": round(drawdown * 100, 2),
            "current_capital_usdc": round(capital, 2),
            "session_start_usdc": self.session_capital,
            "message": f"Drawdown alert: {drawdown*100:.1f}% | Action required: {t.severity}"
        }
        try:
            requests.post(ALERT_WEBHOOK, json=payload, timeout=5)
            print(f"[ALERT] {t.severity.upper()}: {drawdown*100:.1f}% drawdown, webhook sent")
        except Exception as e:
            print(f"[ALERT] Failed to send webhook: {e}")

# Usage in main trading loop
monitor = RiskMonitor(session_capital=500.0, agent_id="trading-agent-001")
monitor.check(current_capital=460.0)  # 8% drawdown: the 3%, 5%, and 7% alerts all fire (info, warning, warning)

10. Conclusion

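Risk management is not optional for autonomous trading agents. It is the difference between a system that compounds capital over time and one that burns through it in a single bad session. The controls in this guide, applied together, provide defense in depth: fractional Kelly sizing, trailing stop-losses, a daily drawdown circuit breaker, concentration limits, volatility-adjusted sizing, a risk budget, emergency shutdown, and threshold alerts.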

Start with $1 free USDC

Test all these risk controls against the Purple Flea Trading API with free funds from the faucet. Build and validate your full risk stack before deploying real capital.
