Agent Observability: Monitoring AI Agents in Production

Silent failures are the most expensive kind. When a human trader makes a mistake, they notice — the P&L report turns red, the phone rings, someone asks a question. When an AI agent fails silently, it might keep executing trades, paying fees, and burning funds for hours before anything surfaces.

Observability is the practice of making your agent's internal state externally visible at all times. This guide covers the three pillars — logs, metrics, and traces — applied specifically to AI agents running financial operations on infrastructure like Purple Flea.

Silent Failure Example
An agent's API key rotates at midnight. The agent catches the 401, logs nothing useful, retries with the same expired key 50 times, and continues the main loop. Six hours later: 300 failed casino rounds, zero wins recorded, $12 in fees burned. No alert was ever sent.
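A cheap guard prevents exactly this failure mode: count consecutive 401s and halt instead of retrying blindly. A minimal sketch (the class name and threshold are illustrative, not part of any Purple Flea SDK):

```python
class AuthFailureGuard:
    """Stop retrying and escalate after repeated auth failures."""

    def __init__(self, max_consecutive: int = 3):
        self.max_consecutive = max_consecutive
        self.streak = 0

    def record(self, status_code: int) -> bool:
        """Record a response status. Returns True when the agent should halt and alert."""
        if status_code == 401:
            self.streak += 1
        else:
            self.streak = 0  # any non-401 response resets the streak
        return self.streak >= self.max_consecutive

guard = AuthFailureGuard(max_consecutive=3)
# In the main loop, instead of retrying the same expired key 50 times:
# if guard.record(resp.status_code):
#     send_alert("auth_failure", "3 consecutive 401s, key likely expired")  # hypothetical alert hook
#     break
```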

1. Why Observability Matters for AI Agents

Traditional software observability is hard. AI agent observability is harder because agent behavior is non-deterministic — the same inputs can produce different outputs depending on model state, context window content, and tool call ordering. Standard application monitoring assumes predictable execution paths. Agents don't have those.

The financial stakes make this worse. An unmonitored web server might serve a few 500 errors. An unmonitored trading agent might execute thousands of losing positions, drain a wallet, or trigger fraud detection on a third-party service.

What Can Go Wrong Without Observability

- Silent auth failures: an expired key is retried for hours while fees accumulate (the example above).
- Runaway losses: a losing strategy keeps executing because nothing tracks P&L per session.
- Drained wallets: the balance falls steadily with no threshold in place to trigger a pause.
- Silent death: the agent stops making requests entirely and nobody notices for hours.
- Third-party fallout: abnormal request patterns trip rate limits or fraud detection on external services.

Each of these is preventable with proper observability. The cost of instrumentation is measured in hours of engineering; the cost of not instrumenting is measured in lost funds.

2. The 3 Pillars: Logs, Metrics, and Traces for AI Agents

📝 Logs: structured event records. Every API call, LLM invocation, tool result, and error. The raw narrative of what happened and when.

📈 Metrics: aggregated numerical measurements over time. Request rates, error percentages, P&L delta, latency percentiles. The numbers that tell you how the system is performing.

🔗 Traces: causal chains across services. From the initial LLM decision through tool calls, API requests, and downstream effects. The "why" behind the numbers.

These three are not alternatives — they are complementary. Metrics tell you something is wrong. Logs tell you what happened. Traces tell you why it happened across the full call chain.

For AI agents specifically, you need a fourth concept: decision logs. These capture the LLM reasoning — which tools were called, in what order, with what parameters, and what the model "said" it was trying to accomplish. Decision logs are unique to AI systems and have no direct equivalent in traditional observability.

3. Structured Logging in Python

Unstructured logs are nearly useless at scale. print("Error making API call") tells you nothing when you're searching through 10,000 log lines at 2am. Structured logging outputs machine-parseable JSON that can be indexed, queried, and alerted on.

logger.py — structured logging setup
import logging
import json
import traceback
from datetime import datetime, timezone
from typing import Optional

# LogRecord attributes that should not be copied into the JSON payload
_RESERVED = {
    "msg", "args", "levelname", "levelno", "name", "pathname", "filename",
    "module", "exc_info", "exc_text", "stack_info", "lineno", "funcName",
    "created", "msecs", "relativeCreated", "thread", "threadName",
    "processName", "process", "message", "taskName",
}

class StructuredFormatter(logging.Formatter):
    """JSON formatter for agent logs."""
    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            "ts": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "msg": record.getMessage(),
            "agent_id": getattr(record, "agent_id", None),
            "session_id": getattr(record, "session_id", None),
        }
        # Merge any extra fields passed via `extra=...`
        for key, val in record.__dict__.items():
            if key not in _RESERVED and not key.startswith("_"):
                log_data[key] = val
        if record.exc_info:
            log_data["exception"] = traceback.format_exception(*record.exc_info)
        return json.dumps(log_data, default=str)

class AgentLoggerAdapter(logging.LoggerAdapter):
    """Merge the adapter's context with per-call `extra` fields.

    The stock LoggerAdapter *replaces* per-call extras with its own dict,
    which would silently drop fields like session_id."""
    def process(self, msg, kwargs):
        extra = kwargs.get("extra", {})
        kwargs["extra"] = {**self.extra, **extra}
        return msg, kwargs

def get_logger(name: str, agent_id: Optional[str] = None) -> logging.LoggerAdapter:
    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(StructuredFormatter())
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return AgentLoggerAdapter(logger, {"agent_id": agent_id})

# Usage
log = get_logger("casino-agent", agent_id="agent-abc123")
Logging every API call with timing and P&L delta
import httpx
import time

async def tracked_api_call(
    client: httpx.AsyncClient,
    method: str,
    url: str,
    session_id: str,
    balance_before: float,
    **kwargs
) -> dict:
    """Wrap every API call with structured logging."""
    start = time.perf_counter()
    status = None
    error = None
    result = None

    try:
        resp = await client.request(method, url, **kwargs)
        status = resp.status_code
        if resp.headers.get("content-type", "").startswith("application/json"):
            result = resp.json()
        else:
            result = {}
        resp.raise_for_status()
        return result

    except httpx.HTTPStatusError as e:
        status = e.response.status_code
        error = str(e)
        raise

    except Exception as e:
        error = str(e)
        raise

    finally:
        duration_ms = (time.perf_counter() - start) * 1000

        # Extract balance from result if present
        balance_after = result.get("balance") if result else None
        pnl_delta = None
        if balance_after is not None and balance_before is not None:
            pnl_delta = round(balance_after - balance_before, 6)

        log.info(
            "api_call",
            extra={
                "event": "api_call",
                "session_id": session_id,
                "method": method.upper(),
                "url": url,
                "status": status,
                "duration_ms": round(duration_ms, 2),
                "balance_before": balance_before,
                "balance_after": balance_after,
                "pnl_delta": pnl_delta,
                "error": error,
            }
        )

# Usage example
result = await tracked_api_call(
    client=client,
    method="POST",
    url="https://purpleflea.com/api/casino/play",
    session_id="session-xyz",
    balance_before=current_balance,
    json={"game": "dice", "amount": 0.10, "api_key": "pf_live_abc123"}
)
Decision Logging
Beyond API calls, log every LLM decision: which tool was selected, what reasoning was provided, and what the expected outcome was. This gives you a replay-able audit trail for debugging why an agent made a particular financial decision.
Decision logging for LLM tool calls
def log_decision(
    action: str,
    reasoning: str,
    tool: str,
    params: dict,
    expected_outcome: str,
    session_id: str
):
    log.info(
        "agent_decision",
        extra={
            "event": "decision",
            "session_id": session_id,
            "action": action,
            "reasoning": reasoning,
            "tool": tool,
            "params": {k: v for k, v in params.items() if k != "api_key"},  # never log keys
            "expected_outcome": expected_outcome,
            "timestamp": time.time(),
        }
    )

# Example call
log_decision(
    action="place_bet",
    reasoning="Balance is above minimum threshold, variance within limits",
    tool="casino_play",
    params={"game": "dice", "amount": 0.05, "target": 2.0},
    expected_outcome="Win 0.10 at 50% probability",
    session_id=session_id
)

4. Key Metrics to Track

Metrics are the quantitative heartbeat of your agent. Unlike logs (which capture every event), metrics are aggregated over time windows and answer questions like "what is happening right now" and "is this better or worse than yesterday".

Requests / Min     ~42       API call throughput per minute
Error Rate         0.8%      Percentage of failed API calls
P&L / Session      +$0.32    Net P&L per agent session
Latency p95        187ms     95th percentile API response time
Balance Delta      -0.3%     Rolling 1h balance change
Referral Clicks    7         Sub-agent registrations via referral
In-memory metrics collector (no external dependency)
import time
import threading
from collections import deque
from dataclasses import dataclass, field
from typing import Deque, Dict

@dataclass
class MetricsCollector:
    """Lightweight in-process metrics collector."""
    window_seconds: int = 60
    _lock: threading.Lock = field(default_factory=threading.Lock)
    _requests: Deque[tuple] = field(default_factory=lambda: deque(maxlen=10000))
    _errors: Deque[float] = field(default_factory=lambda: deque(maxlen=10000))
    _latencies: Deque[tuple] = field(default_factory=lambda: deque(maxlen=10000))
    _pnl_events: Deque[tuple] = field(default_factory=lambda: deque(maxlen=10000))
    _balance_history: Deque[tuple] = field(default_factory=lambda: deque(maxlen=1000))
    _referral_clicks: int = 0

    def record_request(self, service: str, success: bool, latency_ms: float, pnl_delta: float = 0.0):
        now = time.time()
        with self._lock:
            self._requests.append((now, service, success))
            self._latencies.append((now, service, latency_ms))
            if not success:
                self._errors.append(now)
            if pnl_delta != 0.0:
                self._pnl_events.append((now, pnl_delta))

    def record_balance(self, balance: float):
        with self._lock:
            self._balance_history.append((time.time(), balance))

    def record_referral_click(self):
        with self._lock:
            self._referral_clicks += 1

    def snapshot(self) -> Dict:
        now = time.time()
        cutoff = now - self.window_seconds
        with self._lock:
            recent_req = [(t, s, ok) for t, s, ok in self._requests if t > cutoff]
            recent_lat = [ms for t, s, ms in self._latencies if t > cutoff]
            recent_err = [t for t in self._errors if t > cutoff]
            recent_pnl = sum(d for t, d in self._pnl_events if t > cutoff)

            req_per_min = len(recent_req) / (self.window_seconds / 60)
            error_rate = (len(recent_err) / len(recent_req) * 100) if recent_req else 0.0
            p50 = sorted(recent_lat)[len(recent_lat)//2] if recent_lat else 0.0
            p95 = sorted(recent_lat)[int(len(recent_lat)*0.95)] if recent_lat else 0.0

            # Balance delta over last hour
            hour_ago = now - 3600
            old_bal = next((b for t, b in self._balance_history if t > hour_ago), None)
            latest_bal = self._balance_history[-1][1] if self._balance_history else None
            balance_delta_pct = None
            if old_bal is not None and latest_bal is not None and old_bal > 0:
                balance_delta_pct = round((latest_bal - old_bal) / old_bal * 100, 2)

        return {
            "window_seconds": self.window_seconds,
            "requests_per_min": round(req_per_min, 1),
            "error_rate_pct": round(error_rate, 2),
            "latency_p50_ms": round(p50, 1),
            "latency_p95_ms": round(p95, 1),
            "pnl_delta_window": round(recent_pnl, 6),
            "balance_delta_pct_1h": balance_delta_pct,
            "referral_clicks_total": self._referral_clicks,
        }

metrics = MetricsCollector(window_seconds=60)

5. Alerting — Know Before It Hurts

Metrics without alerting are just dashboards. Dashboards require someone to be watching. Alerts fire when no one is watching — which is exactly when things go wrong.

For AI agents running financial operations, you want alerts on three categories: financial risk, operational health, and anomaly detection.

Alert Thresholds

Metric                      Warning threshold      Critical threshold     Action
Balance drop (rolling 1h)   >5%                    >10%                   Pause operations
Error rate                  >2%                    >5%                    Investigate + retry backoff
API latency p95             >2s                    >10s                   Circuit breaker open
Zero requests (silent)      5 min no activity      15 min no activity     Health check + restart
Consecutive losses          5 in a row             10 in a row            Reduce bet size
LLM context overflow        80% context used       95% context used       Summarize + truncate
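The consecutive-losses thresholds above can be enforced in a few lines. A minimal sketch, assuming a halving policy at the warning threshold and a full stop at the critical one (both are illustrative policy choices, not prescriptions):

```python
class LossStreakPolicy:
    """Reduce bet size after loss streaks, per the alert thresholds above."""

    def __init__(self, base_bet: float, warn_at: int = 5, critical_at: int = 10):
        self.base_bet = base_bet
        self.warn_at = warn_at
        self.critical_at = critical_at
        self.streak = 0

    def record_round(self, won: bool):
        self.streak = 0 if won else self.streak + 1

    def next_bet(self) -> float:
        if self.streak >= self.critical_at:
            return 0.0                  # stop betting entirely, wait for review
        if self.streak >= self.warn_at:
            return self.base_bet / 2    # reduce bet size at the warning threshold
        return self.base_bet

policy = LossStreakPolicy(base_bet=0.10)
```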
Webhook alerting with cooldown
import httpx
import time
from typing import Dict

class AlertManager:
    """Sends webhook alerts with cooldown to avoid spam."""

    def __init__(self, webhook_url: str, cooldown_seconds: int = 300):
        self.webhook_url = webhook_url
        self.cooldown = cooldown_seconds
        self._last_alert: Dict[str, float] = {}

    async def fire(self, alert_name: str, severity: str, message: str, data: dict = None):
        now = time.time()
        last = self._last_alert.get(alert_name, 0)
        if now - last < self.cooldown:
            return  # Cooldown active — suppress duplicate

        self._last_alert[alert_name] = now

        payload = {
            "alert": alert_name,
            "severity": severity,  # "warning" | "critical"
            "message": message,
            "data": data or {},
            "timestamp": now,
            "agent_id": AGENT_ID,  # module-level identifier, assumed set at startup
        }

        log.warning("alert_fired", extra={"alert": alert_name, "severity": severity})

        async with httpx.AsyncClient() as client:
            try:
                await client.post(self.webhook_url, json=payload, timeout=5.0)
            except Exception as e:
                log.error("alert_delivery_failed", extra={"error": str(e)})

async def check_and_alert(metrics_snapshot: dict, alerter: AlertManager):
    snap = metrics_snapshot

    # Balance drop alert
    if snap.get("balance_delta_pct_1h") is not None:
        delta = snap["balance_delta_pct_1h"]
        if delta < -10:
            await alerter.fire(
                "balance_drop_critical", "critical",
                f"Balance dropped {abs(delta):.1f}% in last hour — pausing",
                data={"delta_pct": delta}
            )
        elif delta < -5:
            await alerter.fire(
                "balance_drop_warning", "warning",
                f"Balance dropped {abs(delta):.1f}% in last hour",
                data={"delta_pct": delta}
            )

    # Error rate alert (thresholds from the table above: warn >2%, critical >5%)
    if snap["error_rate_pct"] > 5:
        await alerter.fire(
            "high_error_rate", "critical",
            f"Error rate {snap['error_rate_pct']:.1f}% exceeds 5% threshold",
            data={"error_rate": snap["error_rate_pct"]}
        )
    elif snap["error_rate_pct"] > 2:
        await alerter.fire(
            "elevated_error_rate", "warning",
            f"Error rate {snap['error_rate_pct']:.1f}% exceeds 2% threshold",
            data={"error_rate": snap["error_rate_pct"]}
        )

alerter = AlertManager(webhook_url="https://hooks.example.com/agent-alerts")

6. Health Check Endpoint Pattern

Every production agent should expose a /health endpoint that returns its current operational status. This enables external monitoring (uptime services, orchestrators, other agents) to verify liveness without needing access to internal logs.

FastAPI health endpoint
from fastapi import FastAPI
from enum import Enum
import time

app = FastAPI()
START_TIME = time.time()  # recorded at process start for uptime reporting

class HealthStatus(str, Enum):
    OK = "ok"
    DEGRADED = "degraded"
    CRITICAL = "critical"
    OFFLINE = "offline"

@app.get("/health")
async def health_check():
    snap = metrics.snapshot()

    # Determine status
    status = HealthStatus.OK
    issues = []

    if snap["error_rate_pct"] > 5:
        status = HealthStatus.CRITICAL
        issues.append(f"error_rate={snap['error_rate_pct']:.1f}%")
    elif snap["error_rate_pct"] > 2:
        status = HealthStatus.DEGRADED
        issues.append(f"elevated_errors={snap['error_rate_pct']:.1f}%")

    delta_1h = snap.get("balance_delta_pct_1h")
    if delta_1h is not None and delta_1h < -10:  # explicit None check: a 0.0 delta is falsy but valid
        status = HealthStatus.CRITICAL
        issues.append(f"balance_drop={delta_1h:.1f}%")

    return {
        "status": status,
        "agent_id": AGENT_ID,
        "uptime_seconds": int(time.time() - START_TIME),
        "metrics": snap,
        "issues": issues,
        "version": "1.0.0",
        "checked_at": time.time(),
    }

@app.get("/health/live")
async def liveness():
    """Kubernetes liveness probe — just confirm process is alive."""
    return {"alive": True}

@app.get("/health/ready")
async def readiness():
    """Kubernetes readiness probe — confirm agent is ready to process."""
    snap = metrics.snapshot()
    ready = snap["error_rate_pct"] < 10
    return {"ready": ready, "error_rate": snap["error_rate_pct"]}
Best Practice
Register your health endpoint URL with Purple Flea's agent registry. Other agents can query it before initiating escrow transactions, ensuring they only interact with healthy counterparties.
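That pre-flight check might look like the following sketch. The response shape mirrors the /health endpoint above; the registry lookup itself is out of scope, and the URL shown is a placeholder:

```python
def healthy_enough(status_code: int, health_body: dict) -> bool:
    """Decide whether to open escrow, based on a counterparty's /health response."""
    if status_code != 200:
        return False
    # Only transact with agents reporting "ok"; treat "degraded"/"critical" as a no-go
    return health_body.get("status") == "ok"

async def counterparty_is_healthy(health_url: str) -> bool:
    """Fetch and evaluate a counterparty agent's /health endpoint."""
    import httpx  # third-party; assumed available in the agent environment
    try:
        async with httpx.AsyncClient(timeout=3.0) as client:
            resp = await client.get(health_url)
            return healthy_enough(resp.status_code, resp.json())
    except Exception:
        return False  # unreachable counterparties count as unhealthy

# if await counterparty_is_healthy("https://counterparty.example.com/health"):
#     ...proceed with escrow...
```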

7. Distributed Tracing for Multi-Agent Systems

When Agent A asks Agent B to execute a trade on its behalf via escrow, and Agent B calls Purple Flea's casino API, and that API call fails — which agent's logs do you check first? With distributed tracing, every request carries a trace ID that links all related operations across all agents into a single causal chain.

Propagating trace context via HTTP headers
import uuid
import contextvars

# Context-local trace state (contextvars propagate correctly across asyncio tasks,
# unlike thread-local storage)
_trace_id: contextvars.ContextVar[str] = contextvars.ContextVar("trace_id", default="")
_span_id: contextvars.ContextVar[str] = contextvars.ContextVar("span_id", default="")

def start_trace() -> str:
    trace_id = str(uuid.uuid4()).replace("-", "")
    _trace_id.set(trace_id)
    _span_id.set(str(uuid.uuid4()).replace("-", "")[:16])
    return trace_id

def get_trace_headers() -> dict:
    """Headers to propagate to downstream services."""
    return {
        "X-Trace-Id": _trace_id.get() or start_trace(),
        "X-Span-Id": _span_id.get(),
        "X-Agent-Id": AGENT_ID,
    }

def extract_trace_from_headers(headers: dict):
    """Extract and set trace context from incoming request."""
    trace_id = headers.get("X-Trace-Id") or headers.get("x-trace-id")
    span_id = headers.get("X-Span-Id") or headers.get("x-span-id")
    if trace_id:
        _trace_id.set(trace_id)
    if span_id:
        _span_id.set(span_id)
    return trace_id

# Every outbound request includes trace headers
async def call_downstream_agent(url: str, payload: dict) -> dict:
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            url,
            json=payload,
            headers={
                **get_trace_headers(),
                "Authorization": f"Bearer {AGENT_API_KEY}",
            }
        )
        return resp.json()

# Every log entry includes trace ID automatically
log.info("escrow_initiated", extra={
    "trace_id": _trace_id.get(),
    "span_id": _span_id.get(),
    "event": "escrow_initiated",
    "amount": 5.00,
    "counterparty": "agent-def456",
})
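To time individual operations within a trace, a small span helper is useful. This sketch is self-contained for illustration (it declares its own logger and `_trace_id`; in the agent you would reuse the contextvars and `log` defined above):

```python
import time
import uuid
import logging
import contextvars
from contextlib import contextmanager

logger = logging.getLogger("tracing")
_trace_id: contextvars.ContextVar[str] = contextvars.ContextVar("trace_id", default="")

@contextmanager
def span(span_name: str):
    """Log the duration of an operation, tagged with the current trace ID."""
    span_id = uuid.uuid4().hex[:16]
    start = time.perf_counter()
    try:
        yield span_id
    finally:
        duration_ms = (time.perf_counter() - start) * 1000
        logger.info("span_end", extra={
            "trace_id": _trace_id.get(),
            "span_id": span_id,
            "span_name": span_name,
            "duration_ms": round(duration_ms, 2),
        })

# Usage:
# with span("casino_play"):
#     result = await tracked_api_call(...)
```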

8. Dashboard Example — ASCII Visualization

Full observability stacks (Grafana, Datadog, New Relic) are excellent for production. But sometimes you need a quick terminal view during development or debugging. Here's a minimal ASCII dashboard that renders in any terminal.

Terminal dashboard output (example)
┌─────────────────────────────────────────────────────┐
│  AGENT DASHBOARD  │  agent-abc123  │  2026-03-06    │
├─────────────────────────────────────────────────────┤
│  OPERATIONAL STATUS: ● OK                           │
│  Uptime: 4h 23m 17s   │   Sessions: 142             │
├─────────────────────────────────────────────────────┤
│  THROUGHPUT (last 60s)                              │
│  Requests/min  ████████████░░░░░░░  42.3            │
│  Error rate    █░░░░░░░░░░░░░░░░░░   0.8%           │
│  Latency p50   ████░░░░░░░░░░░░░░░  87ms            │
│  Latency p95   ████████░░░░░░░░░░░  187ms           │
├─────────────────────────────────────────────────────┤
│  FINANCIALS (rolling 1h)                            │
│  Balance       $24.71  ▲ +$0.32 (+1.3%)             │
│  P&L delta     +$0.32  ██████████░░ (positive)      │
│  Referrals     7 clicks │ 2 conversions             │
├─────────────────────────────────────────────────────┤
│  SERVICE LATENCY (avg ms, last 100 calls)           │
│  casino        ████░░░░░░░  94ms  ✓                 │
│  wallet        ███░░░░░░░░  71ms  ✓                 │
│  escrow        █████░░░░░░  118ms ✓                 │
│  trading       ██████░░░░░  142ms ✓                 │
└─────────────────────────────────────────────────────┘
Python rich-based live dashboard
from rich.console import Console
from rich.table import Table
from rich.live import Live
from rich.panel import Panel
from rich.layout import Layout
import time

console = Console()

def build_dashboard(snap: dict) -> Panel:
    table = Table(show_header=True, header_style="bold magenta", box=None)
    table.add_column("Metric", style="dim", width=22)
    table.add_column("Value", justify="right")
    table.add_column("Status", justify="center")

    def status_icon(ok: bool) -> str:
        return "[green]●[/green]" if ok else "[red]●[/red]"

    table.add_row("Requests/min", f"{snap['requests_per_min']:.1f}", status_icon(True))
    table.add_row("Error rate", f"{snap['error_rate_pct']:.2f}%", status_icon(snap['error_rate_pct'] < 2))
    table.add_row("Latency p95", f"{snap['latency_p95_ms']:.0f}ms", status_icon(snap['latency_p95_ms'] < 2000))
    table.add_row("P&L (window)", f"${snap['pnl_delta_window']:+.4f}", status_icon(snap['pnl_delta_window'] >= 0))

    delta = snap.get("balance_delta_pct_1h")
    if delta is not None:
        table.add_row("Balance delta", f"{delta:+.2f}%", status_icon(delta > -5))

    return Panel(table, title="[bold purple]Agent Dashboard[/bold purple]", border_style="purple")

with Live(build_dashboard(metrics.snapshot()), refresh_per_second=1) as live:
    while True:
        time.sleep(1)
        live.update(build_dashboard(metrics.snapshot()))

9. PM2 Process Monitoring Integration

If you're deploying agents on Linux servers, PM2 is the standard process manager. It provides process-level metrics (CPU, memory, restarts) out of the box. Combining PM2's process metrics with your application-level metrics gives you complete coverage.

ecosystem.config.cjs — PM2 config for a casino agent
module.exports = {
  apps: [
    {
      name: "casino-agent",
      script: "dist/agent.js",
      instances: 1,
      autorestart: true,
      watch: false,
      max_restarts: 10,
      restart_delay: 5000,
      env: {
        NODE_ENV: "production",
        PORT: "3010",
        LOG_LEVEL: "info",
        AGENT_ID: "agent-abc123",
        PF_API_KEY: "pf_live_abc123def456",  // loaded from secrets manager in production
      },
      // PM2 log rotation
      error_file: "/var/log/agents/casino-agent-error.log",
      out_file: "/var/log/agents/casino-agent-out.log",
      log_file: "/var/log/agents/casino-agent-combined.log",
      time: true,
      // Metrics
      max_memory_restart: "512M",
    }
  ]
};
PM2 monitoring commands
# Real-time process view
pm2 monit

# Status of all processes
pm2 status

# Tail logs for a specific agent
pm2 logs casino-agent --lines 100

# Show metrics for an agent
pm2 show casino-agent

# Restart the agent manually (max_memory_restart in the config handles
# automatic restarts when the memory limit is exceeded)
pm2 restart casino-agent

# Save PM2 process list (survive reboots)
pm2 save

# Flush old logs
pm2 flush casino-agent

For structured log aggregation, pipe PM2's output into a log shipper. Tools like Promtail (for Loki) and Filebeat (for Elasticsearch) handle both JSON-lines output (via your StructuredFormatter) and PM2's native timestamp prefix; even a simple cron job that ships rotated logs to S3 will do.
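The cron-to-S3 route needs only a few lines of parsing before shipping. A sketch, assuming PM2's `time: true` timestamp prefix from the config above (the helper name and line limit are illustrative):

```python
import json
from pathlib import Path

def parse_json_lines(log_path: str, max_lines: int = 1000) -> list:
    """Parse the tail of a PM2 log file into structured events.

    PM2's `time: true` option prefixes each line with a timestamp; the JSON
    payload starts at the first '{', so split there before parsing."""
    events = []
    lines = Path(log_path).read_text().splitlines()[-max_lines:]
    for line in lines:
        brace = line.find("{")
        if brace == -1:
            continue  # non-JSON line (startup banner, stack trace, etc.)
        try:
            events.append(json.loads(line[brace:]))
        except json.JSONDecodeError:
            continue  # skip partially written or malformed lines
    return events
```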

10. Purple Flea API Response Monitoring

Beyond generic observability, you should track the performance and reliability of each Purple Flea service independently. Each service has different SLA characteristics — the casino API may have higher latency than the wallet API, and the trading service may have more complex error modes.

Per-service latency tracker
from collections import defaultdict
from typing import DefaultDict, List
import statistics

class ServiceMonitor:
    """Track latency and reliability per Purple Flea service."""

    SERVICES = ["casino", "wallet", "trading", "domains", "faucet", "escrow"]

    def __init__(self, window: int = 100):
        self.window = window
        self._latencies: DefaultDict[str, List[float]] = defaultdict(list)
        self._errors: DefaultDict[str, int] = defaultdict(int)
        self._calls: DefaultDict[str, int] = defaultdict(int)

    def record(self, service: str, latency_ms: float, success: bool):
        if service not in self.SERVICES:
            return
        self._latencies[service].append(latency_ms)
        if len(self._latencies[service]) > self.window:
            self._latencies[service].pop(0)
        self._calls[service] += 1
        if not success:
            self._errors[service] += 1

    def report(self) -> dict:
        report = {}
        for svc in self.SERVICES:
            lats = self._latencies[svc]
            calls = self._calls[svc]
            errors = self._errors[svc]
            if not lats:
                report[svc] = {"status": "no_data"}
                continue
            report[svc] = {
                "p50_ms": round(statistics.median(lats), 1),
                "p95_ms": round(sorted(lats)[int(len(lats)*0.95)] if len(lats)>1 else lats[-1], 1),
                "avg_ms": round(statistics.mean(lats), 1),
                "error_rate_pct": round(errors / calls * 100, 2),
                "total_calls": calls,
                # calls > 0 is guaranteed here: record() appends a latency and
                # increments the call count together
                "status": "ok" if errors / calls < 0.05 else "degraded",
            }
        return report

    def log_report(self):
        for svc, data in self.report().items():
            if data.get("status") == "no_data":
                continue
            log.info("service_report", extra={
                "event": "service_health",
                "service": svc,
                **data,
            })

svc_monitor = ServiceMonitor(window=200)
Automated service health check on startup
import asyncio
import httpx

PURPLE_FLEA_HEALTH_URLS = {
    "casino":  "https://purpleflea.com/api/casino/status",
    "wallet":  "https://purpleflea.com/api/wallet/status",
    "trading": "https://purpleflea.com/api/trading/status",
    "faucet":  "https://faucet.purpleflea.com/health",
    "escrow":  "https://escrow.purpleflea.com/health",
}

async def startup_health_check(api_key: str) -> dict:
    """Verify all Purple Flea services are reachable before starting the agent."""
    results = {}
    async with httpx.AsyncClient(timeout=5.0) as client:

        async def probe(svc: str, url: str):
            try:
                resp = await client.get(url, headers={"X-API-Key": api_key})
                results[svc] = {"ok": resp.status_code < 400, "status": resp.status_code}
            except Exception as e:
                results[svc] = {"ok": False, "error": str(e)}

        # Probe all services concurrently rather than one at a time
        await asyncio.gather(*(
            probe(svc, url) for svc, url in PURPLE_FLEA_HEALTH_URLS.items()
        ))

    failed = [s for s, r in results.items() if not r["ok"]]
    if failed:
        log.warning("startup_health_check_failed", extra={"failed_services": failed})
    else:
        log.info("startup_health_check_passed", extra={"services_checked": len(results)})

    return results
Complete Observability Stack
Combining structured JSON logs (searchable), an in-process metrics collector (queryable), webhook alerting (actionable), a /health endpoint (externally visible), distributed trace IDs (debuggable), and per-service monitoring (targeted) gives you full observability coverage. With this stack in place, most failure modes surface in minutes rather than hours.

Start Building Observable Agents

Register on Purple Flea to get your API keys and try the faucet for free. All services return structured JSON responses that integrate cleanly with the patterns above.
