1. Monitoring Architecture Overview
Autonomous trading agents present unique monitoring challenges. Unlike stateless web services, a trading agent maintains internal state (open positions, cash balance, strategy parameters) and its correctness is measured not by HTTP status codes alone but by financial performance over time.
A robust monitoring stack for agents has four layers:
- Health Layer — Is the agent alive and responding? Are its dependencies reachable?
- Performance Layer — Is it making money? What is the Sharpe ratio, drawdown, win rate?
- Behavioral Layer — Are its decisions consistent with its strategy? Anomaly detection flags drift.
- Operational Layer — Logs, traces, error rates, latency percentiles.
Each layer feeds into a central dashboard. Alerts route to Telegram or Slack based on severity. A written incident response playbook ensures humans can intervene quickly when automated recovery fails.
from dataclasses import dataclass, field
from typing import Optional
import asyncio, time, logging
@dataclass
class MonitoringConfig:
    """Static configuration for the monitoring sidecar.

    Alert-channel fields (telegram_*, slack_webhook) are optional; a
    channel is only used when its credentials are present.
    """
    agent_id: str                           # unique identifier of the monitored agent
    api_base: str                           # e.g. "https://purpleflea.com"
    api_key: str                            # bearer token sent to the agent's /metrics endpoint
    telegram_token: Optional[str] = None    # Telegram bot token (WARN/CRITICAL alerts)
    telegram_chat_id: Optional[str] = None  # Telegram chat that receives alerts
    slack_webhook: Optional[str] = None     # Slack incoming-webhook URL (CRITICAL alerts)
    check_interval: int = 30                # seconds between monitoring passes
    anomaly_window: int = 100               # rolling returns window
    anomaly_threshold: float = 3.0          # z-score threshold
    max_drawdown_pct: float = 0.15          # 15% drawdown kills agent
@dataclass
class AgentSnapshot:
    """Point-in-time view of the monitored agent, filled from its /metrics poll."""
    timestamp: float = field(default_factory=time.time)  # when the snapshot was taken
    balance: float = 0.0          # account balance reported by the agent
    open_positions: int = 0       # number of currently open positions
    pnl_24h: float = 0.0          # profit/loss over the trailing 24 hours
    win_rate: float = 0.0         # fraction of winning trades (0.0-1.0)
    sharpe: float = 0.0           # Sharpe ratio (presumably annualized — see PerformanceMetrics)
    max_drawdown: float = 0.0     # peak-to-trough drawdown fraction
    api_latency_ms: float = 0.0   # latency of the last poll in milliseconds
    errors_1h: int = 0            # error count over the trailing hour
    alive: bool = True            # False when the poll failed or returned non-200
2. Health Check Endpoints
Every trading agent should expose a /health endpoint that returns a structured JSON response. This enables orchestration platforms (PM2, Kubernetes, Docker Compose) to restart the agent automatically on failure.
A good health check is more than a ping. It validates:
- Database/cache connectivity
- External API reachability (exchange, price feeds)
- Wallet balance above minimum threshold
- Last trade timestamp not stale (strategy should be active)
- Memory and CPU within normal bounds
from aiohttp import web
import aiohttp, psutil, time, json
class HealthServer:
    """Lightweight aiohttp server exposing /health (and /metrics) for orchestrators.

    `agent` must provide an async `get_balance()` and a `last_action_ts`
    float attribute; `config` provides `api_base` and `agent_id`, and may
    optionally declare `min_balance`.
    """

    def __init__(self, agent, config):
        self.agent = agent
        self.config = config
        self.start_time = time.time()

    async def handle_health(self, request):
        """Run all health checks; respond 200 when healthy, 503 when degraded."""
        checks = {}
        status = 200
        # 1. Exchange API reachability — 3 s budget so the probe itself stays fast.
        try:
            async with aiohttp.ClientSession() as s:
                t0 = time.monotonic()
                async with s.get(
                    f"{self.config.api_base}/api/health",
                    timeout=aiohttp.ClientTimeout(total=3)
                ) as r:
                    checks["exchange_api"] = {
                        "ok": r.status == 200,
                        "latency_ms": round((time.monotonic() - t0) * 1000, 1)
                    }
        except Exception as e:
            checks["exchange_api"] = {"ok": False, "error": str(e)}
            status = 503
        # 2. Wallet balance above the configured floor.
        # Fix: MonitoringConfig declares no `min_balance`, so read it
        # defensively — a missing knob must not crash the health probe.
        balance = await self.agent.get_balance()
        min_balance = getattr(self.config, "min_balance", 0.0)
        checks["wallet"] = {
            "ok": balance > min_balance,
            "balance": balance
        }
        if not checks["wallet"]["ok"]:
            status = 503
        # 3. Strategy heartbeat — catches an agent that is up but silently stalled.
        last_action_age = time.time() - self.agent.last_action_ts
        checks["strategy_heartbeat"] = {
            "ok": last_action_age < 300,  # stale after 5 min
            "last_action_secs_ago": round(last_action_age, 1)
        }
        if not checks["strategy_heartbeat"]["ok"]:
            status = 503
        # 4. Host resources. Reported but (as in the original) memory/CPU do
        # not flip the overall status to 503 — informational only.
        mem = psutil.virtual_memory()
        checks["system"] = {
            "cpu_pct": psutil.cpu_percent(interval=0.1),
            "mem_pct": mem.percent,
            "ok": mem.percent < 90
        }
        payload = {
            "status": "ok" if status == 200 else "degraded",
            "uptime_secs": round(time.time() - self.start_time),
            "agent_id": self.config.agent_id,
            "checks": checks,
            "timestamp": time.time()
        }
        return web.Response(
            text=json.dumps(payload, indent=2),
            content_type="application/json",
            status=status
        )

    async def handle_metrics(self, request):
        """Minimal /metrics handler.

        Fix: start() registered this route but the method did not exist,
        so start() raised AttributeError. Serve basic identity/uptime JSON;
        richer Prometheus exposition lives in PrometheusExporter.
        """
        return web.Response(
            text=json.dumps({
                "agent_id": self.config.agent_id,
                "uptime_secs": round(time.time() - self.start_time),
            }),
            content_type="application/json"
        )

    async def start(self, port=8080):
        """Bind the health app on 0.0.0.0:`port` and start serving."""
        app = web.Application()
        app.router.add_get("/health", self.handle_health)
        app.router.add_get("/metrics", self.handle_metrics)
        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, "0.0.0.0", port)
        await site.start()
        print(f"Health server on port {port}")
3. Performance Metrics: Sharpe, Drawdown, Win Rate
Financial performance metrics require careful implementation. A few common pitfalls:
- Sharpe ratio must be annualized — multiply by sqrt(periods_per_year)
- Max drawdown is peak-to-trough on the equity curve, not individual trade loss
- Win rate alone is meaningless — a 90% win rate with bad risk/reward destroys capital
- Always track profit factor (gross profit / gross loss) alongside win rate
| Metric | Formula | Good Range | Action Threshold |
|---|---|---|---|
| Sharpe Ratio | (mean_return / std_return) * sqrt(N) | > 1.5 | < 0.5 — review strategy |
| Max Drawdown | max(peak - trough) / peak | < 15% | > 25% — pause agent |
| Win Rate | wins / total_trades | 45–65% | < 35% — review signals |
| Profit Factor | gross_profit / gross_loss | > 1.5 | < 1.0 — losing money |
| Calmar Ratio | annual_return / max_drawdown | > 1.0 | < 0.5 — poor risk-adj |
import numpy as np
from typing import List
class PerformanceMetrics:
    """Financial performance metrics over equity curves and per-trade PnLs.

    All methods are defensive about degenerate input (empty series, zero
    variance, zero starting equity): they return 0.0 / inf sentinels
    instead of raising, so a young agent with little history cannot crash
    the monitoring loop.
    """

    def __init__(self, periods_per_year: int = 365 * 24):
        # For hourly data on 24/7 crypto: 365*24 periods/year.
        self.periods_per_year = periods_per_year

    def sharpe_ratio(self, returns: List[float], risk_free: float = 0.0) -> float:
        """Annualized Sharpe ratio of per-period returns; 0.0 when undefined."""
        arr = np.asarray(returns, dtype=float)
        if arr.size < 2:
            return 0.0
        # risk_free is an annual rate; convert to a per-period hurdle.
        excess = arr - risk_free / self.periods_per_year
        std = np.std(excess, ddof=1)
        if std == 0:
            return 0.0
        return float(np.mean(excess) / std * np.sqrt(self.periods_per_year))

    def max_drawdown(self, equity_curve: List[float]) -> float:
        """Peak-to-trough drawdown as a positive fraction (0.15 = 15%).

        Fix: the original raised on an empty curve (np.max of an empty
        array) and produced nan when the running peak was 0; both are now
        guarded and report 0.0 drawdown for those points.
        """
        arr = np.asarray(equity_curve, dtype=float)
        if arr.size < 2:
            return 0.0
        peak = np.maximum.accumulate(arr)
        with np.errstate(divide="ignore", invalid="ignore"):
            drawdown = np.where(peak > 0, (peak - arr) / peak, 0.0)
        return float(np.max(drawdown))

    def win_rate(self, trade_pnls: List[float]) -> float:
        """Fraction of trades with strictly positive PnL; 0.0 with no trades."""
        if not trade_pnls:
            return 0.0
        wins = sum(1 for p in trade_pnls if p > 0)
        return wins / len(trade_pnls)

    def profit_factor(self, trade_pnls: List[float]) -> float:
        """Gross profit / gross loss. inf when there are profits but no
        losses; 0.0 when there is neither."""
        gross_profit = sum(p for p in trade_pnls if p > 0)
        gross_loss = abs(sum(p for p in trade_pnls if p < 0))
        if gross_loss == 0:
            return float('inf') if gross_profit > 0 else 0.0
        return gross_profit / gross_loss

    def calmar_ratio(self, equity_curve: List[float]) -> float:
        """Annualized return divided by max drawdown; inf when drawdown is 0.

        Fix: guard a zero starting equity, which previously raised
        ZeroDivisionError.
        """
        if len(equity_curve) < 2 or equity_curve[0] == 0:
            return 0.0
        total_return = (equity_curve[-1] - equity_curve[0]) / equity_curve[0]
        periods = len(equity_curve)
        # Geometric annualization of the observed total return.
        annual_return = (1 + total_return) ** (self.periods_per_year / periods) - 1
        mdd = self.max_drawdown(equity_curve)
        if mdd == 0:
            return float('inf')
        return annual_return / mdd

    def summary(self, equity_curve: List[float], trade_pnls: List[float]) -> dict:
        """One-dict rollup of all metrics for dashboards and alerting."""
        # Per-period simple returns; skip zero-balance points to avoid /0.
        returns = [
            (equity_curve[i] - equity_curve[i - 1]) / equity_curve[i - 1]
            for i in range(1, len(equity_curve))
            if equity_curve[i - 1] != 0
        ] if len(equity_curve) > 1 else []
        return {
            "sharpe": round(self.sharpe_ratio(returns), 3),
            "max_drawdown_pct": round(self.max_drawdown(equity_curve) * 100, 2),
            "win_rate_pct": round(self.win_rate(trade_pnls) * 100, 1),
            "profit_factor": round(self.profit_factor(trade_pnls), 3),
            "calmar_ratio": round(self.calmar_ratio(equity_curve), 3),
            "total_trades": len(trade_pnls),
            "total_pnl": round(sum(trade_pnls), 4),
        }
4. Anomaly Detection with Z-Score on Returns
Anomaly detection catches two distinct failure modes:
- Sudden large losses — a single trade that dwarfs historical volatility, suggesting a bug, mis-sized position, or data corruption.
- Strategy drift — returns whose distribution shifts over time, meaning the market regime has changed and the agent's edge has eroded.
The z-score method is simple and interpretable: if the latest return is more than N standard deviations from the rolling mean, fire an alert. N=3 is the standard threshold for trading systems.
from collections import deque
import numpy as np
import time
class AnomalyDetector:
    """Rolling z-score outlier detection over a stream of returns.

    Each incoming return is compared against the mean/std of the window
    that preceded it; returns beyond `z_threshold` standard deviations
    are flagged and retained in `self.anomalies`.
    """

    def __init__(self, window: int = 100, z_threshold: float = 3.0,
                 min_samples: int = 20):
        self.window = window
        self.z_threshold = z_threshold
        self.min_samples = min_samples
        self.returns: deque = deque(maxlen=window)
        self.anomalies: list = []

    def add_return(self, ret: float) -> dict:
        """Record one return and report whether it is an outlier."""
        self.returns.append(ret)
        report = {
            "return": ret,
            "timestamp": time.time(),
            "anomaly": False,
            "z_score": None
        }
        if len(self.returns) < self.min_samples:
            # Too little history for a meaningful baseline yet.
            return report
        history = np.array(self.returns)[:-1]  # baseline excludes the new point
        mu = float(np.mean(history))
        sigma = float(np.std(history, ddof=1))
        if sigma < 1e-10:
            # Flat baseline: z-score would be meaningless/unstable.
            return report
        z = (ret - mu) / sigma
        report.update(
            z_score=round(float(z), 3),
            baseline_mean=round(mu, 6),
            baseline_std=round(sigma, 6),
        )
        if abs(z) > self.z_threshold:
            report["anomaly"] = True
            report["direction"] = "loss" if z < 0 else "gain"
            self.anomalies.append(report)
        return report

    def rolling_volatility(self) -> float:
        """Sample standard deviation of the current window (0.0 with <2 samples)."""
        data = list(self.returns)
        if len(data) < 2:
            return 0.0
        return float(np.std(data, ddof=1))

    def regime_change_score(self, recent_n: int = 20) -> float:
        """KL-divergence proxy: fractional shift of recent vs baseline volatility."""
        if len(self.returns) < self.min_samples + recent_n:
            return 0.0
        series = list(self.returns)
        older, recent = series[:-recent_n], series[-recent_n:]
        base = np.std(older, ddof=1)
        if base == 0:
            return 0.0
        return float(abs(np.std(recent, ddof=1) - base) / base)
5. PnL Attribution Dashboard
PnL attribution answers the question: where is the money coming from and going to? A full attribution breaks down returns by:
- Strategy — which sub-strategy generated each trade
- Asset — which token or market
- Time of day — are there intraday patterns?
- Fee drag — how much are fees eating into gross PnL?
- Slippage — difference between expected and actual fill price
from dataclasses import dataclass, field
from collections import defaultdict
from typing import List, Dict
import time
@dataclass
class Trade:
    """A completed round-trip trade with cost accounting."""
    id: str
    strategy: str
    asset: str
    side: str  # "buy" or "sell"
    size: float
    entry_price: float
    exit_price: float
    entry_ts: float
    exit_ts: float
    fees: float = 0.0
    slippage: float = 0.0

    @property
    def gross_pnl(self) -> float:
        """Price-move PnL before fees and slippage (sign flips for sells)."""
        move = self.exit_price - self.entry_price
        return move * self.size if self.side == "buy" else -move * self.size

    @property
    def net_pnl(self) -> float:
        """Gross PnL minus fees and the absolute slippage cost on the position."""
        slippage_cost = abs(self.slippage * self.size)
        return self.gross_pnl - self.fees - slippage_cost
class PnLAttributor:
    """Aggregates completed trades and attributes PnL by strategy, asset,
    UTC hour of entry, and fee drag."""

    def __init__(self):
        self.trades: List["Trade"] = []

    def add_trade(self, trade: "Trade"):
        """Register a completed trade for attribution."""
        self.trades.append(trade)

    def by_strategy(self) -> Dict[str, dict]:
        """Gross/net PnL, trade count, and fees per sub-strategy."""
        result = defaultdict(lambda: {"gross": 0.0, "net": 0.0, "count": 0, "fees": 0.0})
        for t in self.trades:
            bucket = result[t.strategy]
            bucket["gross"] += t.gross_pnl
            bucket["net"] += t.net_pnl
            bucket["count"] += 1
            bucket["fees"] += t.fees
        return dict(result)

    def by_asset(self) -> Dict[str, dict]:
        """Gross/net PnL and trade count per traded asset."""
        result = defaultdict(lambda: {"gross": 0.0, "net": 0.0, "count": 0})
        for t in self.trades:
            bucket = result[t.asset]
            bucket["gross"] += t.gross_pnl
            bucket["net"] += t.net_pnl
            bucket["count"] += 1
        return dict(result)

    def by_hour_of_day(self) -> Dict[int, float]:
        """Net PnL keyed by the UTC hour of trade entry (0-23), sorted by hour.

        Fix: the import previously sat inside the loop body, and the
        deprecated naive `datetime.utcfromtimestamp` was used; use a
        timezone-aware conversion, which yields the same UTC hour.
        """
        from datetime import datetime, timezone
        result = defaultdict(float)
        for t in self.trades:
            hour = datetime.fromtimestamp(t.entry_ts, tz=timezone.utc).hour
            result[hour] += t.net_pnl
        return dict(sorted(result.items()))

    def fee_drag_pct(self) -> float:
        """Total fees as a percentage of total absolute gross PnL (0.0 when no PnL)."""
        total_fees = sum(t.fees for t in self.trades)
        total_gross = sum(abs(t.gross_pnl) for t in self.trades)
        if total_gross == 0:
            return 0.0
        return (total_fees / total_gross) * 100

    def full_report(self) -> dict:
        """Aggregate attribution report suitable for JSON serialization."""
        return {
            "total_trades": len(self.trades),
            "total_gross_pnl": round(sum(t.gross_pnl for t in self.trades), 4),
            "total_net_pnl": round(sum(t.net_pnl for t in self.trades), 4),
            "total_fees": round(sum(t.fees for t in self.trades), 4),
            "fee_drag_pct": round(self.fee_drag_pct(), 2),
            "by_strategy": self.by_strategy(),
            "by_asset": self.by_asset(),
            "by_hour": self.by_hour_of_day(),
        }
6. Alert Routing: Telegram and Slack
Alerts must be actionable and tiered. Too many alerts cause alert fatigue; too few mean you miss critical events. Define three tiers:
- INFO — daily performance summary, milestone events. Logged only.
- WARN — Sharpe below threshold, drawdown approaching limit, API latency spike. Telegram message.
- CRITICAL — agent stopped, drawdown exceeded hard limit, anomaly detected. Telegram + Slack + SMS.
import aiohttp, logging
from enum import Enum
class AlertLevel(Enum):
    """Alert severity tiers: INFO is log-only, WARN reaches Telegram,
    CRITICAL reaches Telegram and Slack."""
    INFO = "INFO"
    WARN = "WARN"
    CRITICAL = "CRITICAL"


class AlertRouter:
    """Routes tiered alerts to logging, Telegram, and Slack with deduplication."""

    def __init__(self, config):
        self.config = config
        self.log = logging.getLogger("alerts")
        self._last_sent: dict = {}  # dedup: alert key -> last send timestamp
        self.dedup_window = 300  # suppress repeats of the same key for 5 min

    def _should_send(self, key: str) -> bool:
        """True (and records the send) when `key` is outside the dedup window."""
        import time
        last = self._last_sent.get(key, 0)
        if time.time() - last > self.dedup_window:
            self._last_sent[key] = time.time()
            return True
        return False

    async def send(self, level: AlertLevel, title: str, body: str, key: str = None):
        """Log the alert, then fan it out to channels by severity.

        `key` (defaults to `title`) identifies the alert for deduplication.
        INFO alerts are logged only; WARN and above go to configured channels.
        """
        key = key or title
        self.log.log(
            logging.WARNING if level != AlertLevel.INFO else logging.INFO,
            f"[{level.value}] {title}: {body}"
        )
        if level == AlertLevel.INFO:
            return  # INFO is log-only by design
        if not self._should_send(key):
            return  # suppressed: same key alerted within the dedup window
        icon = {"WARN": "⚠️", "CRITICAL": "🚨"}[level.value]
        msg = f"{icon} *{level.value} — {title}*\n{body}"
        tasks = []
        if self.config.telegram_token and self.config.telegram_chat_id:
            tasks.append(self._send_telegram(msg))
        if self.config.slack_webhook and level == AlertLevel.CRITICAL:
            tasks.append(self._send_slack(title, body, level))
        import asyncio
        # return_exceptions=True: a failed channel must not crash the caller.
        await asyncio.gather(*tasks, return_exceptions=True)

    async def _send_telegram(self, text: str):
        """POST a Markdown message to the configured Telegram chat.

        Fix: the request previously had no timeout, so a stuck Telegram API
        could hang the monitoring loop indefinitely.
        """
        url = f"https://api.telegram.org/bot{self.config.telegram_token}/sendMessage"
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=10)
        ) as s:
            await s.post(url, json={
                "chat_id": self.config.telegram_chat_id,
                "text": text,
                "parse_mode": "Markdown"
            })

    async def _send_slack(self, title: str, body: str, level: AlertLevel):
        """POST a color-coded attachment to the Slack webhook (bounded timeout)."""
        color = "#EF4444" if level == AlertLevel.CRITICAL else "#EAB308"
        async with aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=10)
        ) as s:
            await s.post(self.config.slack_webhook, json={
                "attachments": [{
                    "color": color,
                    "title": title,
                    "text": body,
                    "footer": "Purple Flea Agent Monitor"
                }]
            })
7. Incident Response Playbook
A written playbook ensures consistent, fast responses when humans must intervene. Define runbooks for each alert type and store them in your operations wiki. Here is a minimal playbook structure:
Runbook: Agent Stopped (CRITICAL)
- Check PM2 status: `pm2 list | grep agent`
- Check logs for last error: `pm2 logs agent-name --lines 50`
- If OOM: reduce position size, increase server RAM, restart
- If API key expired: rotate key, update env, restart
- If exchange down: wait for recovery, do not restart in loop
- Verify restart: check /health endpoint returns 200
Runbook: Drawdown Exceeded (CRITICAL)
- Pause agent immediately: `pm2 stop agent-name`
- Close all open positions via exchange UI
- Pull trade history and run attribution report
- Identify losing strategy or asset
- Adjust risk parameters (reduce size, tighten stops)
- Paper trade for 24h before re-enabling
Runbook: Anomaly Detected (WARN)
- Review the flagged trade: size, asset, entry/exit prices
- Check if price data feed had a spike or gap
- Verify the trade was actually executed at the flagged price
- If data issue: agent continues, add data quality check
- If strategy issue: pause and review signal logic
8. Metrics Exposition for Grafana/Prometheus
Expose agent metrics in Prometheus text format so any Grafana dashboard can visualize them. This enables multi-agent dashboards without per-agent custom tooling.
class PrometheusExporter:
    """Exposes agent metrics in Prometheus text exposition format.

    Metrics are stored keyed by name plus any per-metric labels; render()
    injects the agent_id label into every line and prefixes metric names
    with "purpleflea_".
    """

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self._metrics: dict = {}

    def set(self, name: str, value: float, labels: dict = None):
        """Record a gauge value, optionally tagged with extra labels."""
        suffix = ""
        if labels:
            rendered = ",".join(f'{k}="{v}"' for k, v in labels.items())
            suffix = "{" + rendered + "}"
        self._metrics[name + suffix] = value

    def render(self) -> str:
        """Render all recorded metrics as Prometheus exposition text."""
        agent_label = f'agent_id="{self.agent_id}"'
        out = []
        for metric_key, metric_val in self._metrics.items():
            # Merge agent_id into an existing label set, or start one.
            if metric_key.endswith("}"):
                labeled = f"{metric_key[:-1]},{agent_label}}}"
            else:
                labeled = metric_key + "{" + agent_label + "}"
            out.append(f"purpleflea_{labeled} {metric_val}")
        return "\n".join(out)
# Usage:
# exp = PrometheusExporter("my-arb-agent")
# exp.set("balance_usd", 1234.56)
# exp.set("pnl_24h_usd", 45.20)
# exp.set("sharpe_ratio", 1.87)
# exp.set("max_drawdown_pct", 4.2)
# exp.set("win_rate_pct", 58.3)
# exp.set("open_positions", 2)
# exp.set("api_latency_ms", 87.0, labels={"endpoint": "place_order"})
# print(exp.render())
9. MonitoringAgent: Full Async Implementation
Putting it all together: a self-contained MonitoringAgent that runs async health checks on a schedule, detects anomalies, routes alerts, and exposes Prometheus metrics. Run this as a sidecar alongside your trading agent.
import asyncio, aiohttp, time, logging
from typing import List
logging.basicConfig(level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
class MonitoringAgent:
    """Sidecar monitor: polls the trading agent on a schedule, checks
    drawdown limits, routes alerts, and publishes Prometheus metrics."""

    def __init__(self, config: MonitoringConfig):
        self.config = config
        self.metrics = PerformanceMetrics()
        self.anomaly = AnomalyDetector(
            window=config.anomaly_window,
            z_threshold=config.anomaly_threshold
        )
        self.alerts = AlertRouter(config)
        self.exporter = PrometheusExporter(config.agent_id)
        self.equity_curve: List[float] = []
        self.trade_pnls: List[float] = []
        self.log = logging.getLogger(f"monitor.{config.agent_id}")

    async def fetch_snapshot(self) -> AgentSnapshot:
        """Poll the trading agent's /metrics endpoint into an AgentSnapshot.

        Marks the snapshot dead (alive=False) on any network error or
        non-200 response instead of raising, so the loop keeps running.
        """
        snap = AgentSnapshot()
        try:
            async with aiohttp.ClientSession() as s:
                t0 = time.monotonic()
                async with s.get(
                    f"{self.config.api_base}/metrics",
                    headers={"Authorization": f"Bearer {self.config.api_key}"},
                    timeout=aiohttp.ClientTimeout(total=5)
                ) as r:
                    snap.api_latency_ms = round((time.monotonic() - t0) * 1000, 1)
                    if r.status == 200:
                        data = await r.json()
                        snap.balance = data.get("balance", 0)
                        snap.pnl_24h = data.get("pnl_24h", 0)
                        snap.open_positions = data.get("open_positions", 0)
                    else:
                        snap.alive = False
        except Exception as e:
            self.log.error(f"Failed to fetch snapshot: {e}")
            snap.alive = False
        return snap

    async def check_and_alert(self, snap: AgentSnapshot):
        """Evaluate a snapshot: alert on downtime/drawdown, export metrics."""
        if not snap.alive:
            await self.alerts.send(
                AlertLevel.CRITICAL,
                "Agent Unreachable",
                f"Agent {self.config.agent_id} failed health check.",
                key="agent_down"
            )
            return
        # Extend the equity curve for drawdown/Sharpe computation.
        if snap.balance > 0:
            self.equity_curve.append(snap.balance)
        # Fix: `summary` used to be bound only inside the history check below
        # but was read unconditionally when exporting gauges, raising
        # NameError during the first ~10 polls. Guard its use instead.
        summary = None
        if len(self.equity_curve) > 10:
            summary = self.metrics.summary(self.equity_curve, self.trade_pnls)
            mdd = summary["max_drawdown_pct"] / 100
            if mdd > self.config.max_drawdown_pct:
                await self.alerts.send(
                    AlertLevel.CRITICAL,
                    "Max Drawdown Exceeded",
                    f"Drawdown: {mdd*100:.1f}% > limit {self.config.max_drawdown_pct*100:.0f}%",
                    key="max_drawdown"
                )
        # Publish Prometheus gauges; performance ratios only once available.
        self.exporter.set("balance_usd", snap.balance)
        self.exporter.set("pnl_24h_usd", snap.pnl_24h)
        if summary is not None:
            self.exporter.set("sharpe_ratio", summary["sharpe"])
            self.exporter.set("max_drawdown_pct", summary["max_drawdown_pct"])
        self.exporter.set("api_latency_ms", snap.api_latency_ms)

    async def run(self):
        """Main loop: snapshot -> checks -> sleep, forever. Loop errors are
        logged and swallowed so a single bad pass cannot kill the monitor."""
        self.log.info(f"Monitoring agent started for {self.config.agent_id}")
        while True:
            try:
                snap = await self.fetch_snapshot()
                await self.check_and_alert(snap)
                self.log.info(
                    f"Snapshot OK | balance={snap.balance:.2f} "
                    f"latency={snap.api_latency_ms}ms alive={snap.alive}"
                )
            except Exception as e:
                self.log.error(f"Monitor loop error: {e}")
            await asyncio.sleep(self.config.check_interval)
# Entry point: build the config from the environment and run the monitor.
if __name__ == "__main__":
    import os
    # AGENT_ID and PURPLEFLEA_API_KEY are required (KeyError if unset);
    # the alert-channel variables are optional and default to None,
    # which disables the corresponding channel in AlertRouter.
    cfg = MonitoringConfig(
        agent_id=os.environ["AGENT_ID"],
        api_base="https://purpleflea.com",
        api_key=os.environ["PURPLEFLEA_API_KEY"],
        telegram_token=os.environ.get("TELEGRAM_TOKEN"),
        telegram_chat_id=os.environ.get("TELEGRAM_CHAT_ID"),
        slack_webhook=os.environ.get("SLACK_WEBHOOK"),
    )
    # Runs until interrupted; MonitoringAgent.run loops forever.
    asyncio.run(MonitoringAgent(cfg).run())
Start monitoring your agent today
Purple Flea provides financial infrastructure for AI agents — casino, escrow, faucet, wallet, perpetuals, and domains. Get your agent running in minutes.
Explore Purple Flea