Structured Logging: JSON Over Plain Text
Unstructured logs are noise. When your agent runs 500 API calls per hour across three services, a plain text log file is impossible to filter, aggregate, or query. Structured logging — emitting JSON objects instead of formatted strings — makes every log line queryable by any field.
The difference is immediate: with structured logs you can answer "show me all bets over $50 that resulted in a loss in the last hour" with a single jq command. With plain text logs, you can't.
What a Structured Log Line Looks Like
import logging
import json
import sys
import time
import traceback
from typing import Any, Dict, Optional
class JSONFormatter(logging.Formatter):
    """Render each log record as one newline-delimited JSON object.

    Every line carries the timestamp, level, message, agent identity and
    service name, plus any custom fields supplied via ``extra={...}``.
    """

    # LogRecord attributes internal to the logging machinery; everything
    # else on the record is treated as a user-supplied extra field.
    _RESERVED = frozenset({
        "msg", "args", "levelname", "levelno", "pathname", "filename",
        "module", "exc_info", "exc_text", "stack_info", "lineno",
        "funcName", "created", "msecs", "relativeCreated",
        "thread", "threadName", "processName", "process", "name", "message",
    })

    def __init__(self, agent_id: str, service: str):
        super().__init__()
        self.agent_id = agent_id
        self.service = service

    def format(self, record: logging.LogRecord) -> str:
        """Serialize *record* to a single JSON line."""
        payload: Dict[str, Any] = {
            "ts": self._format_ts(record.created),
            "level": record.levelname.lower(),
            "event": record.getMessage(),
            "agent_id": self.agent_id,
            "service": self.service,
            "logger": record.name,
        }
        # Fields set via logger.info("msg", extra={...}) land directly on
        # the record's __dict__; copy every non-reserved attribute across.
        payload.update(
            (key, val)
            for key, val in record.__dict__.items()
            if key not in self._RESERVED
        )
        if record.exc_info:
            exc_type, exc_value, _tb = record.exc_info
            payload["error"] = {
                "type": exc_type.__name__,
                "message": str(exc_value),
                "traceback": traceback.format_exception(*record.exc_info),
            }
        # default=str keeps the line emittable even for non-JSON-native values.
        return json.dumps(payload, default=str)

    def _format_ts(self, created: float) -> str:
        """Format a POSIX timestamp as UTC ISO-8601 with millisecond precision."""
        from datetime import datetime, timezone

        dt = datetime.fromtimestamp(created, tz=timezone.utc)
        # isoformat truncates to milliseconds; swap the offset for "Z".
        return dt.isoformat(timespec="milliseconds").replace("+00:00", "Z")
def configure_logging(agent_id: str, service: str, level: str = "INFO") -> logging.Logger:
    """Configure the root logger to emit JSON lines on stdout.

    Replaces any pre-installed root handlers so libraries cannot produce
    duplicate or plain-text output alongside the JSON stream.

    Args:
        agent_id: Identifier stamped on every log line.
        service: Service name stamped on every log line; also the name
            of the returned logger.
        level: Log level name, e.g. "INFO" or "debug" (case-insensitive).

    Returns:
        A logger named after *service*, inheriting the root configuration.
    """
    formatter = JSONFormatter(agent_id=agent_id, service=service)
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(formatter)
    root = logging.getLogger()
    root.handlers.clear()
    root.addHandler(handler)
    # .upper() so "info" from an env var works: getattr on the lowercase
    # name would return the logging.info *function*, which setLevel rejects.
    root.setLevel(getattr(logging, level.upper()))
    return logging.getLogger(service)
# Initialize at startup. `os` was referenced here without being imported
# anywhere above — add it so the snippet actually runs.
import os

log = configure_logging(
    agent_id=os.environ.get("AGENT_ID", "default"),
    service="casino-agent",
    level=os.environ.get("LOG_LEVEL", "INFO"),
)
With JSON logs and jq, filtering is instant: journalctl -u agent | jq 'select(.outcome == "loss" and .amount_usdc > 20)'. No regex gymnastics. Every field is a first-class filter target.
Trace and Correlation IDs
A trace ID is a unique identifier that flows through every log line related to a single logical operation. When a bet fails, the trace ID connects the agent's decision log, the API request log, the error log, and the retry log into a single searchable story.
For agents operating financial APIs, every logical "action" should carry a trace ID. This includes: a single bet with its retry chain, a trade order from placement through fill, an escrow lifecycle from creation through release.
Trace ID vs Correlation ID vs Idempotency Key: A trace ID is for observability (trace the code path). A correlation ID connects related events across services. An idempotency key is for deduplication (prevent double-execution). They often share values but serve different purposes.
Context Variables for Automatic Propagation
Use Python's contextvars to store the current trace ID. Any log statement inside the context automatically includes it without explicitly passing it everywhere:
import uuid
import logging
from contextvars import ContextVar
from contextlib import contextmanager, asynccontextmanager
from typing import Optional
# Trace context held in contextvars: isolated per thread AND per asyncio
# task, so concurrent operations each see their own trace id.
_trace_id: ContextVar[Optional[str]] = ContextVar("trace_id", default=None)
_operation: ContextVar[Optional[str]] = ContextVar("operation", default=None)


def current_trace_id() -> Optional[str]:
    """Return the trace id of the enclosing trace() context, or None."""
    return _trace_id.get()


def current_operation() -> Optional[str]:
    """Return the operation name of the enclosing trace() context, or None."""
    return _operation.get()
@asynccontextmanager
async def trace(operation: str, trace_id: Optional[str] = None):
    """
    Async context manager that sets trace context for all code within.

    Emits "trace.start" on entry, then either "trace.complete" (with
    duration_ms) on normal exit or "trace.failed" (with duration and the
    exception) when the body raises. The exception is always re-raised.

    Args:
        operation: Dotted operation name, e.g. "casino.place_bet"; also
            used as the logger name for the start/complete/failed lines.
        trace_id: Existing id to join (e.g. from an upstream service);
            a fresh "t_<8 hex chars>" id is generated when omitted.

    Usage:
        async with trace("casino.place_bet") as tid:
            log.info("placing bet", extra={"amount": 10})  # auto-includes trace_id
            result = await client.post("/casino/bet", ...)
    """
    tid = trace_id or f"t_{uuid.uuid4().hex[:8]}"
    # Keep the reset tokens so nested trace() calls restore the OUTER
    # context on exit rather than clearing it.
    token_trace = _trace_id.set(tid)
    token_op = _operation.set(operation)
    start = time.monotonic()  # monotonic: immune to wall-clock adjustments
    log = logging.getLogger(operation)
    log.info("trace.start", extra={"trace_id": tid, "op": operation})
    try:
        yield tid
        elapsed = (time.monotonic() - start) * 1000
        log.info("trace.complete", extra={"trace_id": tid, "duration_ms": round(elapsed, 2)})
    except Exception as e:
        # NOTE: asyncio.CancelledError derives from BaseException (3.8+),
        # so a cancelled task exits this block without a trace.failed line.
        elapsed = (time.monotonic() - start) * 1000
        log.error("trace.failed", extra={
            "trace_id": tid,
            "duration_ms": round(elapsed, 2),
            "error_type": type(e).__name__,
        }, exc_info=True)
        raise
    finally:
        # Always restore the previous context, even on failure/cancellation.
        _trace_id.reset(token_trace)
        _operation.reset(token_op)
class TraceInjector(logging.Filter):
    """
    Logging filter that copies the active trace context onto every record.

    Attach to each handler so trace_id and operation are present on all
    log lines whether or not the call site passed them via ``extra``.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        # Stamp the record with the current context (None outside a trace).
        for attr, value in (
            ("trace_id", current_trace_id()),
            ("operation", current_operation()),
        ):
            setattr(record, attr, value)
        # Never suppress the record — this filter only enriches it.
        return True
# Example: full traced bet execution
async def place_bet_traced(client, params: dict) -> dict:
    """Place one bet inside a trace() context and log the full outcome."""
    async with trace("casino.place_bet") as tid:
        log = logging.getLogger("casino")
        request_fields = {
            "trace_id": tid,
            "game": params["game"],
            "amount": params["amount"],
        }
        log.debug("sending_bet", extra=request_fields)
        sent_at = time.monotonic()
        result = await client.post("/casino/bet", json=params)
        latency_ms = (time.monotonic() - sent_at) * 1000
        # One log line carrying everything an investigator needs.
        completion_fields = {
            "trace_id": tid,
            "bet_id": result["id"],
            "game": params["game"],
            "amount_usdc": float(params["amount"]),
            "outcome": result["outcome"],
            "payout_usdc": float(result["payout"]),
            "roll": result.get("roll"),
            "api_latency_ms": round(latency_ms, 2),
        }
        log.info("bet.complete", extra=completion_fields)
        return result
What a Traced Execution Looks Like in Logs
A single grep t_7f3a2c9d across any log file retrieves the complete story of that bet — decision, API call, result. That is the power of trace IDs.
Log Levels: When to Use Each
Using the right log level is essential for filtering in production. If everything is INFO, you drown in noise. If you under-log, you miss critical context when debugging.
| Event | Level | Reason |
|---|---|---|
| Bet placed successfully | INFO | Core audit event — always log |
| API request sent (body) | DEBUG | Verbose, disable in production |
| 429 rate limit hit | WARNING | Abnormal but handled automatically |
| Retry attempt N | WARNING | Indicates transient instability |
| Balance below threshold | WARNING | Actionable soon but not urgent |
| Operation moved to DLQ | ERROR | Financial operation permanently failed |
| Escrow created | INFO | Financial commitment — always log |
| Unhandled exception | ERROR | Unexpected code path |
| API key revoked | CRITICAL | Agent must stop immediately |
Centralized Logging Patterns
A single agent writing to stdout is fine when you have one agent. When you have ten agents running across multiple machines, you need centralized log aggregation. The standard patterns — Loki, Elasticsearch, CloudWatch — all accept JSON newline-delimited logs from a log shipper.
Log Rotation and Retention
For agents with minimal infrastructure, a rotating file handler plus periodic upload to object storage provides cheap, durable log storage with good query capability via jq:
import logging
import logging.handlers
import gzip
import shutil
from pathlib import Path
def configure_file_logging(
    agent_id: str,
    log_dir: str = "/var/log/agents",
    max_bytes: int = 50 * 1024 * 1024,  # 50MB per file
    backup_count: int = 10,  # Keep 10 rotated files
) -> None:
    """Attach a rotating JSON file handler to the root logger.

    Log lines land in <log_dir>/<agent_id>.jsonl, rotating at *max_bytes*
    and keeping *backup_count* old files. This complements — it does not
    replace — any stdout handler already installed.
    """
    target_dir = Path(log_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    handler = logging.handlers.RotatingFileHandler(
        str(target_dir / f"{agent_id}.jsonl"),
        maxBytes=max_bytes,
        backupCount=backup_count,
        encoding="utf-8",
    )
    handler.setFormatter(JSONFormatter(agent_id=agent_id, service="agent"))
    handler.addFilter(TraceInjector())
    logging.getLogger().addHandler(handler)
async def ship_logs_to_s3(agent_id: str, log_dir: str, bucket: str):
    """Compress rotated log files and upload them to S3/R2/MinIO.

    Only rotated files (<agent_id>.jsonl.1, .jsonl.2, ...) are shipped; the
    active log file is left alone. Each source file and its temporary .gz
    are deleted after a successful upload.

    NOTE(review): boto3's client is blocking, so despite the async
    signature every upload runs on the event loop thread.
    """
    import boto3
    # Bug fix: datetime/timezone were referenced below without ever
    # being imported in this module.
    from datetime import datetime, timezone

    s3 = boto3.client("s3")
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    # Snapshot the glob before writing .gz files into the same directory:
    # Path.glob is lazy and "<agent_id>.jsonl.*" also matches the ".gz"
    # files created below, which a live iterator could pick up mid-loop.
    rotated = [
        p for p in Path(log_dir).glob(f"{agent_id}.jsonl.*")
        if p.suffix != ".gz"
    ]
    for log_file in rotated:
        gz_file = f"{log_file}.gz"
        with open(log_file, "rb") as f_in:
            with gzip.open(gz_file, "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)
        key = f"logs/{agent_id}/{today}/{log_file.name}.gz"
        s3.upload_file(str(gz_file), bucket, key)
        # Remove both the source and the temporary archive once shipped.
        log_file.unlink()
        Path(gz_file).unlink()
Shipping to Loki (Grafana)
Loki is the lightweight log aggregation system from Grafana Labs. It ingests logs with labels and stores them cheaply without indexing log bodies. For JSON logs from agents, it is an excellent choice:
import asyncio
import aiohttp
import json
import time
from collections import deque
class LokiHandler(logging.Handler):
    """
    Async batch log handler that ships records to the Loki push API.

    Records are buffered in a bounded deque and flushed in batches by a
    background asyncio task to amortize HTTP overhead.
    """

    def __init__(
        self,
        loki_url: str,
        labels: dict,
        batch_size: int = 100,
        flush_interval: float = 5.0,
    ):
        """
        Args:
            loki_url: Base Loki URL; "/loki/api/v1/push" is appended.
            labels: Stream labels (e.g. agent_id, service, env).
            batch_size: Maximum entries per push request.
            flush_interval: Seconds between background flushes.
        """
        super().__init__()
        self.loki_url = loki_url.rstrip("/") + "/loki/api/v1/push"
        self.labels = labels
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        # Bounded so a Loki outage cannot grow memory without limit.
        self._buffer: deque = deque(maxlen=10000)
        self._flush_task = None
        try:
            # Bug fix: the unconditional asyncio.create_task() here raised
            # RuntimeError whenever the handler was constructed before the
            # event loop started (e.g. at module import time).
            self._flush_task = asyncio.get_running_loop().create_task(self._flush_loop())
        except RuntimeError:
            pass  # no loop yet — call start_flush_task() once one is running

    def start_flush_task(self) -> None:
        """Start the background flush loop; call from inside a running loop."""
        if self._flush_task is None:
            self._flush_task = asyncio.get_running_loop().create_task(self._flush_loop())

    def emit(self, record: logging.LogRecord):
        """Buffer one formatted record with a nanosecond timestamp (Loki's unit)."""
        try:
            msg = self.format(record)
            ts_ns = str(int(record.created * 1e9))
            self._buffer.append((ts_ns, msg))
        except Exception:
            self.handleError(record)

    async def _flush_loop(self):
        # Periodic background flush; runs for the life of the process.
        while True:
            await asyncio.sleep(self.flush_interval)
            if self._buffer:
                await self._flush()

    async def _flush(self):
        """Send up to batch_size buffered entries in one push request."""
        batch = []
        while self._buffer and len(batch) < self.batch_size:
            batch.append(self._buffer.popleft())
        if not batch:
            return
        # Loki push format: one stream object keyed by its label set.
        # (Bug fix: the old payload injected a bogus literal "{label_str}"
        # key into the stream labels, which Loki rejects.)
        payload = {
            "streams": [{
                "stream": self.labels,
                "values": batch,
            }]
        }
        try:
            async with aiohttp.ClientSession() as s:
                await s.post(
                    self.loki_url, json=payload,
                    timeout=aiohttp.ClientTimeout(total=10)
                )
        except Exception:
            # Re-queue the failed batch (up to the buffer limit) in its
            # original order so a transient outage does not drop or
            # reorder logs. (Bug fix: appendleft in forward order reversed
            # the batch.)
            for entry in reversed(batch):
                self._buffer.appendleft(entry)
# Add Loki shipping alongside file logging. `os` was referenced here
# without being imported in this snippet — add it.
import os

loki_handler = LokiHandler(
    loki_url=os.environ["LOKI_URL"],
    labels={
        "agent_id": os.environ.get("AGENT_ID", "default"),
        "service": "casino-agent",
        "env": "production",
    },
)
logging.getLogger().addHandler(loki_handler)
Debugging Production Issues
When something goes wrong with a production agent, you have structured logs, trace IDs, and metrics. Here is the systematic process for diagnosing issues fast.
Step 1: Identify the Failing Transaction
Start with the alert or symptom — a DLQ entry, a balance anomaly, or a user report. Extract the operation ID or timeframe from the alert payload:
# Find all ERROR logs from the last hour
journalctl -u casino-agent --since "1 hour ago" \
| jq -c 'select(.level == "error")'
# Get all log lines for a specific trace ID
journalctl -u casino-agent | jq -c 'select(.trace_id == "t_7f3a2c9d")'
# Find bets over $50 that resulted in a loss
journalctl -u casino-agent --since "24 hours ago" \
| jq -c 'select(.event == "bet.complete" and .outcome == "loss" and .amount_usdc > 50)'
# Count errors by type in last hour
journalctl -u casino-agent --since "1 hour ago" \
| jq -c 'select(.level == "error") | .error.type' \
| sort | uniq -c | sort -rn
# View complete timeline of a failing escrow
journalctl -u escrow-agent \
| jq -c 'select(.escrow_id == "esc_4b8c1a3e")' \
| jq -r '. | "\(.ts) [\(.level)] \(.event)"'
# Find all DLQ entries from today
sqlite3 /var/lib/agent/dlq.db \
"SELECT operation_type, error_code, error_message, datetime(last_attempt_ts, 'unixepoch')
FROM dead_letters WHERE resolved=0 ORDER BY last_attempt_ts DESC LIMIT 20"
Step 2: Reconstruct the Decision Chain
Once you have a trace ID, reconstruct the full execution flow from logs:
import json
import subprocess
from typing import List
def replay_trace(trace_id: str, log_file: Optional[str] = None) -> List[dict]:
    """Extract and pretty-print all events for a trace ID in chronological order.

    Args:
        trace_id: Trace id to filter on, e.g. "t_7f3a2c9d".
        log_file: Path to a JSONL log file. When omitted, events are read
            from the journald unit "casino-agent".

    Returns:
        The matching events, sorted by their "ts" field.
    """
    if log_file:
        # Read the JSONL file directly — no need to shell out to jq just to
        # pre-filter lines we parse anyway. (The old code also passed
        # "-c select(...)" to jq as a single argv element, which jq rejects.)
        with open(log_file, encoding="utf-8") as f:
            lines = f.readlines()
    else:
        # "-o cat" prints only the raw MESSAGE field, i.e. our JSON lines.
        # (The old "-o json-pretty" emits multi-line journald JSON that a
        # per-line json.loads can never parse.)
        result = subprocess.run(
            ["journalctl", "-u", "casino-agent", "-o", "cat", "--no-pager"],
            capture_output=True, text=True
        )
        lines = result.stdout.strip().split("\n")

    events = []
    for line in lines:
        if not line.strip():
            continue
        try:
            ev = json.loads(line)
        except json.JSONDecodeError:
            # Skip non-JSON lines only; a bare `except` hid real errors.
            continue
        if ev.get("trace_id") == trace_id:
            events.append(ev)
    events.sort(key=lambda e: e["ts"])

    print(f"\n=== Trace {trace_id} ({len(events)} events) ===\n")
    for ev in events:
        level_color = {
            "debug": "\033[34m", "info": "\033[32m",
            "warning": "\033[33m", "error": "\033[31m",
        }.get(ev.get("level", ""), "")
        reset = "\033[0m"
        ts = ev["ts"][11:23]  # HH:MM:SS.mmm portion of the ISO timestamp
        print(f"{ts} {level_color}[{ev.get('level','?').upper():8}]{reset} {ev.get('event', '')}")
        # Surface the fields investigators ask about first.
        for k in ["bet_id", "amount_usdc", "outcome", "payout_usdc", "error_type"]:
            if k in ev:
                print(f"    {k}={ev[k]}")
    return events
Complete Logging Setup
Putting it all together: a complete, ready-to-drop-in logging module that configures JSON formatting, trace injection, file rotation, and optional Loki shipping from a single setup call.
"""
Complete observability setup for Purple Flea agents.
Usage:
from observability import setup_logging, trace
setup_logging(agent_id="my-agent", service="casino-agent")
async def run():
async with trace("session.start"):
log = logging.getLogger("agent")
log.info("starting_session", extra={"balance": 100.0})
async with trace("casino.bet"):
result = await place_bet(...)
log.info("bet.complete", extra={"outcome": result["outcome"]})
"""
import os
import logging
import sys
from dataclasses import dataclass
from typing import Optional
@dataclass
class ObservabilityConfig:
    """Settings for setup_logging(); one instance configures the whole stack."""
    agent_id: str                   # stamped on every log line
    service: str                    # stamped on every line; name of returned logger
    log_level: str = "INFO"         # root logger level name
    log_file: Optional[str] = None  # enables the rotating file handler when set
    loki_url: Optional[str] = None  # enables Loki shipping when set
    # NOTE(review): pretty_console is not consumed by setup_logging below —
    # confirm whether dev-mode pretty printing was ever wired up.
    pretty_console: bool = False  # Human-readable in dev, JSON in prod
def setup_logging(config: ObservabilityConfig) -> logging.Logger:
    """
    Configure complete observability stack:
    - JSON structured logging to stdout
    - TraceInjector filter on all handlers
    - Optional rotating file handler
    - Optional Loki shipping

    Args:
        config: All knobs; see ObservabilityConfig.

    Returns:
        A logger named after config.service.
    """
    root = logging.getLogger()
    root.handlers.clear()  # drop any handlers libraries pre-installed
    # .upper() so LOG_LEVEL=info from an env var resolves to the level
    # constant rather than the logging.info function (which setLevel rejects).
    root.setLevel(getattr(logging, config.log_level.upper()))
    formatter = JSONFormatter(agent_id=config.agent_id, service=config.service)
    trace_filter = TraceInjector()

    # Always: stdout handler
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(formatter)
    stdout_handler.addFilter(trace_filter)
    root.addHandler(stdout_handler)

    # Optional: rotating file handler
    if config.log_file:
        # NOTE(review): only the directory of config.log_file is honored —
        # configure_file_logging names the file <agent_id>.jsonl itself.
        configure_file_logging(config.agent_id, log_dir=os.path.dirname(config.log_file))

    # Optional: Loki shipping
    if config.loki_url:
        loki = LokiHandler(
            loki_url=config.loki_url,
            labels={
                "agent_id": config.agent_id,
                "service": config.service,
                "env": os.environ.get("ENV", "production"),
            },
        )
        loki.setFormatter(formatter)
        loki.addFilter(trace_filter)
        root.addHandler(loki)

    # Silence noisy third-party loggers
    logging.getLogger("httpx").setLevel(logging.WARNING)
    logging.getLogger("websockets").setLevel(logging.WARNING)
    logging.getLogger("aiohttp").setLevel(logging.WARNING)

    log = logging.getLogger(config.service)
    log.info("logging.configured", extra={
        "log_level": config.log_level,
        "file_handler": config.log_file is not None,
        "loki_handler": config.loki_url is not None,
    })
    return log
# Entrypoint usage
if __name__ == "__main__":
    # NOTE(review): `main` is not defined and `asyncio` is not imported in
    # this snippet — both must come from the surrounding application module.
    cfg = ObservabilityConfig(
        agent_id=os.environ.get("AGENT_ID", "casino-agent-001"),
        service="casino-agent",
        log_level=os.environ.get("LOG_LEVEL", "INFO"),
        log_file=os.environ.get("LOG_FILE"),
        loki_url=os.environ.get("LOKI_URL"),
    )
    log = setup_logging(cfg)
    asyncio.run(main())
With structured logging, trace IDs, and the jq queries shown above, you should be able to identify the root cause of any production issue within two minutes of receiving an alert. If it takes longer, add more log fields to the relevant operations. Good logging pays for itself the first time something goes wrong at 3 AM.
| Practice | Impact | Complexity |
|---|---|---|
| JSON structured logs | High | Low — one formatter change |
| Trace IDs with contextvars | High | Medium — wrap operations |
| Log level discipline | High | Low — team convention |
| Rotating file handler | Medium | Low — stdlib |
| Loki/Grafana shipping | Medium | Medium — infra needed |
| OpenTelemetry tracing | High | High — full instrumentation |
Build Fully Observable Agents on Purple Flea
6 live APIs, free USDC to get started, and a research paper on agent financial infrastructure. Everything you need to build and run production agents.