Guide

Production Monitoring for AI Trading Agents

How to monitor autonomous trading agents in production — health checks, performance dashboards, anomaly detection, PnL tracking, and incident response playbooks.

Purple Flea · March 6, 2026 · 18 min read
Key Metrics

  Signals per agent: 12+
  Alert MTTR: <5 min (with proper setup)
  Anomaly detection: z-score on rolling returns

1. Monitoring Architecture Overview

Autonomous trading agents present unique monitoring challenges. Unlike stateless web services, a trading agent maintains internal state (open positions, cash balance, strategy parameters) and its correctness is measured not by HTTP status codes alone but by financial performance over time.

A robust monitoring stack for agents has four layers:

  1. Health checks: is the agent process alive and able to reach its exchange?
  2. Performance metrics: Sharpe, drawdown, win rate, and PnL attribution
  3. Anomaly detection: statistical checks on the stream of returns
  4. Alerting and incident response: tiered alerts backed by written runbooks

Each layer feeds into a central dashboard. Alerts route to Telegram or Slack based on severity. A written incident response playbook ensures humans can intervene quickly when automated recovery fails.

Architecture tip: Use a time-series database (InfluxDB, Prometheus, or TimescaleDB) for financial metrics — they support the rolling window aggregations (30m, 1h, 24h) that trading analysis requires.
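Before standing up a full time-series database, the same rolling aggregations can be approximated in-process. A minimal sketch; the `RollingWindow` class is illustrative and not part of the stack below:

```python
import time
from collections import deque

class RollingWindow:
    """Time-horizon rolling aggregate over (timestamp, value) samples."""

    def __init__(self, horizon_secs: float):
        self.horizon = horizon_secs
        self.samples: deque = deque()   # (ts, value), appended in time order

    def add(self, value: float, ts: float = None):
        self.samples.append((time.time() if ts is None else ts, value))

    def mean(self, now: float = None) -> float:
        now = time.time() if now is None else now
        # Drop samples that have aged out of the horizon
        while self.samples and now - self.samples[0][0] > self.horizon:
            self.samples.popleft()
        if not self.samples:
            return 0.0
        return sum(v for _, v in self.samples) / len(self.samples)

# One window per horizon the dashboard needs, e.g. 30m / 1h / 24h:
# pnl_30m, pnl_1h, pnl_24h = RollingWindow(1800), RollingWindow(3600), RollingWindow(86400)
```

This trades durability for simplicity: the window is lost on restart, which is exactly why a TSDB is the better long-term home for these metrics.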
monitoring_stack.py
from dataclasses import dataclass, field
from typing import Optional
import time

@dataclass
class MonitoringConfig:
    agent_id: str
    api_base: str          # e.g. "https://purpleflea.com"
    api_key: str
    telegram_token: Optional[str] = None
    telegram_chat_id: Optional[str] = None
    slack_webhook: Optional[str] = None
    check_interval: int = 30        # seconds
    anomaly_window: int = 100       # rolling returns window
    anomaly_threshold: float = 3.0  # z-score threshold
    max_drawdown_pct: float = 0.15  # 15% drawdown kills agent
    min_balance: float = 10.0       # floor used by the wallet health check

@dataclass
class AgentSnapshot:
    timestamp: float = field(default_factory=time.time)
    balance: float = 0.0
    open_positions: int = 0
    pnl_24h: float = 0.0
    win_rate: float = 0.0
    sharpe: float = 0.0
    max_drawdown: float = 0.0
    api_latency_ms: float = 0.0
    errors_1h: int = 0
    alive: bool = True

2. Health Check Endpoints

Every trading agent should expose a /health endpoint that returns a structured JSON response. This enables orchestration platforms (PM2, Kubernetes, Docker Compose) to restart the agent automatically on failure.

A good health check is more than a ping. It validates:

  1. Exchange API reachability and latency
  2. Wallet balance above a minimum floor
  3. Strategy heartbeat (time since the agent's last action)
  4. System resources (CPU and memory headroom)

health_server.py
from aiohttp import web
import aiohttp, psutil, time, json

class HealthServer:
    def __init__(self, agent, config):
        self.agent = agent
        self.config = config
        self.start_time = time.time()

    async def handle_health(self, request):
        checks = {}
        status = 200

        # 1. Check exchange API reachability
        try:
            async with aiohttp.ClientSession() as s:
                t0 = time.monotonic()
                async with s.get(
                    f"{self.config.api_base}/api/health",
                    timeout=aiohttp.ClientTimeout(total=3)
                ) as r:
                    checks["exchange_api"] = {
                        "ok": r.status == 200,
                        "latency_ms": round((time.monotonic()-t0)*1000, 1)
                    }
        except Exception as e:
            checks["exchange_api"] = {"ok": False, "error": str(e)}
            status = 503

        # 2. Check wallet balance
        balance = await self.agent.get_balance()
        checks["wallet"] = {
            "ok": balance > self.config.min_balance,
            "balance": balance
        }
        if not checks["wallet"]["ok"]:
            status = 503

        # 3. Check last activity (strategy heartbeat)
        last_action_age = time.time() - self.agent.last_action_ts
        checks["strategy_heartbeat"] = {
            "ok": last_action_age < 300,   # stale after 5 min
            "last_action_secs_ago": round(last_action_age, 1)
        }
        if not checks["strategy_heartbeat"]["ok"]:
            status = 503

        # 4. System resources
        mem = psutil.virtual_memory()
        checks["system"] = {
            "cpu_pct": psutil.cpu_percent(interval=0.1),
            "mem_pct": mem.percent,
            "ok": mem.percent < 90
        }

        payload = {
            "status": "ok" if status == 200 else "degraded",
            "uptime_secs": round(time.time() - self.start_time),
            "agent_id": self.config.agent_id,
            "checks": checks,
            "timestamp": time.time()
        }
        return web.Response(
            text=json.dumps(payload, indent=2),
            content_type="application/json",
            status=status
        )

    async def handle_metrics(self, request):
        # Minimal placeholder so the /metrics route resolves; see section 8
        # for a real Prometheus exporter.
        return web.Response(text="", content_type="text/plain")

    async def start(self, port=8080):
        app = web.Application()
        app.router.add_get("/health", self.handle_health)
        app.router.add_get("/metrics", self.handle_metrics)
        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, "0.0.0.0", port)
        await site.start()
        print(f"Health server on port {port}")
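For deployments without an orchestrator, the same endpoint can drive a sidecar watchdog. A sketch assuming the /health payload shape above; `probe_health` and `classify` are illustrative names, and the restart action is left to your process manager:

```python
import asyncio
import aiohttp

def classify(status: int, payload: dict) -> dict:
    """Map a /health response to a verdict; 503 surfaces the failed sub-checks."""
    if status == 200:
        return {"healthy": True, "reason": "ok"}
    failed = [name for name, check in payload.get("checks", {}).items()
              if not check.get("ok", True)]
    return {"healthy": False,
            "reason": "failed: " + ", ".join(failed) if failed else "degraded"}

async def probe_health(url: str, timeout_secs: float = 3.0) -> dict:
    """Poll the /health endpoint once; network errors count as unhealthy."""
    try:
        async with aiohttp.ClientSession() as s:
            async with s.get(
                url, timeout=aiohttp.ClientTimeout(total=timeout_secs)
            ) as r:
                return classify(r.status, await r.json())
    except Exception as e:
        return {"healthy": False, "reason": f"unreachable: {e}"}

# Usage: asyncio.run(probe_health("http://localhost:8080/health"))
```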

3. Performance Metrics: Sharpe, Drawdown, Win Rate

Financial performance metrics require careful implementation. A few common pitfalls:

Metric          Formula                                 Good Range   Action Threshold
Sharpe Ratio    (mean_return / std_return) * sqrt(N)    > 1.5        < 0.5 — review strategy
Max Drawdown    max(peak - trough) / peak               < 15%        > 25% — pause agent
Win Rate        wins / total_trades                     45–65%       < 35% — review signals
Profit Factor   gross_profit / gross_loss               > 1.5        < 1.0 — losing money
Calmar Ratio    annual_return / max_drawdown            > 1.0        < 0.5 — poor risk-adj
performance_metrics.py
import numpy as np
from typing import List

class PerformanceMetrics:
    def __init__(self, periods_per_year: int = 365 * 24):
        # For hourly data on 24/7 crypto: 365*24 periods/year
        self.periods_per_year = periods_per_year

    def sharpe_ratio(self, returns: List[float], risk_free: float = 0.0) -> float:
        arr = np.array(returns)
        if len(arr) < 2:
            return 0.0
        excess = arr - risk_free / self.periods_per_year
        std = np.std(excess, ddof=1)
        if std == 0:
            return 0.0
        return float(np.mean(excess) / std * np.sqrt(self.periods_per_year))

    def max_drawdown(self, equity_curve: List[float]) -> float:
        """Returns drawdown as a positive fraction (0.15 = 15%)."""
        arr = np.array(equity_curve)
        peak = np.maximum.accumulate(arr)
        drawdown = (peak - arr) / peak
        return float(np.max(drawdown))

    def win_rate(self, trade_pnls: List[float]) -> float:
        if not trade_pnls:
            return 0.0
        wins = sum(1 for p in trade_pnls if p > 0)
        return wins / len(trade_pnls)

    def profit_factor(self, trade_pnls: List[float]) -> float:
        gross_profit = sum(p for p in trade_pnls if p > 0)
        gross_loss = abs(sum(p for p in trade_pnls if p < 0))
        if gross_loss == 0:
            return float('inf') if gross_profit > 0 else 0.0
        return gross_profit / gross_loss

    def calmar_ratio(self, equity_curve: List[float]) -> float:
        if len(equity_curve) < 2:
            return 0.0
        total_return = (equity_curve[-1] - equity_curve[0]) / equity_curve[0]
        periods = len(equity_curve)
        annual_return = (1 + total_return) ** (self.periods_per_year / periods) - 1
        mdd = self.max_drawdown(equity_curve)
        if mdd == 0:
            return float('inf')
        return annual_return / mdd

    def summary(self, equity_curve: List[float], trade_pnls: List[float]) -> dict:
        returns = [
            (equity_curve[i] - equity_curve[i-1]) / equity_curve[i-1]
            for i in range(1, len(equity_curve))
        ] if len(equity_curve) > 1 else []
        return {
            "sharpe": round(self.sharpe_ratio(returns), 3),
            "max_drawdown_pct": round(self.max_drawdown(equity_curve) * 100, 2),
            "win_rate_pct": round(self.win_rate(trade_pnls) * 100, 1),
            "profit_factor": round(self.profit_factor(trade_pnls), 3),
            "calmar_ratio": round(self.calmar_ratio(equity_curve), 3),
            "total_trades": len(trade_pnls),
            "total_pnl": round(sum(trade_pnls), 4),
        }

4. Anomaly Detection with Z-Score on Returns

Anomaly detection catches two distinct failure modes:

  1. Sudden large losses — a single trade that dwarfs historical volatility, suggesting a bug, mis-sized position, or data corruption.
  2. Strategy drift — returns whose distribution shifts over time, meaning the market regime has changed and the agent's edge has eroded.

The z-score method is simple and interpretable: if the latest return is more than N standard deviations from the rolling mean, fire an alert. N = 3 is a common default for trading systems.

anomaly_detector.py
from collections import deque
import numpy as np
import time

class AnomalyDetector:
    def __init__(self, window: int = 100, z_threshold: float = 3.0,
                 min_samples: int = 20):
        self.window = window
        self.z_threshold = z_threshold
        self.min_samples = min_samples
        self.returns: deque = deque(maxlen=window)
        self.anomalies: list = []

    def add_return(self, ret: float) -> dict:
        self.returns.append(ret)
        result = {
            "return": ret,
            "timestamp": time.time(),
            "anomaly": False,
            "z_score": None
        }
        if len(self.returns) < self.min_samples:
            return result  # not enough history

        arr = np.array(self.returns)
        mean = np.mean(arr[:-1])   # exclude current point from baseline
        std = np.std(arr[:-1], ddof=1)

        if std < 1e-10:
            return result  # no variance, skip

        z = (ret - mean) / std
        result["z_score"] = round(float(z), 3)
        result["baseline_mean"] = round(float(mean), 6)
        result["baseline_std"] = round(float(std), 6)

        if abs(z) > self.z_threshold:
            result["anomaly"] = True
            result["direction"] = "loss" if z < 0 else "gain"
            self.anomalies.append(result)

        return result

    def rolling_volatility(self) -> float:
        if len(self.returns) < 2:
            return 0.0
        return float(np.std(list(self.returns), ddof=1))

    def regime_change_score(self, recent_n: int = 20) -> float:
        """KL-divergence proxy: compare recent vs baseline volatility."""
        if len(self.returns) < self.min_samples + recent_n:
            return 0.0
        all_r = list(self.returns)
        baseline_std = np.std(all_r[:-recent_n], ddof=1)
        recent_std = np.std(all_r[-recent_n:], ddof=1)
        if baseline_std == 0:
            return 0.0
        return float(abs(recent_std - baseline_std) / baseline_std)  # fraction change
Warning: Z-score anomaly detection assumes approximately normal returns. Crypto returns have fat tails — consider supplementing with an IQR-based detector or a simple percentile threshold for highly skewed return distributions.
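A minimal IQR-based alternative with the same `add_return` interface; the class name and default parameters here are illustrative:

```python
from collections import deque
import numpy as np

class IQRAnomalyDetector:
    """Flags returns outside [Q1 - k*IQR, Q3 + k*IQR]; robust to fat tails."""

    def __init__(self, window: int = 100, k: float = 3.0, min_samples: int = 20):
        self.returns: deque = deque(maxlen=window)
        self.k = k
        self.min_samples = min_samples

    def add_return(self, ret: float) -> dict:
        result = {"return": ret, "anomaly": False}
        if len(self.returns) >= self.min_samples:
            arr = np.array(self.returns)          # baseline excludes current point
            q1, q3 = np.percentile(arr, [25, 75])
            iqr = q3 - q1
            lo, hi = q1 - self.k * iqr, q3 + self.k * iqr
            result["bounds"] = (round(float(lo), 6), round(float(hi), 6))
            result["anomaly"] = bool(ret < lo or ret > hi)
        self.returns.append(ret)
        return result
```

Because quartiles ignore the magnitude of extreme observations, a single fat-tail event does not inflate the detection bounds the way it inflates a standard deviation.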

5. PnL Attribution Dashboard

PnL attribution answers the question: where is the money coming from and going to? A full attribution breaks down returns by:

  1. Strategy (which signal generated the trade)
  2. Asset
  3. Time of day (hour of entry, UTC)
  4. Cost drag (fees and slippage relative to gross PnL)

pnl_attribution.py
from dataclasses import dataclass, field
from collections import defaultdict
from typing import List, Dict
import time

@dataclass
class Trade:
    id: str
    strategy: str
    asset: str
    side: str          # "buy" or "sell"
    size: float
    entry_price: float
    exit_price: float
    entry_ts: float
    exit_ts: float
    fees: float = 0.0
    slippage: float = 0.0

    @property
    def gross_pnl(self) -> float:
        if self.side == "buy":
            return (self.exit_price - self.entry_price) * self.size
        return (self.entry_price - self.exit_price) * self.size

    @property
    def net_pnl(self) -> float:
        return self.gross_pnl - self.fees - abs(self.slippage * self.size)

class PnLAttributor:
    def __init__(self):
        self.trades: List[Trade] = []

    def add_trade(self, trade: Trade):
        self.trades.append(trade)

    def by_strategy(self) -> Dict[str, dict]:
        result = defaultdict(lambda: {"gross": 0.0, "net": 0.0, "count": 0, "fees": 0.0})
        for t in self.trades:
            result[t.strategy]["gross"] += t.gross_pnl
            result[t.strategy]["net"] += t.net_pnl
            result[t.strategy]["count"] += 1
            result[t.strategy]["fees"] += t.fees
        return dict(result)

    def by_asset(self) -> Dict[str, dict]:
        result = defaultdict(lambda: {"gross": 0.0, "net": 0.0, "count": 0})
        for t in self.trades:
            result[t.asset]["gross"] += t.gross_pnl
            result[t.asset]["net"] += t.net_pnl
            result[t.asset]["count"] += 1
        return dict(result)

    def by_hour_of_day(self) -> Dict[int, float]:
        from datetime import datetime, timezone
        result = defaultdict(float)
        for t in self.trades:
            hour = datetime.fromtimestamp(t.entry_ts, tz=timezone.utc).hour
            result[hour] += t.net_pnl
        return dict(sorted(result.items()))

    def fee_drag_pct(self) -> float:
        total_fees = sum(t.fees for t in self.trades)
        total_gross = sum(abs(t.gross_pnl) for t in self.trades)
        if total_gross == 0:
            return 0.0
        return (total_fees / total_gross) * 100

    def full_report(self) -> dict:
        return {
            "total_trades": len(self.trades),
            "total_gross_pnl": round(sum(t.gross_pnl for t in self.trades), 4),
            "total_net_pnl": round(sum(t.net_pnl for t in self.trades), 4),
            "total_fees": round(sum(t.fees for t in self.trades), 4),
            "fee_drag_pct": round(self.fee_drag_pct(), 2),
            "by_strategy": self.by_strategy(),
            "by_asset": self.by_asset(),
            "by_hour": self.by_hour_of_day(),
        }

6. Alert Routing: Telegram and Slack

Alerts must be actionable and tiered. Too many alerts cause alert fatigue; too few mean you miss critical events. Define three tiers:

  1. INFO: logged only, never pushed
  2. WARN: pushed to Telegram
  3. CRITICAL: pushed to Telegram and Slack; requires human action

Pushed tiers are deduplicated over a 5-minute window per alert key.

alert_router.py
import aiohttp, logging
from enum import Enum

class AlertLevel(Enum):
    INFO = "INFO"
    WARN = "WARN"
    CRITICAL = "CRITICAL"

class AlertRouter:
    def __init__(self, config):
        self.config = config
        self.log = logging.getLogger("alerts")
        self._last_sent: dict = {}   # dedup: key -> last timestamp
        self.dedup_window = 300      # 5 min dedup for same alert key

    def _should_send(self, key: str) -> bool:
        import time
        last = self._last_sent.get(key, 0)
        if time.time() - last > self.dedup_window:
            self._last_sent[key] = time.time()
            return True
        return False

    async def send(self, level: AlertLevel, title: str, body: str, key: str = None):
        key = key or title
        self.log.log(
            logging.WARNING if level != AlertLevel.INFO else logging.INFO,
            f"[{level.value}] {title}: {body}"
        )
        if level == AlertLevel.INFO:
            return   # only log INFO

        if not self._should_send(key):
            return   # deduplicated

        icon = {"WARN": "⚠️", "CRITICAL": "🚨"}[level.value]
        msg = f"{icon} *{level.value} — {title}*\n{body}"

        tasks = []
        if self.config.telegram_token and self.config.telegram_chat_id:
            tasks.append(self._send_telegram(msg))
        if self.config.slack_webhook and level == AlertLevel.CRITICAL:
            tasks.append(self._send_slack(title, body, level))

        import asyncio
        await asyncio.gather(*tasks, return_exceptions=True)

    async def _send_telegram(self, text: str):
        url = f"https://api.telegram.org/bot{self.config.telegram_token}/sendMessage"
        async with aiohttp.ClientSession() as s:
            await s.post(url, json={
                "chat_id": self.config.telegram_chat_id,
                "text": text,
                "parse_mode": "Markdown"
            })

    async def _send_slack(self, title: str, body: str, level: AlertLevel):
        color = "#EF4444" if level == AlertLevel.CRITICAL else "#EAB308"
        async with aiohttp.ClientSession() as s:
            await s.post(self.config.slack_webhook, json={
                "attachments": [{
                    "color": color,
                    "title": title,
                    "text": body,
                    "footer": "Purple Flea Agent Monitor"
                }]
            })

7. Incident Response Playbook

A written playbook ensures consistent, fast responses when humans must intervene. Define runbooks for each alert type and store them in your operations wiki. Here is a minimal playbook structure:

Runbook: Agent Stopped (CRITICAL)

  1. Check PM2 status: pm2 list | grep agent
  2. Check logs for last error: pm2 logs agent-name --lines 50
  3. If OOM: reduce position size, increase server RAM, restart
  4. If API key expired: rotate key, update env, restart
  5. If exchange down: wait for recovery, do not restart in loop
  6. Verify restart: check /health endpoint returns 200
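Step 5's "do not restart in loop" can be enforced mechanically. A sketch of an exponential-backoff guard; the pm2 call shown in the comment is a placeholder for whatever restart mechanism you use:

```python
import time

class RestartGuard:
    """Permits a restart only after an exponentially growing cooldown."""

    def __init__(self, base_secs: float = 30.0, cap_secs: float = 1800.0):
        self.base = base_secs
        self.cap = cap_secs
        self.failures = 0
        self.last_restart = 0.0

    def cooldown(self) -> float:
        return min(self.base * (2 ** self.failures), self.cap)

    def may_restart(self, now: float = None) -> bool:
        now = time.time() if now is None else now
        if now - self.last_restart < self.cooldown():
            return False        # still cooling down: leave the agent stopped
        self.failures += 1
        self.last_restart = now
        return True

    def reset(self):
        # Call once the agent has stayed healthy for a full cooldown period
        self.failures = 0

# if guard.may_restart():
#     subprocess.run(["pm2", "restart", "agent-name"])   # placeholder restart hook
```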

Runbook: Drawdown Exceeded (CRITICAL)

  1. Pause agent immediately: pm2 stop agent-name
  2. Close all open positions via exchange UI
  3. Pull trade history and run attribution report
  4. Identify losing strategy or asset
  5. Adjust risk parameters (reduce size, tighten stops)
  6. Paper trade for 24h before re-enabling

Runbook: Anomaly Detected (WARN)

  1. Review the flagged trade: size, asset, entry/exit prices
  2. Check if price data feed had a spike or gap
  3. Verify the trade was actually executed at the flagged price
  4. If data issue: agent continues, add data quality check
  5. If strategy issue: pause and review signal logic
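The data quality check mentioned in step 4 can be as simple as bounding tick-to-tick moves and staleness. A sketch; the thresholds and function name are illustrative:

```python
def validate_tick(prev_price: float, prev_ts: float,
                  price: float, ts: float,
                  max_move_pct: float = 0.10,
                  max_gap_secs: float = 120.0) -> list:
    """Return a list of data-quality issues for a new price tick."""
    issues = []
    if price <= 0:
        issues.append("non_positive_price")
    elif prev_price > 0:
        move = abs(price - prev_price) / prev_price
        if move > max_move_pct:
            issues.append(f"spike:{move:.1%}")        # tick-to-tick jump too large
    if ts < prev_ts:
        issues.append("out_of_order")
    elif ts - prev_ts > max_gap_secs:
        issues.append(f"gap:{ts - prev_ts:.0f}s")     # stale or missing data
    return issues

# A non-empty result means: skip the signal and raise a WARN instead of trading on it
```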

8. Metrics Exposition for Grafana/Prometheus

Expose agent metrics in Prometheus text format so any Grafana dashboard can visualize them. This enables multi-agent dashboards without per-agent custom tooling.

metrics_exporter.py
class PrometheusExporter:
    """Exposes agent metrics in Prometheus text exposition format."""

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self._metrics: dict = {}

    def set(self, name: str, value: float, labels: dict = None):
        label_str = ""
        if labels:
            pairs = ",".join(f'{k}="{v}"' for k, v in labels.items())
            label_str = f"{{{pairs}}}"
        self._metrics[f"{name}{label_str}"] = value

    def render(self) -> str:
        lines = []
        base_labels = f'agent_id="{self.agent_id}"'
        for key, val in self._metrics.items():
            # Insert base label
            if key.endswith("}"):
                key = key[:-1] + f",{base_labels}" + "}"
            else:
                key = key + "{" + base_labels + "}"
            lines.append(f"purpleflea_{key} {val}")
        return "\n".join(lines)

# Usage:
# exp = PrometheusExporter("my-arb-agent")
# exp.set("balance_usd", 1234.56)
# exp.set("pnl_24h_usd", 45.20)
# exp.set("sharpe_ratio", 1.87)
# exp.set("max_drawdown_pct", 4.2)
# exp.set("win_rate_pct", 58.3)
# exp.set("open_positions", 2)
# exp.set("api_latency_ms", 87.0, labels={"endpoint": "place_order"})
# print(exp.render())
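Serving the exporter over HTTP takes one extra route on the aiohttp server from section 2; a sketch (`make_metrics_handler` is an illustrative helper):

```python
from aiohttp import web

def make_metrics_handler(exporter):
    """Build an aiohttp handler that serves the exporter's current snapshot."""
    async def handle_metrics(request):
        return web.Response(
            text=exporter.render() + "\n",
            content_type="text/plain"    # Prometheus scrapes plain text
        )
    return handle_metrics

# Wiring it up:
# app.router.add_get("/metrics", make_metrics_handler(exp))
```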

9. MonitoringAgent: Full Async Implementation

Putting it all together: a self-contained MonitoringAgent that runs async health checks on a schedule, detects anomalies, routes alerts, and exposes Prometheus metrics. Run this as a sidecar alongside your trading agent.

monitoring_agent.py
import asyncio, aiohttp, time, logging
from typing import List

# Components defined in the previous sections' modules
from monitoring_stack import MonitoringConfig, AgentSnapshot
from performance_metrics import PerformanceMetrics
from anomaly_detector import AnomalyDetector
from alert_router import AlertRouter, AlertLevel
from metrics_exporter import PrometheusExporter

logging.basicConfig(level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")

class MonitoringAgent:
    def __init__(self, config: MonitoringConfig):
        self.config = config
        self.metrics = PerformanceMetrics()
        self.anomaly = AnomalyDetector(
            window=config.anomaly_window,
            z_threshold=config.anomaly_threshold
        )
        self.alerts = AlertRouter(config)
        self.exporter = PrometheusExporter(config.agent_id)
        self.equity_curve: List[float] = []
        self.trade_pnls: List[float] = []
        self.log = logging.getLogger(f"monitor.{config.agent_id}")

    async def fetch_snapshot(self) -> AgentSnapshot:
        """Poll the trading agent's /health and /metrics endpoints."""
        snap = AgentSnapshot()
        try:
            async with aiohttp.ClientSession() as s:
                t0 = time.monotonic()
                async with s.get(
                    f"{self.config.api_base}/metrics",
                    headers={"Authorization": f"Bearer {self.config.api_key}"},
                    timeout=aiohttp.ClientTimeout(total=5)
                ) as r:
                    snap.api_latency_ms = round((time.monotonic()-t0)*1000, 1)
                    if r.status == 200:
                        data = await r.json()
                        snap.balance = data.get("balance", 0)
                        snap.pnl_24h = data.get("pnl_24h", 0)
                        snap.open_positions = data.get("open_positions", 0)
                    else:
                        snap.alive = False
        except Exception as e:
            self.log.error(f"Failed to fetch snapshot: {e}")
            snap.alive = False
        return snap

    async def check_and_alert(self, snap: AgentSnapshot):
        if not snap.alive:
            await self.alerts.send(
                AlertLevel.CRITICAL,
                "Agent Unreachable",
                f"Agent {self.config.agent_id} failed health check.",
                key="agent_down"
            )
            return

        # Update equity curve for drawdown calculation
        if snap.balance > 0:
            self.equity_curve.append(snap.balance)

        # Check drawdown
        if len(self.equity_curve) > 10:
            summary = self.metrics.summary(self.equity_curve, self.trade_pnls)
            mdd = summary["max_drawdown_pct"] / 100

            if mdd > self.config.max_drawdown_pct:
                await self.alerts.send(
                    AlertLevel.CRITICAL,
                    "Max Drawdown Exceeded",
                    f"Drawdown: {mdd*100:.1f}% > limit {self.config.max_drawdown_pct*100:.0f}%",
                    key="max_drawdown"
                )

            # Feed the anomaly detector the latest balance-to-balance return
            if len(self.equity_curve) >= 2 and self.equity_curve[-2] > 0:
                res = self.anomaly.add_return(
                    self.equity_curve[-1] / self.equity_curve[-2] - 1
                )
                if res["anomaly"]:
                    await self.alerts.send(
                        AlertLevel.WARN,
                        "Return Anomaly",
                        f"z-score {res['z_score']} ({res['direction']})",
                        key="return_anomaly"
                    )

            # Update Prometheus metrics
            self.exporter.set("balance_usd", snap.balance)
            self.exporter.set("pnl_24h_usd", snap.pnl_24h)
            self.exporter.set("sharpe_ratio", summary["sharpe"])
            self.exporter.set("max_drawdown_pct", summary["max_drawdown_pct"])
            self.exporter.set("api_latency_ms", snap.api_latency_ms)

    async def run(self):
        self.log.info(f"Monitoring agent started for {self.config.agent_id}")
        while True:
            try:
                snap = await self.fetch_snapshot()
                await self.check_and_alert(snap)
                self.log.info(
                    f"Snapshot OK | balance={snap.balance:.2f} "
                    f"latency={snap.api_latency_ms}ms alive={snap.alive}"
                )
            except Exception as e:
                self.log.error(f"Monitor loop error: {e}")
            await asyncio.sleep(self.config.check_interval)

# Entry point
if __name__ == "__main__":
    import os
    cfg = MonitoringConfig(
        agent_id=os.environ["AGENT_ID"],
        api_base="https://purpleflea.com",
        api_key=os.environ["PURPLEFLEA_API_KEY"],
        telegram_token=os.environ.get("TELEGRAM_TOKEN"),
        telegram_chat_id=os.environ.get("TELEGRAM_CHAT_ID"),
        slack_webhook=os.environ.get("SLACK_WEBHOOK"),
    )
    asyncio.run(MonitoringAgent(cfg).run())

Start monitoring your agent today

Purple Flea provides financial infrastructure for AI agents — casino, escrow, faucet, wallet, perpetuals, and domains. Get your agent running in minutes.

Explore Purple Flea