Operations

Monitoring and Alerting for AI Agents:
Know When Things Go Wrong

March 4, 2026
28 min read
Purple Flea Engineering

A well-built agent can run for days without human supervision — but only if you have visibility into what it is doing. This guide covers the complete monitoring stack: health checks, Prometheus-style metrics, alert rules for financial events (low balance, liquidation risk, unusual behavior), and dead man's switches that alert when your agent silently stops running.

Table of Contents
  01. What to Monitor in a Financial Agent
  02. Metrics Collection with Prometheus-Style Counters
  03. Alerting Rules for Financial Agents
  04. Dead Man's Switch: Alert When Agents Go Silent
  05. HTTP Health Endpoint
  06. Complete Monitoring System
01

What to Monitor in a Financial Agent

Most monitoring guides focus on infrastructure: CPU, memory, uptime. For an AI agent operating financial APIs, those signals matter less than the behavioral signals: is the agent making decisions within expected parameters? Is it losing money faster than expected? Has it stopped trading entirely?

Financial agents need three tiers of monitoring: infrastructure health (is the process running?), API health (are requests succeeding?), and financial health (is the agent behaving as intended?).

Metric           Type       Description
Balance          gauge      Current USDC balance across all Purple Flea wallets. Alert on low balance.
Bets Placed      counter    Total casino bets per session. Alert if bets per minute drops to zero.
Win Rate         gauge      Rolling 100-bet win percentage. Alert on abnormal deviation.
API Errors       counter    HTTP errors by status code. Alert on elevated 5xx rates.
P99 Latency      histogram  99th percentile API response time. Alert on degraded latency.
Open Positions   gauge      Count and total value of open trading positions. Alert on over-exposure.
Escrow State     gauge      Count of escrows by state (pending, funded, released). Alert on stale escrows.
Last Heartbeat   gauge      Unix timestamp of last successful action. Feeds into the dead man's switch.
💡
Monitor Behavior, Not Just Health

A process can be "healthy" (running, responding to health checks) but behaving incorrectly — betting too aggressively, holding positions too long, or making no decisions at all due to a logic bug. Behavioral metrics are more valuable than infrastructure metrics for autonomous agents.
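The metrics table above calls for a rolling 100-bet win percentage rather than a lifetime average, precisely because a lifetime average smooths over a recent losing streak. A minimal sketch of a windowed rate using a fixed-size deque (the class name and window size are illustrative, not part of any Purple Flea SDK):

```python
from collections import deque

class RollingWinRate:
    """Win rate over the most recent N outcomes, not over all time."""

    def __init__(self, window: int = 100):
        self._outcomes = deque(maxlen=window)  # True = win, False = loss

    def record(self, won: bool):
        self._outcomes.append(won)

    def value(self) -> float:
        if not self._outcomes:
            return 0.0
        return sum(self._outcomes) / len(self._outcomes)

# A lifetime average of these six bets is ~0.67; the window of the
# last four surfaces the recent slump instead.
rate = RollingWinRate(window=4)
for won in [True, True, True, True, False, False]:
    rate.record(won)
print(rate.value())  # Window holds [True, True, False, False] → 0.5
```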

02

Metrics Collection with Prometheus-Style Counters

You do not need a full Prometheus deployment to get value from structured metrics. A lightweight in-process metrics registry gives you the same semantics (counters, gauges, histograms) without the infrastructure overhead. The data can be exported to Prometheus, StatsD, or a simple HTTP endpoint for scraping.

Python metrics.py
import os
import time
import threading
from collections import defaultdict
from typing import Dict, List
from dataclasses import dataclass, field

@dataclass
class Counter:
    """Monotonically increasing counter."""
    name: str
    help_text: str
    labels: Dict[str, str] = field(default_factory=dict)
    _value: float = 0.0
    _lock: threading.Lock = field(default_factory=threading.Lock)

    def inc(self, amount: float = 1.0):
        with self._lock:
            self._value += amount

    @property
    def value(self) -> float:
        return self._value

@dataclass
class Gauge:
    """Value that can go up or down."""
    name: str
    help_text: str
    labels: Dict[str, str] = field(default_factory=dict)
    _value: float = 0.0
    _lock: threading.Lock = field(default_factory=threading.Lock)

    def set(self, value: float):
        with self._lock:
            self._value = value

    def inc(self, amount: float = 1.0):
        with self._lock:
            self._value += amount

    def dec(self, amount: float = 1.0):
        with self._lock:
            self._value -= amount

    @property
    def value(self) -> float:
        return self._value

class Histogram:
    """Track distribution of values (latency, amounts)."""
    DEFAULT_BUCKETS = [0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]

    def __init__(self, name: str, help_text: str, buckets: List[float] = None):
        self.name = name
        self.help_text = help_text
        self.buckets = sorted(buckets or self.DEFAULT_BUCKETS)
        self._counts = defaultdict(int)
        self._sum = 0.0
        self._count = 0
        self._lock = threading.Lock()

    def observe(self, value: float):
        with self._lock:
            self._sum += value
            self._count += 1
            for bucket in self.buckets:
                if value <= bucket:
                    self._counts[bucket] += 1

    def percentile(self, p: float) -> float:
        """Estimate a percentile from cumulative bucket counts."""
        with self._lock:
            if self._count == 0:
                return 0.0
            target = self._count * (p / 100)
            for bucket in self.buckets:
                if self._counts[bucket] >= target:
                    return bucket
            return self.buckets[-1]

class AgentMetrics:
    """Central metrics registry for a Purple Flea agent."""

    def __init__(self, agent_id: str):
        self.agent_id = agent_id

        # Financial metrics
        self.balance_usdc = Gauge("agent_balance_usdc", "Current USDC wallet balance")
        self.open_positions = Gauge("agent_open_positions", "Number of open trading positions")
        self.position_value = Gauge("agent_position_value_usdc", "Total value of open positions")

        # Activity counters
        self.bets_total = Counter("agent_bets_total", "Total casino bets placed")
        self.bets_won = Counter("agent_bets_won", "Total casino bets won")
        self.bets_lost = Counter("agent_bets_lost", "Total casino bets lost")
        self.trades_total = Counter("agent_trades_total", "Total trades executed")
        self.escrows_created = Counter("agent_escrows_created", "Total escrows created")
        self.escrows_released = Counter("agent_escrows_released", "Total escrows released")

        # API health
        self.api_requests_total = Counter("agent_api_requests_total", "Total API requests")
        self.api_errors_total = Counter("agent_api_errors_total", "Total API errors")
        self.api_latency = Histogram("agent_api_latency_seconds", "API request latency")

        # Lifecycle
        self.last_heartbeat = Gauge("agent_last_heartbeat_ts", "Unix timestamp of last activity")
        self.uptime_seconds = Gauge("agent_uptime_seconds", "Agent uptime")
        self._start_time = time.time()

    def heartbeat(self):
        """Record that the agent is alive and active."""
        now = time.time()
        self.last_heartbeat.set(now)
        self.uptime_seconds.set(now - self._start_time)

    def record_bet(self, won: bool, payout: float):
        """Record a bet outcome (`payout` is accepted for future P&L metrics)."""
        self.bets_total.inc()
        self.heartbeat()
        if won:
            self.bets_won.inc()
        else:
            self.bets_lost.inc()

    def win_rate(self) -> float:
        total = self.bets_total.value
        if total == 0:
            return 0.0
        return self.bets_won.value / total

    def to_prometheus_text(self) -> str:
        """Export in Prometheus text exposition format."""
        lines = []
        for metric in vars(self).values():
            if isinstance(metric, (Counter, Gauge)):
                lines.append(f"# HELP {metric.name} {metric.help_text}")
                lines.append(f"# TYPE {metric.name} {'counter' if isinstance(metric, Counter) else 'gauge'}")
                lines.append(f'{metric.name}{{agent_id="{self.agent_id}"}} {metric.value}')
        return "\n".join(lines) + "\n"

# Global metrics instance
metrics = AgentMetrics(agent_id=os.environ.get("AGENT_ID", "default"))
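The exposition format emitted by `to_prometheus_text` is simple enough to verify by hand. A standalone sketch of the label-formatting convention for a single labeled sample (the helper name is illustrative):

```python
def prom_line(name: str, labels: dict, value: float) -> str:
    """Render one labeled sample in Prometheus text exposition format."""
    label_str = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
    return f"{name}{{{label_str}}} {value}"

line = prom_line("agent_balance_usdc", {"agent_id": "casino-agent-001"}, 42.5)
print(line)  # agent_balance_usdc{agent_id="casino-agent-001"} 42.5
```

Sorting the labels keeps output stable across runs, which makes scrape diffs and tests deterministic.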

Instrumenting Purple Flea API Calls

Wrap all API calls with automatic metrics recording using a context manager or decorator:

Python instrumented_client.py
import time
from contextlib import asynccontextmanager

import httpx

from metrics import metrics

@asynccontextmanager
async def track_api_call(operation: str):
    """Record latency and errors for any API call.

    `operation` is available if you later add per-endpoint labels.
    """
    start = time.monotonic()
    metrics.api_requests_total.inc()
    try:
        yield
        metrics.api_latency.observe(time.monotonic() - start)
    except httpx.HTTPStatusError:
        # Count HTTP errors but still record their latency
        metrics.api_errors_total.inc()
        metrics.api_latency.observe(time.monotonic() - start)
        raise
    except Exception:
        metrics.api_errors_total.inc()
        raise

async def place_bet_tracked(client, params: dict) -> dict:
    # Assumes a client wrapper that returns parsed JSON and raises
    # httpx.HTTPStatusError on non-2xx responses, so the tracker counts them
    async with track_api_call("casino-bet"):
        result = await client.post("/casino/bet", json=params)

    won = result["outcome"] == "win"
    metrics.record_bet(won=won, payout=float(result["payout"]))
    return result
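For clients with many call sites, the same instrumentation can be packaged as a decorator instead of a context manager. A sketch assuming a generic `observe` callback rather than the `AgentMetrics` registry (all names here are illustrative):

```python
import asyncio
import functools
import time

def timed(observe):
    """Decorator factory: report (operation, elapsed, ok) for each awaited call."""
    def wrap(fn):
        @functools.wraps(fn)
        async def inner(*args, **kwargs):
            start = time.monotonic()
            ok = True
            try:
                return await fn(*args, **kwargs)
            except Exception:
                ok = False
                raise
            finally:
                # Runs on both success and failure paths
                observe(fn.__name__, time.monotonic() - start, ok)
        return inner
    return wrap

calls = []

@timed(lambda op, elapsed, ok: calls.append((op, ok)))
async def fetch_balance():
    return {"usdc": "42.50"}

asyncio.run(fetch_balance())
print(calls)  # [('fetch_balance', True)]
```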
03

Alerting Rules for Financial Agents

Alerts should be actionable and rare. If every alert requires human review, you'll start ignoring them. Define clear thresholds based on expected operating parameters and only fire alerts when action is genuinely needed.

Alert Severity Levels

CRITICAL
Immediate action required — agent may be losing money

Examples: balance < $10 USDC, error_rate > 50% for 5 minutes, position value > 10x expected

WARNING
Investigate soon — abnormal behavior detected

Examples: balance < $50 USDC, win rate < 30% over 200 bets, API p99 latency > 5s

INFO
For awareness — monitor but no immediate action needed

Examples: daily bet count below baseline, rate limit warnings increasing
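The balance thresholds above can be encoded as one mapping function so the critical and warning bands can never overlap or drift apart. A minimal sketch (the function name is illustrative; the numbers mirror the examples above):

```python
def balance_severity(balance_usdc: float):
    """Map a USDC balance to an alert severity, or None if healthy."""
    if balance_usdc < 10.0:
        return "critical"  # Agent will halt soon
    if balance_usdc < 50.0:
        return "warning"   # Top up before it becomes critical
    return None

print(balance_severity(7.5))    # critical
print(balance_severity(32.0))   # warning
print(balance_severity(200.0))  # None
```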

Python alerting.py
import logging
import time
from dataclasses import dataclass
from enum import Enum

import aiohttp

from metrics import AgentMetrics

logger = logging.getLogger(__name__)

class Severity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

@dataclass
class Alert:
    name: str
    severity: Severity
    message: str
    value: float
    threshold: float
    agent_id: str

class AlertManager:
    """Evaluate alert rules and dispatch notifications."""

    def __init__(self, webhook_url: str, agent_id: str):
        self.webhook_url = webhook_url
        self.agent_id = agent_id
        self._fired: dict = {}  # name → last_fired_ts
        self._cooldown = 300   # 5 min between same alert

    async def evaluate(self, m: AgentMetrics):
        """Evaluate all alert rules against current metrics."""
        alerts = []

        # CRITICAL: Balance below $10
        if m.balance_usdc.value < 10.0:
            alerts.append(Alert(
                name="critical_low_balance",
                severity=Severity.CRITICAL,
                message=f"Balance critically low: {m.balance_usdc.value:.2f} USDC — agent will halt soon",
                value=m.balance_usdc.value,
                threshold=10.0,
                agent_id=self.agent_id,
            ))

        # WARNING: Balance below $50
        elif m.balance_usdc.value < 50.0:
            alerts.append(Alert(
                name="low_balance",
                severity=Severity.WARNING,
                message=f"Balance low: {m.balance_usdc.value:.2f} USDC",
                value=m.balance_usdc.value,
                threshold=50.0,
                agent_id=self.agent_id,
            ))

        # WARNING: Win rate below 30% over 200+ bets
        if m.bets_total.value >= 200 and m.win_rate() < 0.30:
            alerts.append(Alert(
                name="low_win_rate",
                severity=Severity.WARNING,
                message=f"Win rate abnormally low: {m.win_rate()*100:.1f}% (expected ~49%)",
                value=m.win_rate(),
                threshold=0.30,
                agent_id=self.agent_id,
            ))

        # CRITICAL: API error rate over 50% (lifetime counters here;
        # use windowed counters in production for a true recent-rate check)
        total = m.api_requests_total.value
        errors = m.api_errors_total.value
        if total > 10 and (errors / total) > 0.5:
            alerts.append(Alert(
                name="high_error_rate",
                severity=Severity.CRITICAL,
                message=f"API error rate: {errors/total*100:.0f}% ({int(errors)}/{int(total)} requests)",
                value=errors / total,
                threshold=0.5,
                agent_id=self.agent_id,
            ))

        # WARNING: High API latency
        p99 = m.api_latency.percentile(99)
        if p99 > 5.0:
            alerts.append(Alert(
                name="high_latency",
                severity=Severity.WARNING,
                message=f"API p99 latency degraded: {p99:.1f}s",
                value=p99, threshold=5.0,
                agent_id=self.agent_id,
            ))

        # CRITICAL: Over-exposure
        if m.position_value.value > m.balance_usdc.value * 5:
            alerts.append(Alert(
                name="over_exposure",
                severity=Severity.CRITICAL,
                message=f"Position value {m.position_value.value:.0f} USDC exceeds 5x balance",
                value=m.position_value.value,
                threshold=m.balance_usdc.value * 5,
                agent_id=self.agent_id,
            ))

        for alert in alerts:
            await self._maybe_fire(alert)

    async def _maybe_fire(self, alert: Alert):
        now = time.time()
        last = self._fired.get(alert.name, 0)
        if now - last < self._cooldown:
            return  # In cooldown
        self._fired[alert.name] = now
        await self._dispatch(alert)

    async def _dispatch(self, alert: Alert):
        # Send to Slack/Discord/PagerDuty webhook
        payload = {
            "text": f"[{alert.severity.value.upper()}] {alert.name}\n{alert.message}",
            "agent_id": alert.agent_id,
            "severity": alert.severity.value,
        }
        try:
            async with aiohttp.ClientSession() as s:
                await s.post(self.webhook_url, json=payload, timeout=aiohttp.ClientTimeout(total=5))
        except Exception as e:
            logger.error(f"Failed to dispatch alert: {e}")
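The cooldown logic in `_maybe_fire` deserves to be isolated and unit-tested: a bug there either spams you or silences real alerts. A standalone sketch with an injectable clock so tests need no sleeping (the class name is illustrative):

```python
import time

class AlertCooldown:
    """Suppress repeats of the same alert within a cooldown window."""

    def __init__(self, seconds: float = 300.0):
        self._seconds = seconds
        self._last_fired: dict = {}  # alert name → last fired timestamp

    def should_fire(self, name: str, now: float = None) -> bool:
        now = time.time() if now is None else now
        if now - self._last_fired.get(name, float("-inf")) < self._seconds:
            return False  # Still in cooldown for this alert
        self._last_fired[name] = now
        return True

cd = AlertCooldown(seconds=300)
print(cd.should_fire("low_balance", now=0))    # True  (first fire)
print(cd.should_fire("low_balance", now=100))  # False (in cooldown)
print(cd.should_fire("low_balance", now=400))  # True  (cooldown elapsed)
```

Passing `now` explicitly in tests keeps the behavior deterministic; production callers simply omit it.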
04

Dead Man's Switch: Alert When Agents Go Silent

The most dangerous failure mode for an autonomous agent is silent death — the process crashes, a logic bug causes it to stop making decisions, or the machine loses power. In all these cases, the agent is not actively doing anything wrong; it just stops. Without a dead man's switch, you may not notice for hours.

A dead man's switch works in reverse: the agent must periodically "check in" to a watchdog service. If the check-in stops, the watchdog fires an alert. The agent's silence itself becomes the signal.

Python dead_mans_switch.py
import asyncio
import aiohttp
import time
import logging

logger = logging.getLogger(__name__)

class DeadMansSwitch:
    """
    Heartbeat-based watchdog for autonomous agents.

    The agent sends periodic check-ins. If check-ins stop,
    an external monitor fires an alert.

    Two components:
    1. Agent-side: sends POST /heartbeat every N seconds
    2. Monitor-side: checks timestamp; alerts if stale
    """

    def __init__(
        self,
        watchdog_url: str,
        agent_id: str,
        interval: float = 60.0,  # Check in every 60s
        max_silence: float = 300.0,  # Alert if silent 5min
    ):
        self.watchdog_url = watchdog_url
        self.agent_id = agent_id
        self.interval = interval
        self.max_silence = max_silence
        self._running = False
        self._task = None

    async def start(self):
        """Start the background heartbeat task (keep a reference so it isn't GC'd)."""
        self._running = True
        self._task = asyncio.create_task(self._heartbeat_loop())

    async def stop(self):
        self._running = False
        # Send final "stopping" heartbeat
        await self._ping(status="stopping")

    async def _heartbeat_loop(self):
        while self._running:
            await self._ping(status="alive")
            await asyncio.sleep(self.interval)

    async def _ping(self, status: str = "alive"):
        payload = {
            "agent_id": self.agent_id,
            "status": status,
            "ts": time.time(),
            "max_silence_seconds": self.max_silence,
        }
        try:
            async with aiohttp.ClientSession() as s:
                await s.post(
                    f"{self.watchdog_url}/heartbeat",
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=5),
                )
        except Exception as e:
            # Heartbeat failure is logged but NEVER raises
            # — never let watchdog issues crash the agent
            logger.warning(f"Heartbeat failed: {e}")

# Self-hosted watchdog monitor (runs as a separate process)
class WatchdogMonitor:
    """
    Standalone watchdog that fires alerts when agents go silent.
    Deploy separately from the agent — defeats the purpose if co-located.
    """

    def __init__(self, alert_webhook: str):
        self.alert_webhook = alert_webhook
        self._agents: dict = {}  # agent_id → {ts, max_silence}
        self._alerted: set = set()

    async def record_heartbeat(self, agent_id: str, ts: float, max_silence: float):
        self._agents[agent_id] = {"ts": ts, "max_silence": max_silence}
        if agent_id in self._alerted:
            self._alerted.discard(agent_id)
            await self._send_alert(agent_id, "recovered", "Agent back online")

    async def check_all(self):
        now = time.time()
        # Snapshot the dict: record_heartbeat may mutate it while we await below
        for agent_id, info in list(self._agents.items()):
            silence = now - info["ts"]
            if silence > info["max_silence"] and agent_id not in self._alerted:
                self._alerted.add(agent_id)
                await self._send_alert(
                    agent_id, "silent",
                    f"Agent {agent_id} silent for {silence:.0f}s (threshold: {info['max_silence']}s)"
                )

    async def _send_alert(self, agent_id: str, event: str, msg: str):
        payload = {
            "text": f"[WATCHDOG] {event.upper()}: {msg}",
            "agent_id": agent_id,
        }
        try:
            async with aiohttp.ClientSession() as s:
                await s.post(
                    self.alert_webhook, json=payload,
                    timeout=aiohttp.ClientTimeout(total=5),
                )
        except Exception as e:
            logger.error(f"Watchdog alert dispatch failed: {e}")
Deploy Watchdog Separately

The watchdog monitor must run on a different machine than the agent. If they share a process or host, a host failure kills both simultaneously and the dead man's switch never fires. Use a cloud function, a separate VPS, or a service like Healthchecks.io for the watchdog component.
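The core of `check_all` is a pure staleness predicate, and pulling it out makes the threshold logic testable without aiohttp or a running event loop. A sketch (the helper name is illustrative):

```python
def stale_agents(agents: dict, now: float) -> list:
    """Return agent_ids whose last heartbeat is older than their max_silence."""
    return [
        agent_id
        for agent_id, info in agents.items()
        if now - info["ts"] > info["max_silence"]
    ]

agents = {
    "casino-agent-001": {"ts": 1000.0, "max_silence": 300.0},
    "trader-002": {"ts": 1250.0, "max_silence": 300.0},
}
# At t=1400, casino-agent-001 has been silent 400s (> 300s); trader-002 only 150s
print(stale_agents(agents, now=1400.0))  # ['casino-agent-001']
```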

05

HTTP Health Endpoint

Expose a lightweight HTTP server from your agent to serve health status and Prometheus metrics. This integrates with standard infrastructure tooling: load balancers, Kubernetes liveness probes, uptime monitors like UptimeRobot.

Python health_server.py
import json
import logging
import time

from aiohttp import web

from metrics import AgentMetrics

logger = logging.getLogger(__name__)

async def handle_health(request: web.Request) -> web.Response:
    """Basic health check — returns 200 if agent is alive."""
    m: AgentMetrics = request.app["metrics"]
    silence = time.time() - m.last_heartbeat.value

    status = "healthy" if silence < 120 else "degraded"
    http_status = 200 if status == "healthy" else 503

    body = {
        "status": status,
        "agent_id": m.agent_id,
        "uptime_seconds": int(m.uptime_seconds.value),
        "balance_usdc": m.balance_usdc.value,
        "bets_total": int(m.bets_total.value),
        "win_rate": round(m.win_rate(), 4),
        "api_error_rate": round(
            m.api_errors_total.value / max(m.api_requests_total.value, 1), 4
        ),
        "last_activity_seconds_ago": int(silence),
    }
    return web.Response(
        text=json.dumps(body, indent=2),
        content_type="application/json",
        status=http_status,
    )

async def handle_metrics(request: web.Request) -> web.Response:
    """Prometheus-format metrics endpoint."""
    m: AgentMetrics = request.app["metrics"]
    return web.Response(
        text=m.to_prometheus_text(),
        content_type="text/plain; version=0.0.4",
    )

async def start_health_server(m: AgentMetrics, port: int = 9090):
    app = web.Application()
    app["metrics"] = m
    app.router.add_get("/health", handle_health)
    app.router.add_get("/metrics", handle_metrics)

    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, "0.0.0.0", port)
    await site.start()
    logger.info(f"Health server running on :{port}")
    return runner
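The status decision inside `handle_health` reduces to a pure function of heartbeat age, which makes the probe semantics easy to verify in isolation. A sketch mirroring the handler's 120-second threshold (the function name is illustrative):

```python
def health_status(last_heartbeat_ts: float, now: float,
                  degraded_after: float = 120.0):
    """Return (status, http_status) the way handle_health decides it."""
    silence = now - last_heartbeat_ts
    if silence < degraded_after:
        return ("healthy", 200)
    # 503 is what makes probes and load balancers pull the agent out of service
    return ("degraded", 503)

print(health_status(last_heartbeat_ts=1000.0, now=1060.0))  # ('healthy', 200)
print(health_status(last_heartbeat_ts=1000.0, now=1300.0))  # ('degraded', 503)
```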
06

Complete Monitoring System

Wire everything together in a single startup sequence. The agent, health server, alert manager, and dead man's switch all start concurrently:

Python main.py
import asyncio
import logging
import os

logger = logging.getLogger(__name__)

async def main():
    agent_id = os.environ.get("AGENT_ID", "casino-agent-001")

    # Initialize monitoring infrastructure
    m = AgentMetrics(agent_id=agent_id)
    alert_mgr = AlertManager(
        webhook_url=os.environ["ALERT_WEBHOOK_URL"],
        agent_id=agent_id,
    )
    dms = DeadMansSwitch(
        watchdog_url=os.environ["WATCHDOG_URL"],
        agent_id=agent_id,
        interval=60,
        max_silence=300,
    )

    # Start background services
    health_runner = await start_health_server(m, port=9090)
    await dms.start()

    # Alert evaluation loop (runs every 30s)
    async def alert_loop():
        while True:
            await alert_mgr.evaluate(m)
            await asyncio.sleep(30)

    asyncio.create_task(alert_loop())

    # Main agent loop
    async with PurpleFleaClient() as client:
        while True:
            try:
                # Update balance metric
                bal = await client.get("/wallet/balance")
                m.balance_usdc.set(float(bal["usdc"]))

                # Place a bet and record it
                result = await place_bet_tracked(client, {
                    "game": "dice", "amount": "5.00",
                    "prediction": "over", "target": 50,
                })

                # Heartbeat so dead man's switch stays armed
                m.heartbeat()

            except Exception as e:
                # API errors are already counted inside track_api_call; just log
                logger.error(f"Agent loop error: {e}", exc_info=True)

            await asyncio.sleep(10)

if __name__ == "__main__":
    asyncio.run(main())
Component           Port/Endpoint              Purpose
Health server       :9090/health               Kubernetes probes, uptime monitors
Metrics endpoint    :9090/metrics              Prometheus scraping
Alert manager       Webhook (outbound)         Slack/PagerDuty/Discord alerts
Dead man's switch   Watchdog POST (outbound)   Alert on agent silence
DLQ (dead-letter queue)  SQLite on disk        Capture permanent failures

Monitor Your Agent in Production

Start with the free USDC faucet, build your monitoring stack, and run your agent against live Purple Flea APIs with confidence.