What to Monitor in a Financial Agent
Most monitoring guides focus on infrastructure: CPU, memory, uptime. For an AI agent operating financial APIs, those signals matter less than the behavioral signals: is the agent making decisions within expected parameters? Is it losing money faster than expected? Has it stopped trading entirely?
Financial agents need three tiers of monitoring: infrastructure health (is the process running?), API health (are requests succeeding?), and financial health (is the agent behaving as intended?).
A process can be "healthy" (running, responding to health checks) but behaving incorrectly — betting too aggressively, holding positions too long, or making no decisions at all due to a logic bug. Behavioral metrics are more valuable than infrastructure metrics for autonomous agents.
Metrics Collection with Prometheus-Style Counters
You do not need a full Prometheus deployment to get value from structured metrics. A lightweight in-process metrics registry gives you the same semantics (counters, gauges, histograms) without the infrastructure overhead. The data can be exported to Prometheus, StatsD, or a simple HTTP endpoint for scraping.
import os
import threading
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Optional
@dataclass
class Counter:
    """Monotonically increasing counter (Prometheus `counter` semantics).

    Thread-safe: increments take an internal lock. Counters must never
    decrease, so `inc` rejects negative amounts (matching prometheus_client).
    """
    name: str
    help_text: str
    labels: Dict[str, str] = field(default_factory=dict)
    # Internal state — excluded from repr/eq (two Lock objects never compare
    # equal, which made the generated __eq__ useless).
    _value: float = field(default=0.0, repr=False, compare=False)
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False, compare=False)

    def inc(self, amount: float = 1.0) -> None:
        """Add `amount` to the counter.

        Raises:
            ValueError: if `amount` is negative — counters only go up;
                use a Gauge for values that can decrease.
        """
        if amount < 0:
            raise ValueError("Counter can only increase; use a Gauge for values that go down")
        with self._lock:
            self._value += amount

    @property
    def value(self) -> float:
        # Unlocked read: a single float load is safe for scraping purposes.
        return self._value
@dataclass
class Gauge:
    """A metric whose value may move in either direction."""
    name: str
    help_text: str
    labels: Dict[str, str] = field(default_factory=dict)
    _value: float = 0.0
    _lock: threading.Lock = field(default_factory=threading.Lock)

    def set(self, value: float):
        """Replace the current value outright."""
        with self._lock:
            self._value = value

    def inc(self, amount: float = 1.0):
        """Shift the value upward by `amount`."""
        with self._lock:
            self._value += amount

    def dec(self, amount: float = 1.0):
        """Shift the value downward by `amount`."""
        # Delegates to inc with a negated amount — same lock, same effect.
        self.inc(-amount)

    @property
    def value(self) -> float:
        return self._value
class Histogram:
    """Track the distribution of observed values (latency, amounts).

    Uses Prometheus-style cumulative buckets: an observation increments
    every bucket whose upper bound is >= the value. Observations larger
    than the biggest bucket are counted only in `_sum`/`_count`.
    """

    DEFAULT_BUCKETS = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]

    def __init__(self, name: str, help_text: str, buckets: Optional[List[float]] = None):
        self.name = name
        self.help_text = help_text
        self.buckets = sorted(buckets or self.DEFAULT_BUCKETS)
        self._counts: Dict[float, int] = defaultdict(int)  # bucket bound -> cumulative count
        self._sum = 0.0
        self._count = 0
        self._lock = threading.Lock()

    def observe(self, value: float) -> None:
        """Record one observation (thread-safe)."""
        with self._lock:
            self._sum += value
            self._count += 1
            for bucket in self.buckets:
                if value <= bucket:
                    self._counts[bucket] += 1

    def percentile(self, p: float) -> float:
        """Estimate the p-th percentile (p in 0-100) from bucket counts.

        Returns the upper bound of the first bucket whose cumulative count
        reaches the target rank, so estimates are quantized to bucket
        boundaries. Returns 0.0 when nothing has been observed.
        """
        with self._lock:  # consistent snapshot vs. concurrent observe()
            if self._count == 0:
                return 0.0
            target = self._count * (p / 100)
            for bucket in self.buckets:
                # .get() avoids inserting zero entries into the defaultdict
                # as a side effect of a read.
                if self._counts.get(bucket, 0) >= target:
                    return bucket
            # Target rank falls above the largest finite bucket.
            return self.buckets[-1]
class AgentMetrics:
    """Central metrics registry for a Purple Flea agent.

    Bundles financial, activity, API-health, and lifecycle metrics into one
    object that can be exported via `to_prometheus_text`.
    """

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        # Financial metrics
        self.balance_usdc = Gauge("agent_balance_usdc", "Current USDC wallet balance")
        self.open_positions = Gauge("agent_open_positions", "Number of open trading positions")
        self.position_value = Gauge("agent_position_value_usdc", "Total value of open positions")
        # Activity counters
        self.bets_total = Counter("agent_bets_total", "Total casino bets placed")
        self.bets_won = Counter("agent_bets_won", "Total casino bets won")
        self.bets_lost = Counter("agent_bets_lost", "Total casino bets lost")
        self.trades_total = Counter("agent_trades_total", "Total trades executed")
        self.escrows_created = Counter("agent_escrows_created", "Total escrows created")
        self.escrows_released = Counter("agent_escrows_released", "Total escrows released")
        # API health
        self.api_requests_total = Counter("agent_api_requests_total", "Total API requests")
        self.api_errors_total = Counter("agent_api_errors_total", "Total API errors")
        self.api_latency = Histogram("agent_api_latency_seconds", "API request latency")
        # Lifecycle
        self.last_heartbeat = Gauge("agent_last_heartbeat_ts", "Unix timestamp of last activity")
        self.uptime_seconds = Gauge("agent_uptime_seconds", "Agent uptime")
        self._start_time = time.time()

    def heartbeat(self) -> None:
        """Record that the agent is alive and refresh the uptime gauge."""
        now = time.time()
        self.last_heartbeat.set(now)
        self.uptime_seconds.set(now - self._start_time)

    def record_bet(self, won: bool, payout: float) -> None:
        """Record the outcome of one casino bet.

        Args:
            won: whether the bet paid out.
            payout: payout amount; currently unused, kept for interface
                stability (a payout histogram is a natural extension).
        """
        self.bets_total.inc()
        self.heartbeat()
        if won:
            self.bets_won.inc()
        else:
            self.bets_lost.inc()

    def win_rate(self) -> float:
        """Fraction of recorded bets won; 0.0 before any bets."""
        total = self.bets_total.value
        if total == 0:
            return 0.0
        return self.bets_won.value / total

    def to_prometheus_text(self) -> str:
        """Export all registered metrics in Prometheus text exposition format.

        Histograms are exported as cumulative `_bucket` series (with `le`
        labels) plus `_sum` and `_count`, per the exposition format; the
        original implementation silently skipped them.
        """
        lines: List[str] = []
        label = f'agent_id="{self.agent_id}"'
        for metric in vars(self).values():
            if isinstance(metric, (Counter, Gauge)):
                kind = "counter" if isinstance(metric, Counter) else "gauge"
                lines.append(f"# HELP {metric.name} {metric.help_text}")
                lines.append(f"# TYPE {metric.name} {kind}")
                lines.append(f"{metric.name}{{{label}}} {metric.value}")
            elif isinstance(metric, Histogram):
                lines.append(f"# HELP {metric.name} {metric.help_text}")
                lines.append(f"# TYPE {metric.name} histogram")
                for bucket in metric.buckets:
                    lines.append(
                        f'{metric.name}_bucket{{{label},le="{bucket}"}} {metric._counts.get(bucket, 0)}'
                    )
                # +Inf bucket covers every observation, including those above
                # the largest finite bucket.
                lines.append(f'{metric.name}_bucket{{{label},le="+Inf"}} {metric._count}')
                lines.append(f"{metric.name}_sum{{{label}}} {metric._sum}")
                lines.append(f"{metric.name}_count{{{label}}} {metric._count}")
        return "\n".join(lines) + "\n"
# Global metrics instance
# Module-level singleton shared by the instrumentation helpers below
# (track_api_call, place_bet_tracked). Agent ID comes from the AGENT_ID
# environment variable. NOTE(review): this line requires `import os` at the
# top of the file — confirm it is present.
metrics = AgentMetrics(agent_id=os.environ.get("AGENT_ID", "default"))
Instrumenting Purple Flea API Calls
Wrap all API calls with automatic metrics recording using a context manager or decorator:
from contextlib import asynccontextmanager
@asynccontextmanager
async def track_api_call(operation: str):
    """Async context manager recording request count, errors, and latency.

    Args:
        operation: human-readable label for the call. Currently unused;
            reserved for future per-operation metric labels.

    Latency is recorded for every call — success or failure — and any
    exception increments the error counter before propagating.
    """
    start = time.monotonic()
    metrics.api_requests_total.inc()
    try:
        yield
    except Exception:
        # Count every failure uniformly. The original special-cased
        # httpx.HTTPStatusError, but httpx is never imported in this module,
        # so that except clause itself would raise NameError when hit.
        metrics.api_errors_total.inc()
        raise
    finally:
        # Record latency on both success and failure paths.
        metrics.api_latency.observe(time.monotonic() - start)
async def place_bet_tracked(client, params: dict) -> dict:
    """Place a casino bet through the tracked-API wrapper and record the outcome."""
    async with track_api_call("casino-bet"):
        response = await client.post("/casino/bet", json=params)
        # NOTE(review): assumes client.post returns a parsed dict with
        # "outcome" and "payout" keys — confirm against the client class.
        did_win = response["outcome"] == "win"
        metrics.record_bet(won=did_win, payout=float(response["payout"]))
        return response
Alerting Rules for Financial Agents
Alerts should be actionable and rare. If every alert requires human review, you'll start ignoring them. Define clear thresholds based on expected operating parameters and only fire alerts when action is genuinely needed.
Alert Severity Levels
Critical (act immediately) — examples: balance < $10 USDC, error_rate > 50% for 5 minutes, position value > 10x expected
Warning (investigate within hours) — examples: balance < $50 USDC, win rate < 30% over 200 bets, API p99 latency > 5s
Info (review at leisure) — examples: daily bet count below baseline, rate limit warnings increasing
import asyncio
import aiohttp
import json
from dataclasses import dataclass
from enum import Enum
class Severity(Enum):
    """Alert severity tiers; values are the wire-format strings sent to webhooks."""
    INFO = "info"          # informational: trends worth a look, no action needed
    WARNING = "warning"    # degraded: investigate soon
    CRITICAL = "critical"  # actionable now: agent at risk of halting or losing funds
@dataclass
class Alert:
    """One fired alert-rule evaluation, ready for webhook dispatch."""
    name: str            # stable rule identifier, e.g. "low_balance"
    severity: Severity   # tier from the Severity enum
    message: str         # human-readable description sent to the webhook
    value: float         # observed metric value that triggered the rule
    threshold: float     # threshold the value was compared against
    agent_id: str        # which agent this alert concerns
class AlertManager:
    """Evaluate alert rules against agent metrics and dispatch notifications.

    Each rule fires at most once per cooldown window (default 5 minutes) so a
    persistent condition does not flood the alert channel.
    """

    def __init__(self, webhook_url: str, agent_id: str):
        self.webhook_url = webhook_url
        self.agent_id = agent_id
        self._fired: Dict[str, float] = {}  # rule name -> last_fired unix ts
        self._cooldown = 300  # 5 min between repeats of the same alert

    async def evaluate(self, m: "AgentMetrics"):
        """Evaluate all alert rules against current metrics and fire any hits.

        NOTE(review): gauges default to 0.0, so a freshly started agent whose
        balance has not been fetched yet will trip the low-balance rules —
        confirm callers set the balance before the first evaluation.
        """
        alerts: List[Alert] = []
        balance = m.balance_usdc.value
        win_rate = m.win_rate()

        # CRITICAL: Balance below $10
        if balance < 10.0:
            alerts.append(Alert(
                name="critical_low_balance",
                severity=Severity.CRITICAL,
                message=f"Balance critically low: {balance:.2f} USDC — agent will halt soon",
                value=balance,
                threshold=10.0,
                agent_id=self.agent_id,
            ))
        # WARNING: Balance below $50
        elif balance < 50.0:
            alerts.append(Alert(
                name="low_balance",
                severity=Severity.WARNING,
                message=f"Balance low: {balance:.2f} USDC",
                value=balance,
                threshold=50.0,
                agent_id=self.agent_id,
            ))
        # WARNING: Win rate below 30%, only once 200+ bets give a meaningful sample
        if m.bets_total.value >= 200 and win_rate < 0.30:
            alerts.append(Alert(
                name="low_win_rate",
                severity=Severity.WARNING,
                message=f"Win rate abnormally low: {win_rate*100:.1f}% (expected ~49%)",
                value=win_rate,
                threshold=0.30,
                agent_id=self.agent_id,
            ))
        # CRITICAL: API error rate over 50%. NOTE: these counters are
        # cumulative over the process lifetime — not a sliding window — so a
        # long-lived agent dilutes recent error bursts.
        total = m.api_requests_total.value
        errors = m.api_errors_total.value
        error_rate = errors / total if total else 0.0
        if total > 10 and error_rate > 0.5:
            alerts.append(Alert(
                name="high_error_rate",
                severity=Severity.CRITICAL,
                message=f"API error rate: {error_rate*100:.0f}% ({int(errors)}/{int(total)} requests)",
                value=error_rate,
                threshold=0.5,
                agent_id=self.agent_id,
            ))
        # WARNING: High API latency
        p99 = m.api_latency.percentile(99)
        if p99 > 5.0:
            alerts.append(Alert(
                name="high_latency",
                severity=Severity.WARNING,
                message=f"API p99 latency degraded: {p99:.1f}s",
                value=p99,
                threshold=5.0,
                agent_id=self.agent_id,
            ))
        # CRITICAL: Over-exposure — open positions worth more than 5x balance
        exposure_limit = balance * 5
        if m.position_value.value > exposure_limit:
            alerts.append(Alert(
                name="over_exposure",
                severity=Severity.CRITICAL,
                message=f"Position value {m.position_value.value:.0f} USDC exceeds 5x balance",
                value=m.position_value.value,
                threshold=exposure_limit,
                agent_id=self.agent_id,
            ))
        for alert in alerts:
            await self._maybe_fire(alert)

    async def _maybe_fire(self, alert: "Alert"):
        """Dispatch `alert` unless the same rule already fired within the cooldown."""
        now = time.time()
        if now - self._fired.get(alert.name, 0) < self._cooldown:
            return  # in cooldown
        self._fired[alert.name] = now
        await self._dispatch(alert)

    async def _dispatch(self, alert: "Alert"):
        """POST the alert to the configured Slack/Discord/PagerDuty webhook."""
        payload = {
            "text": f"[{alert.severity.value.upper()}] {alert.name}\n{alert.message}",
            "agent_id": alert.agent_id,
            "severity": alert.severity.value,
        }
        try:
            async with aiohttp.ClientSession() as s:
                await s.post(self.webhook_url, json=payload, timeout=aiohttp.ClientTimeout(total=5))
        except Exception as e:
            # Alert delivery failure must never crash the agent loop.
            logger.error(f"Failed to dispatch alert: {e}")
Dead Man's Switch: Alert When Agents Go Silent
The most dangerous failure mode for an autonomous agent is silent death — the process crashes, a logic bug causes it to stop making decisions, or the machine loses power. In all these cases, the agent is not actively doing anything wrong; it just stops. Without a dead man's switch, you may not notice for hours.
A dead man's switch works in reverse: the agent must periodically "check in" to a watchdog service. If the check-in stops, the watchdog fires an alert. The agent's silence itself becomes the signal.
import asyncio
import aiohttp
import time
import logging
logger = logging.getLogger(__name__)
class DeadMansSwitch:
    """
    Heartbeat-based watchdog client for autonomous agents.

    The agent sends periodic check-ins to an external watchdog. If the
    check-ins stop, the watchdog (see WatchdogMonitor) fires an alert —
    the agent's silence itself is the signal.

    Two components:
      1. Agent-side (this class): sends POST /heartbeat every `interval` s
      2. Monitor-side: checks the timestamp; alerts if stale
    """

    def __init__(
        self,
        watchdog_url: str,
        agent_id: str,
        interval: float = 60.0,      # check in every 60s
        max_silence: float = 300.0,  # monitor alerts after 5 min of silence
    ):
        self.watchdog_url = watchdog_url
        self.agent_id = agent_id
        self.interval = interval
        self.max_silence = max_silence
        self._running = False
        # Hold a strong reference to the heartbeat task: the event loop keeps
        # only weak references, so an unreferenced task can be garbage
        # collected mid-flight and silently stop heartbeating.
        self._task: Optional[asyncio.Task] = None

    async def start(self):
        """Start the background heartbeat task."""
        self._running = True
        self._task = asyncio.create_task(self._heartbeat_loop())

    async def stop(self):
        """Stop heartbeating, cancel the loop, and send a final 'stopping' ping."""
        self._running = False
        if self._task is not None:
            # Cancel rather than waiting out the sleep, so shutdown is prompt.
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
        # Tell the watchdog this is a deliberate shutdown, not a crash.
        await self._ping(status="stopping")

    async def _heartbeat_loop(self):
        """Ping, then sleep, until stopped or cancelled."""
        while self._running:
            await self._ping(status="alive")
            await asyncio.sleep(self.interval)

    async def _ping(self, status: str = "alive"):
        """POST one heartbeat to the watchdog; failures are logged, never raised."""
        payload = {
            "agent_id": self.agent_id,
            "status": status,
            "ts": time.time(),
            "max_silence_seconds": self.max_silence,
        }
        try:
            async with aiohttp.ClientSession() as s:
                await s.post(
                    f"{self.watchdog_url}/heartbeat",
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=5),
                )
        except Exception as e:
            # Heartbeat failure is logged but NEVER raises
            # — never let watchdog issues crash the agent.
            logger.warning(f"Heartbeat failed: {e}")
# Self-hosted watchdog monitor (runs as a separate process)
class WatchdogMonitor:
    """
    Standalone watchdog that fires alerts when agents go silent.

    Deploy separately from the agent — co-locating them defeats the purpose,
    since a host failure would kill both at once.
    """

    def __init__(self, alert_webhook: str):
        self.alert_webhook = alert_webhook
        self._agents: Dict[str, dict] = {}  # agent_id -> {"ts", "max_silence"}
        self._alerted: set = set()          # agent_ids with an open "silent" alert

    async def record_heartbeat(self, agent_id: str, ts: float, max_silence: float):
        """Store the latest heartbeat; announce recovery if the agent was silent."""
        self._agents[agent_id] = {"ts": ts, "max_silence": max_silence}
        if agent_id in self._alerted:
            self._alerted.discard(agent_id)
            await self._send_alert(agent_id, "recovered", "Agent back online")

    async def check_all(self):
        """Scan every known agent and alert once per silence episode."""
        now = time.time()
        for agent_id, info in self._agents.items():
            silence = now - info["ts"]
            if silence > info["max_silence"] and agent_id not in self._alerted:
                self._alerted.add(agent_id)
                await self._send_alert(
                    agent_id, "silent",
                    f"Agent {agent_id} silent for {silence:.0f}s (threshold: {info['max_silence']}s)"
                )

    async def _send_alert(self, agent_id: str, event: str, msg: str):
        """POST an alert to the webhook; delivery failures must not crash the scan loop."""
        payload = {
            "text": f"[WATCHDOG] {event.upper()}: {msg}",
            "agent_id": agent_id,
        }
        try:
            async with aiohttp.ClientSession() as s:
                # Bounded timeout so a hung webhook cannot stall check_all();
                # the original unguarded POST could crash the whole monitor.
                await s.post(
                    self.alert_webhook,
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=5),
                )
        except Exception as e:
            logger.error(f"Failed to send watchdog alert: {e}")
The watchdog monitor must run on a different machine than the agent. If they share a process or host, a host failure kills both simultaneously and the dead man's switch never fires. Use a cloud function, a separate VPS, or a service like Healthchecks.io for the watchdog component.
HTTP Health Endpoint
Expose a lightweight HTTP server from your agent to serve health status and Prometheus metrics. This integrates with standard infrastructure tooling: load balancers, Kubernetes liveness probes, uptime monitors like UptimeRobot.
from aiohttp import web
import json
async def handle_health(request: web.Request) -> web.Response:
    """Basic health check — 200 when recently active, 503 when stale."""
    reg: AgentMetrics = request.app["metrics"]
    idle = time.time() - reg.last_heartbeat.value
    # More than two minutes without a heartbeat counts as degraded.
    is_healthy = idle < 120
    payload = {
        "status": "healthy" if is_healthy else "degraded",
        "agent_id": reg.agent_id,
        "uptime_seconds": int(reg.uptime_seconds.value),
        "balance_usdc": reg.balance_usdc.value,
        "bets_total": int(reg.bets_total.value),
        "win_rate": round(reg.win_rate(), 4),
        "api_error_rate": round(
            reg.api_errors_total.value / max(reg.api_requests_total.value, 1), 4
        ),
        "last_activity_seconds_ago": int(idle),
    }
    return web.Response(
        text=json.dumps(payload, indent=2),
        content_type="application/json",
        status=200 if is_healthy else 503,
    )
async def handle_metrics(request: web.Request) -> web.Response:
    """Prometheus-format metrics endpoint."""
    registry: AgentMetrics = request.app["metrics"]
    # version=0.0.4 is the Prometheus text exposition format version.
    body = registry.to_prometheus_text()
    return web.Response(text=body, content_type="text/plain; version=0.0.4")
async def start_health_server(m: AgentMetrics, port: int = 9090):
    """Start the aiohttp app exposing /health and /metrics; returns the runner."""
    app = web.Application()
    app["metrics"] = m
    for route, handler in (("/health", handle_health), ("/metrics", handle_metrics)):
        app.router.add_get(route, handler)
    runner = web.AppRunner(app)
    await runner.setup()
    await web.TCPSite(runner, "0.0.0.0", port).start()
    logger.info(f"Health server running on :{port}")
    return runner
Complete Monitoring System
Wire everything together in a single startup sequence. The agent, health server, alert manager, and dead man's switch all start concurrently:
async def main():
    """Wire up metrics, alerting, watchdog, and the trading loop, then run forever.

    Requires environment variables: ALERT_WEBHOOK_URL, WATCHDOG_URL, and
    optionally AGENT_ID.
    """
    global metrics  # keep the module-level instance used by track_api_call in sync

    agent_id = os.environ.get("AGENT_ID", "casino-agent-001")

    # Initialize monitoring infrastructure. Rebind the module-global so the
    # instrumentation helpers (track_api_call / place_bet_tracked) and this
    # loop's alerting/health server all observe the SAME registry — the
    # original code built a second registry that never saw API/bet metrics.
    m = AgentMetrics(agent_id=agent_id)
    metrics = m
    alert_mgr = AlertManager(
        webhook_url=os.environ["ALERT_WEBHOOK_URL"],
        agent_id=agent_id,
    )
    dms = DeadMansSwitch(
        watchdog_url=os.environ["WATCHDOG_URL"],
        agent_id=agent_id,
        interval=60,
        max_silence=300,
    )

    # Start background services
    health_runner = await start_health_server(m, port=9090)
    await dms.start()

    # Alert evaluation loop (runs every 30s)
    async def alert_loop():
        while True:
            await alert_mgr.evaluate(m)
            await asyncio.sleep(30)

    # Keep a strong reference — asyncio tasks held only by the loop's weak
    # set can be garbage collected and silently stop running.
    alert_task = asyncio.create_task(alert_loop())

    # Main agent loop
    # NOTE(review): "PurpleFleatClient" looks like a typo of the Purple Flea
    # client class name — confirm against the client module's exports.
    async with PurpleFleatClient() as client:
        while True:
            try:
                # Update balance metric
                bal = await client.get("/wallet/balance")
                m.balance_usdc.set(float(bal["usdc"]))
                # Place a bet and record it
                result = await place_bet_tracked(client, {
                    "game": "dice", "amount": "5.00",
                    "prediction": "over", "target": 50,
                })
                # Heartbeat so the dead man's switch stays armed
                m.heartbeat()
            except Exception as e:
                logger.error(f"Agent loop error: {e}", exc_info=True)
                # NOTE(review): failures inside place_bet_tracked are already
                # counted by track_api_call; this also covers balance-fetch
                # failures, so some errors may be double-counted — confirm
                # which semantics the dashboards expect.
                m.api_errors_total.inc()
            await asyncio.sleep(10)


if __name__ == "__main__":
    asyncio.run(main())
| Component | Port/Endpoint | Purpose |
|---|---|---|
| Health server | `:9090/health` | Kubernetes probes, uptime monitors |
| Metrics endpoint | `:9090/metrics` | Prometheus scraping |
| Alert manager | Webhook outbound | Slack/PagerDuty/Discord alerts |
| Dead man's switch | Watchdog POST outbound | Alert on agent silence |
| DLQ | SQLite on disk | Capture permanent failures |
Monitor Your Agent in Production
Start with the free USDC faucet, build your monitoring stack, and run your agent against live Purple Flea APIs with confidence.