Observability

Logging and Observability for AI Agents:
Debugging Production Issues

March 4, 2026
26 min read
Purple Flea Engineering

When a Purple Flea transaction fails at 3 AM, you need to answer: which agent made the call, what parameters it used, what the API returned, and what decision the agent made next. Structured logging with correlation IDs turns "something went wrong" into a precise, traceable chain of events you can replay in your terminal in under two minutes.

Table of Contents
  1. 01Structured Logging: JSON Over Plain Text
  2. 02Trace and Correlation IDs
  3. 03Log Levels: When to Use Each
  4. 04Centralized Logging Patterns
  5. 05Debugging Production Issues
  6. 06Complete Logging Setup
01

Structured Logging: JSON Over Plain Text

Unstructured logs are noise. When your agent runs 500 API calls per hour across three services, a plain text log file is impossible to filter, aggregate, or query. Structured logging — emitting JSON objects instead of formatted strings — makes every log line queryable by any field.

The difference is immediate: with structured logs you can answer "show me all bets over $50 that resulted in a loss in the last hour" with a single grep command. With plain text logs, you can't.

What a Structured Log Line Looks Like

// Unstructured — hard to parse, filter, or aggregate:
2026-03-04 03:21:44 INFO Bet placed: dice over 50 for 10.00 USDC, won, payout 19.80
// Structured JSON — every field queryable:
{
  "ts": "2026-03-04T03:21:44.312Z",
  "level": "info",
  "event": "bet.complete",
  "agent_id": "dice-agent-001",
  "trace_id": "t_7f3a2c9d",
  "bet_id": "bet_4b8c1a3e",
  "game": "dice",
  "amount_usdc": 10.00,
  "prediction": "over",
  "target": 50,
  "roll": 73,
  "outcome": "win",
  "payout_usdc": 19.80,
  "balance_after": 247.60,
  "api_latency_ms": 84,
  "session_bet_count": 142
}
Python logger.py
import logging
import json
import sys
import time
import traceback
from typing import Any, Dict, Optional

class JSONFormatter(logging.Formatter):
    """Format log records as newline-delimited JSON."""

    def __init__(self, agent_id: str, service: str):
        super().__init__()
        self.agent_id = agent_id
        self.service = service

    def format(self, record: logging.LogRecord) -> str:
        log_obj: Dict[str, Any] = {
            "ts": self._format_ts(record.created),
            "level": record.levelname.lower(),
            "event": record.getMessage(),
            "agent_id": self.agent_id,
            "service": self.service,
            "logger": record.name,
        }

        # Include any extra fields set via logger.info("msg", extra={"key": val})
        for key, val in record.__dict__.items():
            if key not in {
                "msg", "args", "levelname", "levelno", "pathname", "filename",
                "module", "exc_info", "exc_text", "stack_info", "lineno",
                "funcName", "created", "msecs", "relativeCreated",
                "thread", "threadName", "processName", "process", "name", "message",
            }:
                log_obj[key] = val

        if record.exc_info:
            log_obj["error"] = {
                "type": record.exc_info[0].__name__,
                "message": str(record.exc_info[1]),
                "traceback": traceback.format_exception(*record.exc_info),
            }

        return json.dumps(log_obj, default=str)

    def _format_ts(self, created: float) -> str:
        from datetime import datetime, timezone
        dt = datetime.fromtimestamp(created, tz=timezone.utc)
        return dt.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"

def configure_logging(agent_id: str, service: str, level: str = "INFO") -> logging.Logger:
    """Configure the root logger with JSON formatting."""
    formatter = JSONFormatter(agent_id=agent_id, service=service)
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)

    root = logging.getLogger()
    root.handlers.clear()
    root.addHandler(handler)
    root.setLevel(getattr(logging, level))

    return logging.getLogger(service)

# Initialize at startup
log = configure_logging(
    agent_id=os.environ.get("AGENT_ID", "default"),
    service="casino-agent",
    level=os.environ.get("LOG_LEVEL", "INFO"),
)
🔎
Query Structured Logs at the Terminal

With JSON logs and jq, filtering is instant: journalctl -u agent | jq 'select(.outcome == "loss" and .amount_usdc > 20)'. No regex gymnastics. Every field is a first-class filter target.

02

Trace and Correlation IDs

A trace ID is a unique identifier that flows through every log line related to a single logical operation. When a bet fails, the trace ID connects the agent's decision log, the API request log, the error log, and the retry log into a single searchable story.

For agents operating financial APIs, every logical "action" should carry a trace ID. This includes: a single bet with its retry chain, a trade order from placement through fill, an escrow lifecycle from creation through release.

Trace ID vs Correlation ID vs Idempotency Key: A trace ID is for observability (trace the code path). A correlation ID connects related events across services. An idempotency key is for deduplication (prevent double-execution). They often share values but serve different purposes.

Context Variables for Automatic Propagation

Use Python's contextvars to store the current trace ID. Any log statement inside the context automatically includes it without explicitly passing it everywhere:

Python trace.py
import uuid
import logging
from contextvars import ContextVar
from contextlib import contextmanager, asynccontextmanager
from typing import Optional

# Thread/task-local trace context
_trace_id: ContextVar[Optional[str]] = ContextVar("trace_id", default=None)
_operation: ContextVar[Optional[str]] = ContextVar("operation", default=None)

def current_trace_id() -> Optional[str]:
    return _trace_id.get()

def current_operation() -> Optional[str]:
    return _operation.get()

@asynccontextmanager
async def trace(operation: str, trace_id: Optional[str] = None):
    """
    Async context manager that sets trace context for all code within.

    Usage:
        async with trace("casino.place_bet") as tid:
            log.info("placing bet", extra={"amount": 10})  # auto-includes trace_id
            result = await client.post("/casino/bet", ...)
    """
    tid = trace_id or f"t_{uuid.uuid4().hex[:8]}"
    token_trace = _trace_id.set(tid)
    token_op = _operation.set(operation)
    start = time.monotonic()
    log = logging.getLogger(operation)

    log.info("trace.start", extra={"trace_id": tid, "op": operation})
    try:
        yield tid
        elapsed = (time.monotonic() - start) * 1000
        log.info("trace.complete", extra={"trace_id": tid, "duration_ms": round(elapsed, 2)})
    except Exception as e:
        elapsed = (time.monotonic() - start) * 1000
        log.error("trace.failed", extra={
            "trace_id": tid,
            "duration_ms": round(elapsed, 2),
            "error_type": type(e).__name__,
        }, exc_info=True)
        raise
    finally:
        _trace_id.reset(token_trace)
        _operation.reset(token_op)

class TraceInjector(logging.Filter):
    """
    Logging filter that injects current trace context into every log record.
    Add to all handlers — trace_id and operation are then always available.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        record.trace_id = current_trace_id()
        record.operation = current_operation()
        return True

# Example: full traced bet execution
async def place_bet_traced(client, params: dict) -> dict:
    async with trace("casino.place_bet") as tid:
        log = logging.getLogger("casino")

        log.debug("sending_bet", extra={
            "trace_id": tid, "game": params["game"], "amount": params["amount"]
        })

        start = time.monotonic()
        result = await client.post("/casino/bet", json=params)
        latency_ms = (time.monotonic() - start) * 1000

        # All the context an investigator needs in one log line
        log.info("bet.complete", extra={
            "trace_id": tid,
            "bet_id": result["id"],
            "game": params["game"],
            "amount_usdc": float(params["amount"]),
            "outcome": result["outcome"],
            "payout_usdc": float(result["payout"]),
            "roll": result.get("roll"),
            "api_latency_ms": round(latency_ms, 2),
        })
        return result

What a Traced Execution Looks Like in Logs

03:21:44.210 casino.place_bet trace.start trace_id=t_7f3a2c9d
03:21:44.211 casino sending_bet trace_id=t_7f3a2c9d game=dice amount=10.00
03:21:44.295 httpx POST /casino/bet 200 OK trace_id=t_7f3a2c9d latency_ms=84
03:21:44.296 casino bet.complete trace_id=t_7f3a2c9d outcome=win payout=19.80
03:21:44.297 casino.place_bet trace.complete trace_id=t_7f3a2c9d duration_ms=87

A single grep t_7f3a2c9d across any log file retrieves the complete story of that bet — decision, API call, result. That is the power of trace IDs.

03

Log Levels: When to Use Each

Using the right log level is essential for filtering in production. If everything is INFO, you drown in noise. If you under-log, you miss critical context when debugging.

DEBUG
Raw API request/response bodies, internal state, decision parameters. Disabled in production.
req_body={"game":"dice","amount":"10.00"}
INFO
Every completed operation: bets, trades, balance checks, escrow events. Core audit trail.
bet.complete outcome=win payout=19.80
WARNING
Retries, rate limits hit, fallback code paths, non-critical API errors.
rate_limit_hit retry_after=2.3s attempt=2
ERROR
Failed operations that were moved to DLQ, unexpected exceptions, corrupted state.
bet.failed after 5 retries → DLQ
CRITICAL
Agent cannot continue: auth failure, wallet access lost, unrecoverable state. Requires immediate human intervention.
auth.revoked cannot_continue=true
Event Level Reason
Bet placed successfully INFO Core audit event — always log
API request sent (body) DEBUG Verbose, disable in production
429 rate limit hit WARNING Abnormal but handled automatically
Retry attempt N WARNING Indicates transient instability
Balance below threshold WARNING Actionable soon but not urgent
Operation moved to DLQ ERROR Financial operation permanently failed
Escrow created INFO Financial commitment — always log
Unhandled exception ERROR Unexpected code path
API key revoked CRITICAL Agent must stop immediately
04

Centralized Logging Patterns

A single agent writing to stdout is fine when you have one agent. When you have ten agents running across multiple machines, you need centralized log aggregation. The standard patterns — Loki, Elasticsearch, CloudWatch — all accept JSON newline-delimited logs from a log shipper.

Log Rotation and Retention

For agents with minimal infrastructure, a rotating file handler plus periodic upload to object storage provides cheap, durable log storage with good query capability via jq:

Python log_rotation.py
import logging
import logging.handlers
import gzip
import shutil
from pathlib import Path

def configure_file_logging(
    agent_id: str,
    log_dir: str = "/var/log/agents",
    max_bytes: int = 50 * 1024 * 1024,  # 50MB per file
    backup_count: int = 10,                # Keep 10 rotated files
) -> None:
    """Add rotating file handler alongside stdout handler."""
    Path(log_dir).mkdir(parents=True, exist_ok=True)
    log_file = f"{log_dir}/{agent_id}.jsonl"

    file_handler = logging.handlers.RotatingFileHandler(
        log_file,
        maxBytes=max_bytes,
        backupCount=backup_count,
        encoding="utf-8",
    )
    file_handler.setFormatter(JSONFormatter(agent_id=agent_id, service="agent"))
    file_handler.addFilter(TraceInjector())

    logging.getLogger().addHandler(file_handler)

async def ship_logs_to_s3(agent_id: str, log_dir: str, bucket: str):
    """Compress and upload rotated log files to S3/R2/MinIO."""
    import boto3
    s3 = boto3.client("s3")
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")

    for log_file in Path(log_dir).glob(f"{agent_id}.jsonl.*"):
        gz_file = f"{log_file}.gz"
        with open(log_file, "rb") as f_in:
            with gzip.open(gz_file, "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)

        key = f"logs/{agent_id}/{today}/{log_file.name}.gz"
        s3.upload_file(str(gz_file), bucket, key)
        log_file.unlink()
        Path(gz_file).unlink()

Shipping to Loki (Grafana)

Loki is the lightweight log aggregation system from Grafana Labs. It ingests logs with labels and stores them cheaply without indexing log bodies. For JSON logs from agents, it is an excellent choice:

Python loki_handler.py
import asyncio
import aiohttp
import json
import time
from collections import deque

class LokiHandler(logging.Handler):
    """
    Async batch log handler that ships to Loki push API.
    Buffers logs and sends in batches to reduce HTTP overhead.
    """

    def __init__(
        self,
        loki_url: str,
        labels: dict,
        batch_size: int = 100,
        flush_interval: float = 5.0,
    ):
        super().__init__()
        self.loki_url = loki_url.rstrip("/") + "/loki/api/v1/push"
        self.labels = labels
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._buffer: deque = deque(maxlen=10000)
        asyncio.create_task(self._flush_loop())

    def emit(self, record: logging.LogRecord):
        try:
            msg = self.format(record)
            ts_ns = str(int(record.created * 1e9))
            self._buffer.append((ts_ns, msg))
        except Exception:
            self.handleError(record)

    async def _flush_loop(self):
        while True:
            await asyncio.sleep(self.flush_interval)
            if self._buffer:
                await self._flush()

    async def _flush(self):
        batch = []
        while self._buffer and len(batch) < self.batch_size:
            batch.append(self._buffer.popleft())

        if not batch:
            return

        label_str = ",".join(f'{k}="{v}"' for k, v in self.labels.items())
        payload = {
            "streams": [{
                "stream": {f"{{label_str}}": "", **self.labels},
                "values": batch,
            }]
        }
        try:
            async with aiohttp.ClientSession() as s:
                await s.post(
                    self.loki_url, json=payload,
                    timeout=aiohttp.ClientTimeout(total=10)
                )
        except Exception as e:
            # Re-queue failed batch (up to buffer limit)
            for entry in batch:
                self._buffer.appendleft(entry)

# Add Loki shipping alongside file logging
loki_handler = LokiHandler(
    loki_url=os.environ["LOKI_URL"],
    labels={
        "agent_id": os.environ.get("AGENT_ID", "default"),
        "service": "casino-agent",
        "env": "production",
    },
)
logging.getLogger().addHandler(loki_handler)
05

Debugging Production Issues

When something goes wrong with a production agent, you have structured logs, trace IDs, and metrics. Here is the systematic process for diagnosing issues fast.

Step 1: Identify the Failing Transaction

Start with the alert or symptom — a DLQ entry, a balance anomaly, or a user report. Extract the operation ID or timeframe from the alert payload:

Bash debug.sh
# Find all ERROR logs from the last hour
journalctl -u casino-agent --since "1 hour ago" \
  | jq -c 'select(.level == "error")'

# Get all log lines for a specific trace ID
journalctl -u casino-agent | jq -c 'select(.trace_id == "t_7f3a2c9d")'

# Find bets over $50 that resulted in a loss
journalctl -u casino-agent --since "24 hours ago" \
  | jq -c 'select(.event == "bet.complete" and .outcome == "loss" and .amount_usdc > 50)'

# Count errors by type in last hour
journalctl -u casino-agent --since "1 hour ago" \
  | jq -c 'select(.level == "error") | .error.type' \
  | sort | uniq -c | sort -rn

# View complete timeline of a failing escrow
journalctl -u escrow-agent \
  | jq -c 'select(.escrow_id == "esc_4b8c1a3e")' \
  | jq -r '. | "\(.ts) [\(.level)] \(.event)"'

# Find all DLQ entries from today
sqlite3 /var/lib/agent/dlq.db \
  "SELECT operation_type, error_code, error_message, datetime(last_attempt_ts, 'unixepoch')
   FROM dead_letters WHERE resolved=0 ORDER BY last_attempt_ts DESC LIMIT 20"

Step 2: Reconstruct the Decision Chain

Once you have a trace ID, reconstruct the full execution flow from logs:

Python trace_replay.py
import json
import subprocess
from typing import List

def replay_trace(trace_id: str, log_file: str = None) -> List[dict]:
    """Extract and display all events for a trace ID in chronological order."""
    if log_file:
        result = subprocess.run(
            ["jq", f'-c select(.trace_id == "{trace_id}")', log_file],
            capture_output=True, text=True
        )
        lines = result.stdout.strip().split("\n")
    else:
        result = subprocess.run(
            ["journalctl", "-u", "casino-agent", "-o", "json-pretty", "--no-pager"],
            capture_output=True, text=True
        )
        lines = result.stdout.strip().split("\n")

    events = []
    for line in lines:
        if not line.strip():
            continue
        try:
            ev = json.loads(line)
            if ev.get("trace_id") == trace_id:
                events.append(ev)
        except:
            pass

    events.sort(key=lambda e: e["ts"])

    print(f"\n=== Trace {trace_id} ({'%d events' % len(events)}) ===\n")
    for ev in events:
        level_color = {
            "debug": "\033[34m", "info": "\033[32m",
            "warning": "\033[33m", "error": "\033[31m",
        }.get(ev.get("level", ""), "")
        reset = "\033[0m"
        ts = ev["ts"][11:23]  # Time portion only
        print(f"{ts} {level_color}[{ev.get('level','?').upper():8}]{reset} {ev.get('event', '')}")
        for k in ["bet_id", "amount_usdc", "outcome", "payout_usdc", "error_type"]:
            if k in ev:
                print(f"         {k}={ev[k]}")
    return events

# Usage: python -c "from trace_replay import replay_trace; replay_trace('t_7f3a2c9d')"
06

Complete Logging Setup

Putting it all together: a complete, ready-to-drop-in logging module that configures JSON formatting, trace injection, file rotation, and optional Loki shipping from a single setup call.

Python observability.py
"""
Complete observability setup for Purple Flea agents.

Usage:
    from observability import setup_logging, trace

    setup_logging(agent_id="my-agent", service="casino-agent")

    async def run():
        async with trace("session.start"):
            log = logging.getLogger("agent")
            log.info("starting_session", extra={"balance": 100.0})

            async with trace("casino.bet"):
                result = await place_bet(...)
                log.info("bet.complete", extra={"outcome": result["outcome"]})
"""

import os
import logging
import sys
from dataclasses import dataclass
from typing import Optional

@dataclass
class ObservabilityConfig:
    agent_id: str
    service: str
    log_level: str = "INFO"
    log_file: Optional[str] = None
    loki_url: Optional[str] = None
    pretty_console: bool = False  # Human-readable in dev, JSON in prod

def setup_logging(config: ObservabilityConfig) -> logging.Logger:
    """
    Configure complete observability stack:
    - JSON structured logging to stdout
    - TraceInjector filter on all handlers
    - Optional rotating file handler
    - Optional Loki shipping
    """
    root = logging.getLogger()
    root.handlers.clear()
    root.setLevel(getattr(logging, config.log_level))

    formatter = JSONFormatter(agent_id=config.agent_id, service=config.service)
    trace_filter = TraceInjector()

    # Always: stdout handler
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(formatter)
    stdout_handler.addFilter(trace_filter)
    root.addHandler(stdout_handler)

    # Optional: rotating file handler
    if config.log_file:
        configure_file_logging(config.agent_id, log_dir=os.path.dirname(config.log_file))

    # Optional: Loki shipping
    if config.loki_url:
        loki = LokiHandler(
            loki_url=config.loki_url,
            labels={
                "agent_id": config.agent_id,
                "service": config.service,
                "env": os.environ.get("ENV", "production"),
            },
        )
        loki.setFormatter(formatter)
        loki.addFilter(trace_filter)
        root.addHandler(loki)

    # Silence noisy third-party loggers
    logging.getLogger("httpx").setLevel(logging.WARNING)
    logging.getLogger("websockets").setLevel(logging.WARNING)
    logging.getLogger("aiohttp").setLevel(logging.WARNING)

    log = logging.getLogger(config.service)
    log.info("logging.configured", extra={
        "log_level": config.log_level,
        "file_handler": config.log_file is not None,
        "loki_handler": config.loki_url is not None,
    })
    return log

# Entrypoint usage
if __name__ == "__main__":
    cfg = ObservabilityConfig(
        agent_id=os.environ.get("AGENT_ID", "casino-agent-001"),
        service="casino-agent",
        log_level=os.environ.get("LOG_LEVEL", "INFO"),
        log_file=os.environ.get("LOG_FILE"),
        loki_url=os.environ.get("LOKI_URL"),
    )
    log = setup_logging(cfg)
    asyncio.run(main())
The Two-Minute Debug Rule

With structured logging, trace IDs, and the jq queries in section 5, you should be able to identify the root cause of any production issue within two minutes of receiving an alert. If it takes longer, add more log fields to the relevant operations. Good logging pays for itself the first time something goes wrong at 3 AM.

Practice Impact Complexity
JSON structured logs High Low — one formatter change
Trace IDs with contextvars High Medium — wrap operations
Log level discipline High Low — team convention
Rotating file handler Medium Low — stdlib
Loki/Grafana shipping Medium Medium — infra needed
OpenTelemetry tracing High High — full instrumentation

Build Fully Observable Agents on Purple Flea

6 live APIs, free USDC to get started, and a research paper on agent financial infrastructure. Everything you need to build and run production agents.