Agent Observability: Monitoring AI Agents in Production
Silent failures are the most expensive kind. When a human trader makes a mistake, they notice — the P&L report turns red, the phone rings, someone asks a question. When an AI agent fails silently, it might keep executing trades, paying fees, and burning funds for hours before anything surfaces.
Observability is the practice of making your agent's internal state externally visible at all times. This guide covers the three pillars — logs, metrics, and traces — applied specifically to AI agents running financial operations on infrastructure like Purple Flea.
1. Why Observability Matters for AI Agents
Traditional software observability is hard. AI agent observability is harder because agent behavior is non-deterministic — the same inputs can produce different outputs depending on model state, context window content, and tool call ordering. Standard application monitoring assumes predictable execution paths. Agents don't have those.
The financial stakes make this worse. An unmonitored web server might serve a few 500 errors. An unmonitored trading agent might execute thousands of losing positions, drain a wallet, or trigger fraud detection on a third-party service.
What Can Go Wrong Without Observability
- Balance bleed: Fees accumulate faster than wins; no one notices for hours
- Stuck loops: Agent retries the same failing action indefinitely
- Model drift: LLM starts hallucinating tool parameters after context overflow
- Key expiry: Credentials rotate; agent keeps operating with stale auth
- Rate limit spirals: Hitting rate limits triggers backoff, then burst, then 429 flood
- Dependency failure: Upstream service down; agent doesn't know, keeps trying
- Referral loss: Referral code dropped from requests silently loses revenue
Each of these is preventable with proper observability. The cost of instrumentation is measured in hours of engineering. The cost of not instrumenting is measured in lost funds.
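Several of these failure modes can be caught with a few lines of in-process guard code. As one illustration of the "stuck loops" case — a minimal sketch, not tied to any particular agent framework, with names of our own choosing — a guard that trips after the same action fails several times in a row:

```python
from collections import deque


class StuckLoopGuard:
    """Trips when the same action signature fails N times in a row."""

    def __init__(self, max_repeats: int = 5):
        self.max_repeats = max_repeats
        self._recent = deque(maxlen=max_repeats)

    def record(self, action: str, params_hash: int, success: bool) -> bool:
        """Record an attempt; return True if the agent should halt."""
        if success:
            # Any success breaks the streak
            self._recent.clear()
            return False
        self._recent.append((action, params_hash))
        # Halt when the window is full and every entry is identical
        return (len(self._recent) == self.max_repeats
                and len(set(self._recent)) == 1)


guard = StuckLoopGuard(max_repeats=3)
guard.record("place_bet", hash(("dice", 0.1)), success=False)
guard.record("place_bet", hash(("dice", 0.1)), success=False)
should_halt = guard.record("place_bet", hash(("dice", 0.1)), success=False)  # True
```

Hashing the parameters (rather than comparing raw dicts) keeps the window cheap and makes "same action" precise: a retry with a different amount is not a stuck loop.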
2. The 3 Pillars: Logs, Metrics, and Traces for AI Agents
Logs
Structured event records. Every API call, LLM invocation, tool result, and error. The raw narrative of what happened and when.
Metrics
Aggregated numerical measurements over time. Request rates, error percentages, P&L delta, latency percentiles. The numbers that tell you how the system is performing.
Traces
Causal chains across services. From the initial LLM decision through tool calls, API requests, and downstream effects. The "why" behind the numbers.
These three are not alternatives — they are complementary. Metrics tell you something is wrong. Logs tell you what happened. Traces tell you why it happened across the full call chain.
For AI agents specifically, you need a fourth concept: decision logs. These capture the LLM reasoning — which tools were called, in what order, with what parameters, and what the model "said" it was trying to accomplish. Decision logs are unique to AI systems and have no equivalent in traditional observability.
3. Structured Logging in Python
Unstructured logs are nearly useless at scale. print("Error making API call") tells you nothing when you're searching through 10,000 log lines at 2am. Structured logging outputs machine-parseable JSON that can be indexed, queried, and alerted on.
import logging
import json
import time
import traceback
from datetime import datetime, timezone
from functools import wraps
from typing import Any, Optional
class StructuredFormatter(logging.Formatter):
    """JSON formatter for agent logs."""

    # Standard LogRecord attributes we don't want to duplicate as extras
    _RESERVED = {
        "msg", "args", "levelname", "levelno", "name", "pathname",
        "filename", "module", "exc_info", "exc_text", "stack_info",
        "lineno", "funcName", "created", "msecs", "relativeCreated",
        "thread", "threadName", "processName", "process", "message",
        "taskName",
    }

    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            "ts": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "msg": record.getMessage(),
            "agent_id": getattr(record, "agent_id", None),
            "session_id": getattr(record, "session_id", None),
        }
        # Merge any extra fields passed via `extra=`
        for key, val in record.__dict__.items():
            if key not in self._RESERVED and not key.startswith("_"):
                log_data[key] = val
        if record.exc_info:
            log_data["exception"] = traceback.format_exception(*record.exc_info)
        # default=str keeps one bad extra field from breaking the whole line
        return json.dumps(log_data, default=str)
class MergingAdapter(logging.LoggerAdapter):
    """LoggerAdapter that merges per-call `extra` with the adapter's own.

    The stock LoggerAdapter *replaces* the caller's `extra`, which would
    silently drop fields like session_id from every log call below.
    """

    def process(self, msg, kwargs):
        kwargs["extra"] = {**self.extra, **kwargs.get("extra", {})}
        return msg, kwargs


def get_logger(name: str, agent_id: Optional[str] = None) -> logging.LoggerAdapter:
    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(StructuredFormatter())
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return MergingAdapter(logger, {"agent_id": agent_id})


# Usage
log = get_logger("casino-agent", agent_id="agent-abc123")
import httpx


async def tracked_api_call(
    client: httpx.AsyncClient,
    method: str,
    url: str,
    session_id: str,
    balance_before: float,
    **kwargs,
) -> dict:
    """Wrap every API call with structured logging."""
    start = time.perf_counter()
    status = None
    error = None
    result = None
    try:
        resp = await client.request(method, url, **kwargs)
        status = resp.status_code
        content_type = resp.headers.get("content-type", "")
        result = resp.json() if content_type.startswith("application/json") else {}
        resp.raise_for_status()
        return result
    except httpx.HTTPStatusError as e:
        status = e.response.status_code
        error = str(e)
        raise
    except Exception as e:
        error = str(e)
        raise
    finally:
        duration_ms = (time.perf_counter() - start) * 1000
        # Extract balance from the response body if present
        balance_after = result.get("balance") if result else None
        pnl_delta = None
        if balance_after is not None and balance_before is not None:
            pnl_delta = round(balance_after - balance_before, 6)
        log.info(
            "api_call",
            extra={
                "event": "api_call",
                "session_id": session_id,
                "method": method.upper(),
                "url": url,
                "status": status,
                "duration_ms": round(duration_ms, 2),
                "balance_before": balance_before,
                "balance_after": balance_after,
                "pnl_delta": pnl_delta,
                "error": error,
            },
        )
# Usage example
result = await tracked_api_call(
    client=client,
    method="POST",
    url="https://purpleflea.com/api/casino/play",
    session_id="session-xyz",
    balance_before=current_balance,
    json={"game": "dice", "amount": 0.10, "api_key": "pf_live_abc123"},
)
def log_decision(
    action: str,
    reasoning: str,
    tool: str,
    params: dict,
    expected_outcome: str,
    session_id: str,
):
    log.info(
        "agent_decision",
        extra={
            "event": "decision",
            "session_id": session_id,
            "action": action,
            "reasoning": reasoning,
            "tool": tool,
            "params": {k: v for k, v in params.items() if k != "api_key"},  # never log keys
            "expected_outcome": expected_outcome,
            "timestamp": time.time(),
        },
    )
# Example call
log_decision(
    action="place_bet",
    reasoning="Balance is above minimum threshold, variance within limits",
    tool="casino_play",
    params={"game": "dice", "amount": 0.05, "target": 2.0},
    expected_outcome="Win 0.10 at 50% probability",
    session_id=session_id,
)
4. Key Metrics to Track
Metrics are the quantitative heartbeat of your agent. Unlike logs (which capture every event), metrics are aggregated over time windows and answer questions like "what is happening right now" and "is this better or worse than yesterday".
import time
import threading
from collections import deque
from dataclasses import dataclass, field
from typing import Deque, Dict, Tuple


@dataclass
class MetricsCollector:
    """Lightweight in-process metrics collector."""

    window_seconds: int = 60
    _lock: threading.Lock = field(default_factory=threading.Lock)
    _requests: Deque[Tuple[float, str, bool]] = field(default_factory=lambda: deque(maxlen=10000))
    _errors: Deque[float] = field(default_factory=lambda: deque(maxlen=10000))
    _latencies: Deque[Tuple[float, str, float]] = field(default_factory=lambda: deque(maxlen=10000))
    _pnl_events: Deque[Tuple[float, float]] = field(default_factory=lambda: deque(maxlen=10000))
    _balance_history: Deque[Tuple[float, float]] = field(default_factory=lambda: deque(maxlen=1000))
    _referral_clicks: int = 0

    def record_request(self, service: str, success: bool, latency_ms: float, pnl_delta: float = 0.0):
        now = time.time()
        with self._lock:
            self._requests.append((now, service, success))
            self._latencies.append((now, service, latency_ms))
            if not success:
                self._errors.append(now)
            if pnl_delta != 0.0:
                self._pnl_events.append((now, pnl_delta))

    def record_balance(self, balance: float):
        with self._lock:
            self._balance_history.append((time.time(), balance))

    def record_referral_click(self):
        with self._lock:
            self._referral_clicks += 1

    def snapshot(self) -> Dict:
        now = time.time()
        cutoff = now - self.window_seconds
        with self._lock:
            recent_req = [(t, s, ok) for t, s, ok in self._requests if t > cutoff]
            recent_lat = [ms for t, s, ms in self._latencies if t > cutoff]
            recent_err = [t for t in self._errors if t > cutoff]
            recent_pnl = sum(d for t, d in self._pnl_events if t > cutoff)
            # Oldest balance sample inside the last hour serves as the 1h baseline
            hour_ago = now - 3600
            old_bal = next((b for t, b in self._balance_history if t > hour_ago), None)
            latest_bal = self._balance_history[-1][1] if self._balance_history else None
            referral_clicks = self._referral_clicks
        req_per_min = len(recent_req) / (self.window_seconds / 60)
        error_rate = (len(recent_err) / len(recent_req) * 100) if recent_req else 0.0
        p50 = sorted(recent_lat)[len(recent_lat) // 2] if recent_lat else 0.0
        p95 = sorted(recent_lat)[int(len(recent_lat) * 0.95)] if recent_lat else 0.0
        balance_delta_pct = None
        # Compare with `is not None` — a legitimate zero balance must not be skipped
        if old_bal is not None and latest_bal is not None and old_bal > 0:
            balance_delta_pct = round((latest_bal - old_bal) / old_bal * 100, 2)
        return {
            "window_seconds": self.window_seconds,
            "requests_per_min": round(req_per_min, 1),
            "error_rate_pct": round(error_rate, 2),
            "latency_p50_ms": round(p50, 1),
            "latency_p95_ms": round(p95, 1),
            "pnl_delta_window": round(recent_pnl, 6),
            "balance_delta_pct_1h": balance_delta_pct,
            "referral_clicks_total": referral_clicks,
        }
metrics = MetricsCollector(window_seconds=60)
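The p50/p95 numbers in `snapshot()` use a nearest-rank pick on the sorted latency list. Isolated, the idea looks like this (a sketch of the same approach, with an index clamp added for safety — the `percentile` name is ours):

```python
def percentile(values: list, pct: float) -> float:
    """Nearest-rank percentile over a list of samples."""
    if not values:
        return 0.0
    ordered = sorted(values)
    # Clamp so pct=1.0 still lands on the last element
    idx = min(int(len(ordered) * pct), len(ordered) - 1)
    return ordered[idx]


lats = [87, 91, 95, 102, 110, 130, 180, 220, 400, 2400]
print(percentile(lats, 0.50))  # 130
print(percentile(lats, 0.95))  # 2400
```

Nearest-rank is deliberately crude: it is cheap, never interpolates, and for alerting purposes the one-sample error at small window sizes does not matter.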
5. Alerting — Know Before It Hurts
Metrics without alerting are just dashboards. Dashboards require someone to be watching. Alerts fire when no one is watching — which is exactly when things go wrong.
For AI agents running financial operations, you want alerts on three categories: financial risk, operational health, and anomaly detection.
Alert Thresholds
| Metric | Warning Threshold | Critical Threshold | Action |
|---|---|---|---|
| Balance drop (rolling 1h) | >5% | >10% | Pause operations |
| Error rate | >2% | >5% | Investigate + retry backoff |
| API latency p95 | >2s | >10s | Circuit breaker open |
| Zero requests (silent) | 5 min no activity | 15 min no activity | Health check + restart |
| Consecutive losses | 5 in a row | 10 in a row | Reduce bet size |
| LLM context overflow | 80% context used | 95% context used | Summarize + truncate |
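The "circuit breaker open" action in the table can be implemented with a small state machine: open after N consecutive failures, then allow a trial call once a cooldown has elapsed. A minimal sketch (class name and thresholds are illustrative, not part of any Purple Flea API):

```python
import time
from typing import Optional


class CircuitBreaker:
    """Open after N consecutive failures; half-open after a cooldown."""

    def __init__(self, failure_threshold: int = 5, reset_seconds: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_seconds = reset_seconds
        self.failures = 0
        self.opened_at: Optional[float] = None

    def allow(self) -> bool:
        if self.opened_at is None:
            return True  # closed: normal operation
        # Half-open: permit a trial call once the cooldown has passed
        return time.monotonic() - self.opened_at >= self.reset_seconds

    def record(self, success: bool):
        if success:
            self.failures = 0
            self.opened_at = None  # close the breaker again
        else:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()


breaker = CircuitBreaker(failure_threshold=3, reset_seconds=10.0)
for _ in range(3):
    breaker.record(success=False)
print(breaker.allow())  # False — breaker is open, skip the call
```

In the agent loop, check `breaker.allow()` before each API call and feed every outcome back through `breaker.record()`; the p95 latency alert from the table is a natural trigger for opening it manually as well.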
import httpx
import time
from typing import Dict, Optional


class AlertManager:
    """Sends webhook alerts with cooldown to avoid spam."""

    def __init__(self, webhook_url: str, cooldown_seconds: int = 300):
        self.webhook_url = webhook_url
        self.cooldown = cooldown_seconds
        self._last_alert: Dict[str, float] = {}

    async def fire(self, alert_name: str, severity: str, message: str, data: Optional[dict] = None):
        now = time.time()
        last = self._last_alert.get(alert_name, 0)
        if now - last < self.cooldown:
            return  # Cooldown active — suppress duplicate
        self._last_alert[alert_name] = now
        payload = {
            "alert": alert_name,
            "severity": severity,  # "warning" | "critical"
            "message": message,
            "data": data or {},
            "timestamp": now,
            "agent_id": AGENT_ID,  # module-level constant, set at startup
        }
        log.warning("alert_fired", extra={"alert": alert_name, "severity": severity})
        async with httpx.AsyncClient() as client:
            try:
                await client.post(self.webhook_url, json=payload, timeout=5.0)
            except Exception as e:
                log.error("alert_delivery_failed", extra={"error": str(e)})
async def check_and_alert(metrics_snapshot: dict, alerter: AlertManager):
    snap = metrics_snapshot
    # Balance drop alert
    if snap.get("balance_delta_pct_1h") is not None:
        delta = snap["balance_delta_pct_1h"]
        if delta < -10:
            await alerter.fire(
                "balance_drop_critical", "critical",
                f"Balance dropped {abs(delta):.1f}% in last hour — pausing",
                data={"delta_pct": delta},
            )
        elif delta < -5:
            await alerter.fire(
                "balance_drop_warning", "warning",
                f"Balance dropped {abs(delta):.1f}% in last hour",
                data={"delta_pct": delta},
            )
    # Error rate alert
    if snap["error_rate_pct"] > 5:
        await alerter.fire(
            "high_error_rate", "critical",
            f"Error rate {snap['error_rate_pct']:.1f}% exceeds 5% threshold",
            data={"error_rate": snap["error_rate_pct"]},
        )


alerter = AlertManager(webhook_url="https://hooks.example.com/agent-alerts")
6. Health Check Endpoint Pattern
Every production agent should expose a /health endpoint that returns its current operational status. This enables external monitoring (uptime services, orchestrators, other agents) to verify liveness without needing access to internal logs.
from fastapi import FastAPI
from enum import Enum
import time

app = FastAPI()


class HealthStatus(str, Enum):
    OK = "ok"
    DEGRADED = "degraded"
    CRITICAL = "critical"
    OFFLINE = "offline"


@app.get("/health")
async def health_check():
    snap = metrics.snapshot()
    # Determine status
    status = HealthStatus.OK
    issues = []
    if snap["error_rate_pct"] > 5:
        status = HealthStatus.CRITICAL
        issues.append(f"error_rate={snap['error_rate_pct']:.1f}%")
    elif snap["error_rate_pct"] > 2:
        status = HealthStatus.DEGRADED
        issues.append(f"elevated_errors={snap['error_rate_pct']:.1f}%")
    delta = snap.get("balance_delta_pct_1h")
    if delta is not None and delta < -10:
        status = HealthStatus.CRITICAL
        issues.append(f"balance_drop={delta:.1f}%")
    return {
        "status": status,
        "agent_id": AGENT_ID,
        "uptime_seconds": int(time.time() - START_TIME),
        "metrics": snap,
        "issues": issues,
        "version": "1.0.0",
        "checked_at": time.time(),
    }


@app.get("/health/live")
async def liveness():
    """Kubernetes liveness probe — just confirm process is alive."""
    return {"alive": True}


@app.get("/health/ready")
async def readiness():
    """Kubernetes readiness probe — confirm agent is ready to process."""
    snap = metrics.snapshot()
    ready = snap["error_rate_pct"] < 10
    return {"ready": ready, "error_rate": snap["error_rate_pct"]}
7. Distributed Tracing for Multi-Agent Systems
When Agent A asks Agent B to execute a trade on its behalf via escrow, and Agent B calls Purple Flea's casino API, and that API call fails — which agent's logs do you check first? With distributed tracing, every request carries a trace ID that links all related operations across all agents into a single causal chain.
import uuid
import contextvars

# Context-local trace IDs. contextvars (unlike thread-locals) propagate
# correctly across asyncio tasks, which is what an async agent needs.
_trace_id: contextvars.ContextVar[str] = contextvars.ContextVar("trace_id", default="")
_span_id: contextvars.ContextVar[str] = contextvars.ContextVar("span_id", default="")


def start_trace() -> str:
    trace_id = uuid.uuid4().hex
    _trace_id.set(trace_id)
    _span_id.set(uuid.uuid4().hex[:16])
    return trace_id


def get_trace_headers() -> dict:
    """Headers to propagate to downstream services."""
    return {
        "X-Trace-Id": _trace_id.get() or start_trace(),
        "X-Span-Id": _span_id.get(),
        "X-Agent-Id": AGENT_ID,
    }


def extract_trace_from_headers(headers: dict):
    """Extract and set trace context from incoming request."""
    trace_id = headers.get("X-Trace-Id") or headers.get("x-trace-id")
    span_id = headers.get("X-Span-Id") or headers.get("x-span-id")
    if trace_id:
        _trace_id.set(trace_id)
    if span_id:
        _span_id.set(span_id)
    return trace_id
# Every outbound request includes trace headers
async def call_downstream_agent(url: str, payload: dict) -> dict:
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            url,
            json=payload,
            headers={
                **get_trace_headers(),
                "Authorization": f"Bearer {AGENT_API_KEY}",
            },
        )
        return resp.json()


# Every log entry includes trace ID automatically
log.info("escrow_initiated", extra={
    "trace_id": _trace_id.get(),
    "span_id": _span_id.get(),
    "event": "escrow_initiated",
    "amount": 5.00,
    "counterparty": "agent-def456",
})
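Trace IDs tell you *which* requests belong together; to also time individual steps within a trace, a small span helper can wrap any block. A sketch of the idea, written self-contained here (it declares its own ContextVar and the `traced_span` name is ours; in the agent you would reuse the trace context above and log to structured logs instead of a list):

```python
import time
import uuid
import contextvars
from contextlib import contextmanager

# Mirrors the agent's trace context; local to this sketch
_trace_id = contextvars.ContextVar("trace_id", default="")


@contextmanager
def traced_span(name: str, sink: list):
    """Time a block and record (name, trace_id, duration_ms) into sink."""
    trace_id = _trace_id.get() or uuid.uuid4().hex
    _trace_id.set(trace_id)
    start = time.perf_counter()
    try:
        yield trace_id
    finally:
        duration_ms = (time.perf_counter() - start) * 1000
        sink.append({"span": name, "trace_id": trace_id,
                     "duration_ms": round(duration_ms, 2)})


spans = []
with traced_span("escrow_call", spans):
    time.sleep(0.01)  # stand-in for the real work
print(spans[0]["span"])  # escrow_call
```

Nested spans automatically share the outer trace ID, so one query over the span log reconstructs the whole call tree for a request.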
8. Dashboard Example — ASCII Visualization
Full observability stacks (Grafana, Datadog, New Relic) are excellent for production. But sometimes you need a quick terminal view during development or debugging. Here's a minimal ASCII dashboard that renders in any terminal.
┌─────────────────────────────────────────────────────┐
│ AGENT DASHBOARD │ agent-abc123 │ 2026-03-06 │
├─────────────────────────────────────────────────────┤
│ OPERATIONAL STATUS: ● OK │
│ Uptime: 4h 23m 17s │ Sessions: 142 │
├─────────────────────────────────────────────────────┤
│ THROUGHPUT (last 60s) │
│ Requests/min ████████████░░░░░░░ 42.3 │
│ Error rate █░░░░░░░░░░░░░░░░░░ 0.8% │
│ Latency p50 ████░░░░░░░░░░░░░░░ 87ms │
│ Latency p95 ████████░░░░░░░░░░░ 187ms │
├─────────────────────────────────────────────────────┤
│ FINANCIALS (rolling 1h) │
│ Balance $24.71 ▲ +$0.32 (+1.3%) │
│ P&L delta +$0.32 ██████████░░ (positive) │
│ Referrals 7 clicks │ 2 conversions │
├─────────────────────────────────────────────────────┤
│ SERVICE LATENCY (avg ms, last 100 calls) │
│ casino ████░░░░░░░ 94ms ✓ │
│ wallet ███░░░░░░░░ 71ms ✓ │
│ escrow █████░░░░░░ 118ms ✓ │
│ trading ██████░░░░░ 142ms ✓ │
└─────────────────────────────────────────────────────┘
from rich.console import Console
from rich.live import Live
from rich.panel import Panel
from rich.table import Table
import time

console = Console()


def build_dashboard(snap: dict) -> Panel:
    table = Table(show_header=True, header_style="bold magenta", box=None)
    table.add_column("Metric", style="dim", width=22)
    table.add_column("Value", justify="right")
    table.add_column("Status", justify="center")

    def status_icon(ok: bool) -> str:
        return "[green]●[/green]" if ok else "[red]●[/red]"

    table.add_row("Requests/min", f"{snap['requests_per_min']:.1f}", status_icon(True))
    table.add_row("Error rate", f"{snap['error_rate_pct']:.2f}%", status_icon(snap["error_rate_pct"] < 2))
    table.add_row("Latency p95", f"{snap['latency_p95_ms']:.0f}ms", status_icon(snap["latency_p95_ms"] < 2000))
    # P&L is aggregated over the collector's window (60s), not a full hour
    table.add_row("P&L (window)", f"${snap['pnl_delta_window']:+.4f}", status_icon(snap["pnl_delta_window"] >= 0))
    delta = snap.get("balance_delta_pct_1h")
    if delta is not None:
        table.add_row("Balance delta (1h)", f"{delta:+.2f}%", status_icon(delta > -5))
    return Panel(table, title="[bold purple]Agent Dashboard[/bold purple]", border_style="purple")


with Live(build_dashboard(metrics.snapshot()), refresh_per_second=1) as live:
    while True:
        time.sleep(1)
        live.update(build_dashboard(metrics.snapshot()))
9. PM2 Process Monitoring Integration
If you're deploying agents on Linux servers, PM2 is a popular process manager. It was built for Node.js but can supervise other runtimes via its `interpreter` option, and it provides process-level metrics (CPU, memory, restarts) out of the box. Combining PM2's process metrics with your application-level metrics gives you complete coverage.
module.exports = {
  apps: [
    {
      name: "casino-agent",
      script: "dist/agent.js",
      instances: 1,
      autorestart: true,
      watch: false,
      max_restarts: 10,
      restart_delay: 5000,
      env: {
        NODE_ENV: "production",
        PORT: "3010",
        LOG_LEVEL: "info",
        AGENT_ID: "agent-abc123",
        PF_API_KEY: "pf_live_abc123def456", // loaded from a secrets manager in production
      },
      // PM2 log files (pair with logrotate or pm2-logrotate)
      error_file: "/var/log/agents/casino-agent-error.log",
      out_file: "/var/log/agents/casino-agent-out.log",
      log_file: "/var/log/agents/casino-agent-combined.log",
      time: true,
      // Restart the process if it exceeds this memory ceiling
      max_memory_restart: "512M",
    },
  ],
};
# Real-time process view
pm2 monit
# Status of all processes
pm2 status
# Tail logs for a specific agent
pm2 logs casino-agent --lines 100
# Show metrics for an agent
pm2 show casino-agent
# Manually restart the agent (memory-based restarts happen automatically via max_memory_restart)
pm2 restart casino-agent
# Save PM2 process list (survive reboots)
pm2 save
# Flush old logs
pm2 flush casino-agent
For structured log aggregation, pipe PM2's output to a log shipper. Both JSON lines output (via your StructuredFormatter) and PM2's native timestamp prefix are handled well by tools like Promtail (for Loki), Filebeat (for Elasticsearch), or a simple cron that ships to S3.
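As a concrete illustration of the JSON-lines side, here is a tiny pre-filter of the kind you might run before shipping: it parses structured log lines and keeps only warnings and errors, skipping PM2's own non-JSON noise. The field names match the `StructuredFormatter` output above; everything else is an illustrative sketch.

```python
import json


def filter_errors(lines):
    """Yield parsed log records at WARNING level or above.

    Lines that aren't valid JSON (e.g. PM2's own banner output) are skipped.
    """
    keep = {"WARNING", "ERROR", "CRITICAL"}
    for line in lines:
        try:
            rec = json.loads(line)
        except json.JSONDecodeError:
            continue
        if rec.get("level") in keep:
            yield rec


raw = [
    '{"level": "INFO", "msg": "api_call"}',
    '{"level": "ERROR", "msg": "alert_delivery_failed"}',
    'PM2 | App restarted',  # non-JSON noise from the process manager
]
print([r["msg"] for r in filter_errors(raw)])  # ['alert_delivery_failed']
```

Because it's a generator over an iterable of lines, the same function works on an open file handle tailing the combined log.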
10. Purple Flea API Response Monitoring
Beyond generic observability, you should track the performance and reliability of each Purple Flea service independently. Each service has different SLA characteristics — the casino API may have higher latency than the wallet API, and the trading service may have more complex error modes.
import statistics
from collections import defaultdict
from typing import DefaultDict, List


class ServiceMonitor:
    """Track latency and reliability per Purple Flea service."""

    SERVICES = ["casino", "wallet", "trading", "domains", "faucet", "escrow"]

    def __init__(self, window: int = 100):
        self.window = window
        self._latencies: DefaultDict[str, List[float]] = defaultdict(list)
        self._errors: DefaultDict[str, int] = defaultdict(int)
        self._calls: DefaultDict[str, int] = defaultdict(int)

    def record(self, service: str, latency_ms: float, success: bool):
        if service not in self.SERVICES:
            return
        self._latencies[service].append(latency_ms)
        if len(self._latencies[service]) > self.window:
            self._latencies[service].pop(0)
        self._calls[service] += 1
        if not success:
            self._errors[service] += 1

    def report(self) -> dict:
        report = {}
        for svc in self.SERVICES:
            lats = self._latencies[svc]
            calls = self._calls[svc]
            errors = self._errors[svc]
            if not lats:
                report[svc] = {"status": "no_data"}
                continue
            error_rate = errors / calls if calls else 0.0
            report[svc] = {
                "p50_ms": round(statistics.median(lats), 1),
                # Nearest-rank p95, clamped so small samples don't index past the end
                "p95_ms": round(sorted(lats)[min(int(len(lats) * 0.95), len(lats) - 1)], 1),
                "avg_ms": round(statistics.mean(lats), 1),
                "error_rate_pct": round(error_rate * 100, 2),
                "total_calls": calls,
                "status": "ok" if error_rate < 0.05 else "degraded",
            }
        return report

    def log_report(self):
        for svc, data in self.report().items():
            if data.get("status") == "no_data":
                continue
            log.info("service_report", extra={
                "event": "service_health",
                "service": svc,
                **data,
            })


svc_monitor = ServiceMonitor(window=200)
import asyncio
import httpx

PURPLE_FLEA_HEALTH_URLS = {
    "casino": "https://purpleflea.com/api/casino/status",
    "wallet": "https://purpleflea.com/api/wallet/status",
    "trading": "https://purpleflea.com/api/trading/status",
    "faucet": "https://faucet.purpleflea.com/health",
    "escrow": "https://escrow.purpleflea.com/health",
}


async def startup_health_check(api_key: str) -> dict:
    """Verify all Purple Flea services are reachable before starting agent."""
    async with httpx.AsyncClient(timeout=5.0) as client:

        async def probe(svc: str, url: str) -> tuple:
            try:
                resp = await client.get(url, headers={"X-API-Key": api_key})
                return svc, {"ok": resp.status_code < 400, "status": resp.status_code}
            except Exception as e:
                return svc, {"ok": False, "error": str(e)}

        # Probe all services concurrently rather than one at a time
        pairs = await asyncio.gather(
            *(probe(svc, url) for svc, url in PURPLE_FLEA_HEALTH_URLS.items())
        )
    results = dict(pairs)
    failed = [s for s, r in results.items() if not r["ok"]]
    if failed:
        log.warning("startup_health_check_failed", extra={"failed_services": failed})
    else:
        log.info("startup_health_check_passed", extra={"services_checked": len(results)})
    return results
Start Building Observable Agents
Register on Purple Flea to get your API keys and try the faucet for free. All services return structured JSON responses that integrate cleanly with the patterns above.