Observability Integration

Purple Flea for Datadog

APM traces, custom metrics, and structured logs for AI agent financial operations. Monitor P&L, order latency, wallet balances, and escrow activity in a single Datadog workspace.

Integration Pillars
📈

APM Traces

Distributed traces across signal computation, order execution, and API calls using ddtrace automatic instrumentation.

📊

Custom Metrics

DogStatsD gauges, histograms, and counters for P&L, latency, wallet balance, and escrow transactions — all tagged by agent_id and strategy.

📜

Log Management

Structured JSON trade logs with grok parsing, facets for market and order_type, and saved searches for error investigation.

⚠️

Monitor Alerts

Threshold, anomaly, and composite monitors for drawdown spikes, API error rates, and balance exhaustion — routed to Slack or PagerDuty.

🖼️

Fleet Dashboards

Template variable dashboards showing per-agent and fleet-wide financial KPIs: P&L by strategy, latency heatmaps, and balance trends.

🛠️

Infrastructure

Datadog Agent on each host or Kubernetes DaemonSet. Container metrics, network performance, and live process monitoring alongside agent metrics.

Quickstart: Install and Configure

Install the Datadog Python libraries and configure your API key. Purple Flea agents typically run as FastAPI services — ddtrace auto-patches FastAPI, HTTPX, and asyncio out of the box.

install
pip install ddtrace datadog fastapi uvicorn httpx
run with APM enabled
# Start agent server with ddtrace auto-instrumentation
DD_API_KEY=your_datadog_api_key \
DD_SITE=datadoghq.com \
DD_ENV=production \
DD_SERVICE=purpleflea-trading-agent \
DD_VERSION=1.0.0 \
DD_AGENT_HOST=localhost \
DD_DOGSTATSD_PORT=8125 \
ddtrace-run uvicorn server:app --host 0.0.0.0 --port 8000

Or configure programmatically at the top of your entry point, before any other imports:

server.py (top of file)
import os

# ddtrace reads its settings from the environment, so set them before importing it.
os.environ.setdefault("DD_AGENT_HOST", "localhost")
os.environ.setdefault("DD_TRACE_AGENT_PORT", "8126")
os.environ.setdefault("DD_ENV", "production")
os.environ.setdefault("DD_SERVICE", "purpleflea-trading-agent")
os.environ.setdefault("DD_VERSION", "1.0.0")

from ddtrace import patch_all
from datadog import initialize, statsd

# Initialize the DogStatsD client used for custom metrics
initialize(
    api_key="your_datadog_api_key",
    host_name="agent-host-01",
    statsd_host="localhost",
    statsd_port=8125,
)

# Patch all supported integrations; call this before importing FastAPI or HTTPX
patch_all()
Datadog Agent required: Install the Datadog Agent on the host to receive DogStatsD metrics and APM traces. The agent forwards data to Datadog Cloud. For Kubernetes, use the official Helm chart with DogStatsD enabled.

Custom Metrics via DogStatsD

Use datadog.statsd to emit financial metrics from your agent. All metrics are tagged with agent_id, strategy, and env for multi-dimensional analysis in Datadog Metrics Explorer.

agent_metrics.py
import time
import httpx
from datadog import statsd
from functools import wraps
from typing import Callable

AGENT_ID = "momentum-agent-01"
STRATEGY = "momentum"
BASE_TAGS = [
    f"agent_id:{AGENT_ID}",
    f"strategy:{STRATEGY}",
    "env:production",
    "service:purpleflea-trading-agent",
]

# ---------------------------------------------------------------
# P&L METRICS
# ---------------------------------------------------------------

def record_pnl(market: str, pnl_usd: float, realized: bool = False):
    """Record current P&L as a gauge. Call after each position update."""
    tags = BASE_TAGS + [f"market:{market}", f"realized:{str(realized).lower()}"]
    statsd.gauge("agent.trade.pnl_usd", pnl_usd, tags=tags)

def record_drawdown(drawdown_pct: float):
    """Record current max drawdown percentage."""
    statsd.gauge("agent.risk.max_drawdown_pct", drawdown_pct, tags=BASE_TAGS)

def record_open_positions(market: str, count: int):
    """Record number of open positions per market."""
    tags = BASE_TAGS + [f"market:{market}"]
    statsd.gauge("agent.positions.open_count", count, tags=tags)

# ---------------------------------------------------------------
# EXECUTION METRICS
# ---------------------------------------------------------------

def record_order_latency(market: str, order_type: str, latency_ms: float):
    """Record order execution latency distribution."""
    tags = BASE_TAGS + [f"market:{market}", f"order_type:{order_type}"]
    statsd.histogram("agent.order.latency_ms", latency_ms, tags=tags)

def record_signal_compute_time(duration_seconds: float, model_version: str = "v1"):
    """Record how long signal computation took."""
    tags = BASE_TAGS + [f"model_version:{model_version}"]
    statsd.histogram("agent.signal.compute_seconds", duration_seconds, tags=tags)

def timed_order_execution(market: str, order_type: str = "market"):
    """Decorator: wraps an async order function and records latency."""
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.monotonic()
            try:
                result = await func(*args, **kwargs)
                elapsed_ms = (time.monotonic() - start) * 1000
                record_order_latency(market, order_type, elapsed_ms)
                statsd.increment(
                    "agent.order.executions_total",
                    tags=BASE_TAGS + [f"market:{market}", "status:success"],
                )
                return result
            except Exception as e:
                statsd.increment(
                    "agent.order.executions_total",
                    tags=BASE_TAGS + [f"market:{market}", "status:error", f"error_type:{type(e).__name__}"],
                )
                raise
        return wrapper
    return decorator

# ---------------------------------------------------------------
# WALLET METRICS
# ---------------------------------------------------------------

def record_wallet_balance(currency: str, balance_usd: float, wallet_type: str = "trading"):
    """Record wallet balance for balance trend graphs and threshold monitors."""
    tags = BASE_TAGS + [f"currency:{currency}", f"wallet_type:{wallet_type}"]
    statsd.gauge("agent.wallet.balance_usd", balance_usd, tags=tags)

# ---------------------------------------------------------------
# API / ESCROW METRICS
# ---------------------------------------------------------------

def record_api_call(endpoint: str, method: str, status_code: int):
    """Record every Purple Flea API call with status code."""
    tags = BASE_TAGS + [
        f"endpoint:{endpoint}",
        f"method:{method}",
        f"status_code:{status_code}",
        f"success:{'true' if 200 <= status_code < 300 else 'false'}",
    ]
    statsd.increment("agent.api.requests_total", tags=tags)

def record_escrow_transaction(counterparty: str, status: str, amount_usd: float = 0.0):
    """Record escrow transaction initiation and completion."""
    tags = BASE_TAGS + [f"counterparty:{counterparty}", f"status:{status}"]
    statsd.increment("agent.escrow.transactions_total", tags=tags)
    if status == "completed" and amount_usd > 0:
        statsd.histogram("agent.escrow.amount_usd", amount_usd, tags=tags)

def record_faucet_claim(status: str):
    """Record faucet claim attempt."""
    tags = BASE_TAGS + [f"status:{status}"]
    statsd.increment("agent.faucet.claims_total", tags=tags)
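
For intuition about what the helpers above put on the wire, the following sketch formats a DogStatsD datagram by hand using only the standard library. It assumes the plain `name:value|type|#tag1,tag2` datagram layout; `format_datagram` and `send_datagram` are hypothetical helper names, and in practice `datadog.statsd` does this work for you.

```python
import socket


def format_datagram(name: str, value: float, metric_type: str, tags: list[str]) -> str:
    """Build a DogStatsD datagram: <name>:<value>|<type>|#<tag1>,<tag2>."""
    datagram = f"{name}:{value}|{metric_type}"
    if tags:
        datagram += "|#" + ",".join(tags)
    return datagram


def send_datagram(datagram: str, host: str = "localhost", port: int = 8125) -> None:
    """Send one datagram over UDP to the Agent's DogStatsD port."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(datagram.encode("utf-8"), (host, port))


# "g" marks a gauge; "c" is a counter and "h" a histogram.
gauge = format_datagram(
    "agent.trade.pnl_usd", 12.5, "g",
    ["agent_id:momentum-agent-01", "market:BTC-USD"],
)
print(gauge)
```

Because tags travel inside every datagram, each distinct tag combination becomes its own time series, which is worth keeping in mind before tagging by values with many possible variants.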

Metric Reference Table

| Metric Name | Type | Key Tags | Description |
|---|---|---|---|
| agent.trade.pnl_usd | Gauge | market, realized | Current P&L in USD |
| agent.risk.max_drawdown_pct | Gauge | strategy | Max drawdown from peak |
| agent.positions.open_count | Gauge | market | Open positions count |
| agent.order.latency_ms | Histogram | market, order_type | Order execution latency |
| agent.signal.compute_seconds | Histogram | model_version | Signal compute time |
| agent.order.executions_total | Counter | market, status | Total order attempts |
| agent.wallet.balance_usd | Gauge | currency, wallet_type | Wallet balance (USD equivalent) |
| agent.api.requests_total | Counter | endpoint, status_code | Purple Flea API calls |
| agent.escrow.transactions_total | Counter | counterparty, status | Escrow transaction count |
| agent.escrow.amount_usd | Histogram | counterparty, status | Escrow amount distribution |
| agent.faucet.claims_total | Counter | status | Faucet claim attempts |
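
The Histogram rows above are not stored as raw samples: the Datadog Agent aggregates them per flush interval into separate series such as `.avg`, `.median`, `.max`, `.count`, and `.95percentile`. The sketch below approximates that aggregation in plain Python. The nearest-rank percentile method and the `summarize_histogram` name are assumptions for illustration, and the Agent's exact algorithm may differ.

```python
import math


def summarize_histogram(samples: list[float]) -> dict[str, float]:
    """Approximate the per-flush aggregates produced for a histogram metric."""
    if not samples:
        return {}
    ordered = sorted(samples)
    n = len(ordered)

    def nearest_rank(pct: float) -> float:
        # Smallest sample with at least pct% of all samples at or below it.
        rank = max(1, math.ceil(pct / 100 * n))
        return ordered[rank - 1]

    return {
        "count": n,
        "avg": sum(ordered) / n,
        "median": nearest_rank(50),
        "95percentile": nearest_rank(95),
        "max": ordered[-1],
    }


# Ten order latencies in milliseconds, including one slow outlier.
latencies_ms = [120, 95, 110, 105, 980, 101, 99, 130, 115, 108]
summary = summarize_histogram(latencies_ms)
print(summary["median"], summary["95percentile"], summary["avg"])
```

In this sample the single slow order barely moves the median but dominates the 95th percentile, which is why latency alerts are usually written against the percentile series.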

APM: Distributed Tracing

Datadog APM creates traces that span the full request lifecycle — from signal computation through order submission to wallet settlement. Use custom spans to instrument critical business logic.

server.py — APM-instrumented FastAPI agent
import asyncio
import time
import httpx
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
from ddtrace import tracer

from agent_metrics import (
    record_pnl, record_drawdown, record_wallet_balance,
    record_api_call, record_escrow_transaction,
    timed_order_execution, record_signal_compute_time,
    AGENT_ID, STRATEGY,
)

API_KEY = "pf_live_your_key_here"
BASE_URL = "https://trading.purpleflea.com"
WALLET_URL = "https://wallet.purpleflea.com"
ESCROW_URL = "https://escrow.purpleflea.com"

app = FastAPI(title="Purple Flea Trading Agent")

# No explicit trace middleware is needed: ddtrace-run (or patch_all()) instruments
# FastAPI automatically, and DD_SERVICE sets the service name on the request spans.


# ---------------------------------------------------------------
# SIGNAL COMPUTATION SPAN
# ---------------------------------------------------------------

async def compute_momentum_signal(prices: list[float], market: str) -> float:
    with tracer.trace(
        "agent.signal.compute",
        service="purpleflea-trading-agent",
        resource=f"momentum/{market}",
    ) as span:
        span.set_tag("market", market)
        span.set_tag("agent_id", AGENT_ID)
        span.set_tag("strategy", STRATEGY)
        span.set_tag("price_count", len(prices))

        start = time.monotonic()
        # --- Real momentum computation here ---
        if len(prices) < 10:
            signal = 0.0
        else:
            roc = (prices[-1] - prices[-10]) / prices[-10]
            signal = roc
        # -------------------------------------
        elapsed = time.monotonic() - start

        span.set_tag("signal_value", round(signal, 6))
        record_signal_compute_time(elapsed, model_version="v1.2.0")

    return signal


# ---------------------------------------------------------------
# ORDER EXECUTION WITH TRACE + METRICS
# ---------------------------------------------------------------

async def execute_order_traced(
    market: str,
    side: str,
    size: float,
    order_type: str = "market",
) -> dict:
    with tracer.trace(
        "agent.order.execute",
        service="purpleflea-trading-agent",
        resource=f"{order_type}/{market}/{side}",
    ) as span:
        span.set_tag("market", market)
        span.set_tag("side", side)
        span.set_tag("size", size)
        span.set_tag("order_type", order_type)
        span.set_tag("agent_id", AGENT_ID)

        start = time.monotonic()
        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                r = await client.post(
                    f"{BASE_URL}/v1/orders",
                    json={"market": market, "side": side, "size": size, "type": order_type},
                    headers={"Authorization": f"Bearer {API_KEY}"},
                )
                r.raise_for_status()

            elapsed_ms = (time.monotonic() - start) * 1000
            span.set_tag("latency_ms", round(elapsed_ms, 2))
            span.set_tag("status_code", r.status_code)
            record_api_call("/v1/orders", "POST", r.status_code)
            return r.json()

        except httpx.HTTPStatusError as e:
            span.set_tag("error", True)
            span.set_tag("error.type", "HTTPStatusError")
            span.set_tag("status_code", e.response.status_code)
            record_api_call("/v1/orders", "POST", e.response.status_code)
            raise HTTPException(status_code=502, detail=str(e))


# ---------------------------------------------------------------
# WALLET BALANCE COLLECTION
# ---------------------------------------------------------------

async def collect_wallet_balance():
    with tracer.trace("agent.wallet.balance_fetch", resource="GET /v1/balance") as span:
        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                r = await client.get(
                    f"{WALLET_URL}/v1/balance",
                    headers={"Authorization": f"Bearer {API_KEY}"},
                )
                r.raise_for_status()
                data = r.json()
                for currency, info in data.get("balances", {}).items():
                    balance = info.get("usd_value", 0.0)
                    record_wallet_balance(currency, balance)
                    span.set_tag(f"balance_{currency}_usd", balance)
                record_api_call("/v1/balance", "GET", r.status_code)
        except Exception as e:
            span.set_tag("error", True)
            span.set_tag("error.message", str(e))


# ---------------------------------------------------------------
# ESCROW TRANSACTION
# ---------------------------------------------------------------

@app.post("/v1/escrow/create")
async def create_escrow(body: dict):
    counterparty = body.get("counterparty_id", "unknown")
    amount = body.get("amount_usd", 0.0)

    with tracer.trace("agent.escrow.create", resource="POST /v1/escrow") as span:
        span.set_tag("counterparty", counterparty)
        span.set_tag("amount_usd", amount)
        span.set_tag("agent_id", AGENT_ID)

        try:
            async with httpx.AsyncClient(timeout=15.0) as client:
                r = await client.post(
                    f"{ESCROW_URL}/v1/escrow",
                    json=body,
                    headers={"Authorization": f"Bearer {API_KEY}"},
                )
                r.raise_for_status()
                record_escrow_transaction(counterparty, "created", amount)
                record_api_call("/v1/escrow", "POST", r.status_code)
                return r.json()
        except httpx.HTTPStatusError as e:
            record_escrow_transaction(counterparty, "failed")
            span.set_tag("error", True)
            raise HTTPException(status_code=502, detail=str(e))


# ---------------------------------------------------------------
# TRADING ENDPOINT
# ---------------------------------------------------------------

@app.post("/v1/trade")
async def trade(body: dict):
    market = body.get("market", "BTC-USD")
    side = body.get("side", "buy")
    size = body.get("size", 0.01)
    return await execute_order_traced(market, side, size)


# ---------------------------------------------------------------
# BACKGROUND LOOP
# ---------------------------------------------------------------

async def collection_loop():
    import random  # only needed for the simulated values below

    peak_equity = 0.0
    while True:
        await collect_wallet_balance()
        # Simulated P&L and equity; replace with values from your position tracker
        for market in ["BTC-USD", "ETH-USD"]:
            pnl = random.uniform(-50, 150)
            record_pnl(market, pnl)
        equity = 1000 + random.uniform(-100, 100)
        peak_equity = max(peak_equity, equity)
        if peak_equity > 0:
            record_drawdown((peak_equity - equity) / peak_equity * 100)
        await asyncio.sleep(15)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Run the collection loop for as long as the app is up
    task = asyncio.create_task(collection_loop())
    yield
    task.cancel()

# Attached after the fact because `app` is created before `lifespan` is defined
app.router.lifespan_context = lifespan
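
The background loop above derives drawdown from a running equity peak. As a minimal sketch, that calculation can be isolated into a small helper that is easy to unit test. `DrawdownTracker` is a hypothetical name, and the equity values are made up for illustration.

```python
class DrawdownTracker:
    """Track the running peak equity and the current drawdown from that peak."""

    def __init__(self) -> None:
        self.peak_equity = 0.0

    def update(self, equity: float) -> float:
        """Return the current drawdown as a percentage of the peak (0.0 at a new high)."""
        if equity > self.peak_equity:
            self.peak_equity = equity
        if self.peak_equity <= 0:
            return 0.0
        return (self.peak_equity - equity) / self.peak_equity * 100


tracker = DrawdownTracker()
equity_curve = [1000.0, 1100.0, 990.0, 1045.0, 1200.0]
drawdowns = [round(tracker.update(equity), 2) for equity in equity_curve]
print(drawdowns)  # [0.0, 0.0, 10.0, 5.0, 0.0]
```

Each value returned by `update()` could be passed straight to `record_drawdown()`. The gauge then reports the drawdown at the time of the reading, and the monitor's `max` aggregation picks out the worst value in its window.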

Log Management: Structured Trade Logs

Emit structured JSON logs from your agent and configure a Datadog log pipeline to parse trade events, extract facets, and build saved searches.

Structured Logger Setup

agent_logger.py
import logging
import json
import time
from ddtrace import tracer

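# Attributes present on every LogRecord; anything else was passed via `extra=`.
_STANDARD_ATTRS = set(vars(logging.makeLogRecord({}))) | {"message", "asctime"}


class DDJsonFormatter(logging.Formatter):
    """JSON log formatter that injects Datadog trace context for log-trace correlation."""

    def format(self, record: logging.LogRecord) -> str:
        span = tracer.current_span()
        log_dict = {
            # Use the time the record was created, not the time it is formatted
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(record.created)),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "service": "purpleflea-trading-agent",
            "env": "production",
        }
        # Inject trace/span IDs for log-trace correlation in Datadog
        if span:
            log_dict["dd.trace_id"] = str(span.trace_id)
            log_dict["dd.span_id"] = str(span.span_id)

        # logging copies `extra` keys onto the record as attributes, so collect
        # every non-standard attribute instead of looking for `record.extra`.
        for key, value in vars(record).items():
            if key not in _STANDARD_ATTRS:
                log_dict[key] = value

        return json.dumps(log_dict, default=str)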


def get_logger(name: str) -> logging.Logger:
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(DDJsonFormatter())
        logger.addHandler(handler)
    return logger


# Usage example
logger = get_logger("purpleflea.agent")

def log_trade_execution(market: str, side: str, size: float, price: float,
                         latency_ms: float, order_id: str):
    logger.info("Trade executed", extra={
        "event": "trade_executed",
        "market": market,
        "side": side,
        "size": size,
        "price": price,
        "latency_ms": latency_ms,
        "order_id": order_id,
    })

def log_signal_generated(market: str, signal: float, strategy: str):
    logger.info("Signal generated", extra={
        "event": "signal_generated",
        "market": market,
        "signal_value": round(signal, 6),
        "strategy": strategy,
    })

def log_escrow_event(counterparty: str, amount_usd: float, status: str, escrow_id: str):
    logger.info("Escrow event", extra={
        "event": "escrow_transaction",
        "counterparty": counterparty,
        "amount_usd": amount_usd,
        "status": status,
        "escrow_id": escrow_id,
    })

def log_balance_alert(currency: str, balance_usd: float, threshold_usd: float):
    logger.warning("Balance below threshold", extra={
        "event": "balance_alert",
        "currency": currency,
        "balance_usd": balance_usd,
        "threshold_usd": threshold_usd,
    })
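
The helpers above rely on the `extra=` argument, which the standard `logging` module copies onto the `LogRecord` as individual attributes rather than as a single `extra` dictionary. The following stripped-down sketch (no ddtrace, output captured in memory) shows one way a formatter can pick those fields up. `ExtraJsonFormatter` and the `purpleflea.demo` logger name are hypothetical.

```python
import io
import json
import logging

# Attribute names every LogRecord has; anything else was supplied via `extra=`.
STANDARD_ATTRS = set(vars(logging.makeLogRecord({}))) | {"message", "asctime"}


class ExtraJsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        payload = {"level": record.levelname, "message": record.getMessage()}
        for key, value in vars(record).items():
            if key not in STANDARD_ATTRS:
                payload[key] = value
        return json.dumps(payload)


# Capture output in memory so the result can be inspected.
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(ExtraJsonFormatter())
demo_logger = logging.getLogger("purpleflea.demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
demo_logger.addHandler(handler)

demo_logger.info("Trade executed", extra={"event": "trade_executed", "market": "BTC-USD"})
parsed = json.loads(stream.getvalue())
print(parsed["event"], parsed["market"])  # trade_executed BTC-USD
```

Emitting the extra fields as top-level JSON keys is what lets Datadog expose them as attributes such as `@market` without any custom parsing rules.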

Datadog Log Pipeline (via UI or Terraform)

Configure a Datadog log pipeline to parse these JSON logs and extract searchable facets:

datadog_log_pipeline.tf
resource "datadog_logs_custom_pipeline" "purpleflea_agent" {
  name    = "Purple Flea Trading Agent Logs"
  is_enabled = true

  filter {
    query = "service:purpleflea-trading-agent"
  }

  # JSON-formatted logs are parsed automatically at ingestion, so the pipeline
  # starts directly with the remappers and needs no separate JSON parsing step.

  processor {
    date_remapper {
      name       = "Set timestamp"
      is_enabled = true
      sources    = ["timestamp"]
    }
  }

  processor {
    status_remapper {
      name       = "Remap log level"
      is_enabled = true
      sources    = ["level"]
    }
  }

  processor {
    message_remapper {
      name       = "Set message field"
      is_enabled = true
      sources    = ["message"]
    }
  }

  processor {
    trace_id_remapper {
      name       = "Map trace ID for log-trace correlation"
      is_enabled = true
      sources    = ["dd.trace_id"]
    }
  }

  # Facet: event type (trade_executed, escrow_transaction, etc.)
  processor {
    attribute_remapper {
      name               = "Map event to facet"
      is_enabled         = true
      sources            = ["event"]
      target             = "@event_type"
      target_type        = "attribute"
      preserve_source    = true
    }
  }
}

# Dedicated log index for agent logs (facets themselves are defined in the Datadog UI)
resource "datadog_logs_index" "agent_logs" {
  name           = "purpleflea-agents"
  daily_limit    = 5000000
  retention_days = 15

  filter {
    query = "service:purpleflea-trading-agent"
  }
}

Recommended Log Facets

| Facet Path | Type | Description |
|---|---|---|
| @event_type | String | Event category: trade_executed, signal_generated, escrow_transaction |
| @market | String | Trading market: BTC-USD, ETH-USD, SOL-USD |
| @side | String | Order direction: buy, sell |
| @latency_ms | Number (ms) | Order execution latency |
| @signal_value | Number | Momentum signal strength |
| @amount_usd | Number (USD) | Escrow transaction amount |
| @status | String | Transaction status: created, completed, failed |
| @balance_usd | Number (USD) | Wallet balance at alert time |
| dd.trace_id | String | Datadog trace ID for log-trace correlation |

Monitor Alerts for Financial Risk

Configure Datadog Monitors for the three critical alert categories: drawdown, API errors, and wallet balance.

Metric: Agent High Drawdown Alert

Threshold monitor on agent.risk.max_drawdown_pct. Fires CRITICAL when drawdown exceeds 15% at any point in a 5-minute evaluation window. Routes to the trading Slack channel and PagerDuty.

Metric: Wallet Balance Critical

Threshold monitor on agent.wallet.balance_usd{currency:USDC}. Fires CRITICAL when the balance drops below $50, at which point the agent cannot fund new positions.

Metric: API Error Rate Spike

Metric monitor on the share of agent.api.requests_total tagged success:false. Fires CRITICAL when the error rate exceeds 5% over a 5-minute window, with a WARNING at 2%.

APM: Order Execution P95 Latency

APM monitor on trace.agent.order.execute P95 duration. Fires WARNING when P95 exceeds 1,000 ms, indicating exchange API degradation.

Log: Escrow Failure Spike

Log monitor on service:purpleflea-trading-agent @event_type:escrow_transaction @status:failed. Fires when more than 5 failures occur in a 10-minute window.

Composite: Agent Degraded State

Composite monitor combining high error rate AND high latency. Fires CRITICAL only when both conditions are true at the same time, which reduces false positives.
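
To make the error-rate and composite conditions concrete, here is a minimal sketch of the evaluation logic in plain Python. It is not how Datadog evaluates monitors internally. `WindowStats`, `error_rate_pct`, and `is_degraded` are hypothetical names, and the thresholds mirror the 5% and 1,000 ms figures described above.

```python
from dataclasses import dataclass


@dataclass
class WindowStats:
    """Counts and latency observed in one evaluation window."""
    total_requests: int
    failed_requests: int
    p95_latency_ms: float


def error_rate_pct(stats: WindowStats) -> float:
    """Failed requests as a percentage of all requests in the window."""
    if stats.total_requests == 0:
        return 0.0
    return stats.failed_requests / stats.total_requests * 100


def is_degraded(stats: WindowStats, max_error_pct: float = 5.0,
                max_p95_ms: float = 1000.0) -> bool:
    """Composite condition: both the error-rate and the latency check must breach."""
    return error_rate_pct(stats) > max_error_pct and stats.p95_latency_ms > max_p95_ms


slow_but_healthy = WindowStats(total_requests=400, failed_requests=4, p95_latency_ms=1800.0)
failing_and_slow = WindowStats(total_requests=400, failed_requests=40, p95_latency_ms=1800.0)
print(is_degraded(slow_but_healthy), is_degraded(failing_and_slow))  # False True
```

The first window would trip a latency-only alert even though 99% of requests succeed; requiring both conditions is what keeps the composite monitor quiet in that case.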

Monitor Terraform Config

datadog_monitors.tf
resource "datadog_monitor" "agent_drawdown_critical" {
  name    = "Purple Flea Agent - High Drawdown (Critical)"
  type    = "metric alert"
  message = <<-EOT
    {{#is_alert}}
    Agent {{agent_id.name}} drawdown has exceeded 15%.
    Current drawdown: {{value}}%
    Review open positions immediately.
    @slack-trading-alerts @pagerduty
    {{/is_alert}}
    {{#is_recovery}}
    Agent {{agent_id.name}} drawdown has recovered below 8%.
    {{/is_recovery}}
  EOT

  query = "max(last_5m):max:agent.risk.max_drawdown_pct{env:production} by {agent_id} > 15"

  # Current versions of the Datadog Terraform provider use a monitor_thresholds block
  monitor_thresholds {
    critical = 15
    warning  = 8
  }

  notify_no_data    = false
  evaluation_delay  = 60
  renotify_interval = 60

  tags = ["team:trading", "service:purpleflea-trading-agent", "env:production"]
}

resource "datadog_monitor" "wallet_balance_critical" {
  name    = "Purple Flea Agent - Wallet Balance Critical"
  type    = "metric alert"
  message = <<-EOT
    {{#is_alert}}
    Agent {{agent_id.name}} USDC balance is critically low: $${{value}}
    Agent cannot fund new positions. Top up at https://wallet.purpleflea.com
    Or claim from faucet: https://faucet.purpleflea.com
    @slack-trading-alerts
    {{/is_alert}}
  EOT

  query = "min(last_2m):min:agent.wallet.balance_usd{currency:USDC,env:production} by {agent_id} < 50"

  # Current versions of the Datadog Terraform provider use a monitor_thresholds block
  monitor_thresholds {
    critical = 50
    warning  = 200
  }

  notify_no_data = false
  tags           = ["team:trading", "service:purpleflea-trading-agent"]
}

resource "datadog_monitor" "api_error_rate" {
  name    = "Purple Flea Agent - API Error Rate High"
  type    = "metric alert"
  message = <<-EOT
    {{#is_alert}}
    API error rate for agent {{agent_id.name}} exceeded 5%.
    Current rate: {{value}}%
    Check Purple Flea API status and network connectivity.
    @slack-infra-alerts
    {{/is_alert}}
  EOT

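  # as_count() makes the monitor divide total failures by total requests over the
  # window, rather than summing per-point percentages.
  query = "sum(last_5m):sum:agent.api.requests_total{success:false,env:production} by {agent_id}.as_count() / sum:agent.api.requests_total{env:production} by {agent_id}.as_count() * 100 > 5"

  monitor_thresholds {
    critical = 5
    warning  = 2
  }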

  tags = ["team:infrastructure", "service:purpleflea-trading-agent"]
}

resource "datadog_monitor" "order_latency_p95" {
  name    = "Purple Flea Agent - Order Latency P95 High"
  type    = "metric alert"

  # statsd.histogram() emits a pre-aggregated .95percentile series; percentile()
  # queries only apply to distribution metrics.
  query = "avg(last_5m):avg:agent.order.latency_ms.95percentile{env:production} by {agent_id} > 1000"

  message = <<-EOT
    {{#is_alert}}
    P95 order latency for {{agent_id.name}} exceeded 1000ms.
    Check Purple Flea trading API status.
    @slack-trading-alerts
    {{/is_alert}}
  EOT

  monitor_thresholds {
    critical = 1000
    warning  = 500
  }

  tags = ["team:trading"]
}

resource "datadog_monitor" "escrow_failures" {
  name    = "Purple Flea Agent - Escrow Failure Spike"
  type    = "log alert"
  message = "@slack-trading-alerts Escrow failures spiking. Check https://escrow.purpleflea.com"

  query = "logs(\"service:purpleflea-trading-agent @event_type:escrow_transaction @status:failed\").index(\"*\").rollup(\"count\").by(\"agent_id\").last(\"10m\") > 5"

  tags = ["team:trading"]
}

Fleet Dashboard Configuration

Build a Datadog dashboard with template variables for $agent_id, $strategy, and $market. This lets you switch between fleet-wide and individual agent views without cloning dashboards.
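
As a mental model for how template variables switch a widget between fleet-wide and single-agent views, the sketch below expands `$name` placeholders into tag filters. Datadog performs this substitution itself when rendering the dashboard; `expand_template_vars` is a hypothetical helper, and it assumes each variable's tag prefix equals its name.

```python
import re


def expand_template_vars(query: str, selections: dict[str, str]) -> str:
    """Replace each $name in a query with a name:value tag filter."""
    def substitute(match: re.Match) -> str:
        name = match.group(1)
        # Unselected variables fall back to the wildcard, meaning no filtering.
        return f"{name}:{selections.get(name, '*')}"

    return re.sub(r"\$(\w+)", substitute, query)


query = "sum:agent.trade.pnl_usd{$agent_id,$strategy,$env}"
fleet_view = expand_template_vars(query, {"env": "production"})
single_agent = expand_template_vars(
    query, {"agent_id": "momentum-agent-01", "env": "production"}
)
print(fleet_view)
print(single_agent)
```

With every variable left at `*` the query aggregates across the whole fleet, and choosing one value narrows every widget that references that variable at once.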

Dashboard JSON (key widgets)

dashboard.json
{
  "title": "Purple Flea Agent Fleet",
  "description": "Financial operations dashboard for AI trading agents",
  "template_variables": [
    {"name": "agent_id", "prefix": "agent_id", "default": "*"},
    {"name": "strategy",  "prefix": "strategy",  "default": "*"},
    {"name": "market",    "prefix": "market",    "default": "*"},
    {"name": "env",       "prefix": "env",       "default": "production"}
  ],
  "widgets": [
    {
      "definition": {
        "type": "query_value",
        "title": "Fleet P&L (USD)",
        "requests": [
          {
            "q": "sum:agent.trade.pnl_usd{$agent_id,$strategy,$env}",
            "aggregator": "last"
          }
        ],
        "custom_unit": "USD",
        "precision": 2,
        "color_preference": "background"
      }
    },
    {
      "definition": {
        "type": "timeseries",
        "title": "P&L by Market",
        "requests": [
          {
            "q": "sum:agent.trade.pnl_usd{$agent_id,$env} by {market}",
            "display_type": "line"
          }
        ]
      }
    },
    {
      "definition": {
        "type": "query_value",
        "title": "Max Drawdown",
        "requests": [
          {
            "q": "max:agent.risk.max_drawdown_pct{$agent_id,$env}",
            "aggregator": "max"
          }
        ],
        "custom_unit": "%",
        "color_preference": "background",
        "autoscale": false
      }
    },
    {
      "definition": {
        "type": "distribution",
        "title": "Order Latency Distribution",
        "requests": [
          {
            "q": "agent.order.latency_ms{$agent_id,$market,$env}",
            "request_type": "histogram"
          }
        ]
      }
    },
    {
      "definition": {
        "type": "timeseries",
        "title": "Wallet Balance (USDC)",
        "requests": [
          {
            "q": "min:agent.wallet.balance_usd{currency:USDC,$agent_id,$env} by {agent_id}",
            "display_type": "line"
          }
        ]
      }
    },
    {
      "definition": {
        "type": "toplist",
        "title": "Error Rate by Endpoint",
        "requests": [
          {
            "q": "top(sum:agent.api.requests_total{success:false,$agent_id,$env} by {endpoint}.as_rate(), 10, 'mean', 'desc')"
          }
        ]
      }
    }
  ]
}
SLO Tracking: Define a Datadog SLO on agent.api.requests_total with a 99% success rate target over a 30-day rolling window. This gives you error budget burn rate alerts alongside your operational monitors.
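
The arithmetic behind an error budget is simple enough to sketch. Assuming a request-count based SLO with a 99% target, the hypothetical `error_budget` helper below shows how many failures a window can absorb and how much of that allowance has been used. The request counts are invented for illustration.

```python
def error_budget(total_requests: int, failed_requests: int,
                 target: float = 0.99) -> dict[str, float]:
    """Summarize how much of the SLO's error budget a window has consumed."""
    allowed_failures = total_requests * (1 - target)
    consumed = failed_requests / allowed_failures if allowed_failures else 0.0
    return {
        "allowed_failures": allowed_failures,
        "budget_consumed_pct": consumed * 100,
        "budget_remaining_pct": (1 - consumed) * 100,
    }


# 2,000,000 API calls in the 30-day window, 5,000 of them failed.
budget = error_budget(total_requests=2_000_000, failed_requests=5_000)
print(round(budget["allowed_failures"]), round(budget["budget_consumed_pct"], 1))
```

Here the 99% target allows roughly 20,000 failures, so 5,000 failures consume about a quarter of the budget. A burn rate alert fires when that consumption is on pace to exhaust the budget before the window ends.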

Kubernetes: Datadog Agent DaemonSet

Deploy the Datadog Agent as a Kubernetes DaemonSet using the official Helm chart. Enable APM, DogStatsD, and log collection with a single values file.

Helm installation
helm repo add datadog https://helm.datadoghq.com
helm repo update

helm install datadog-agent datadog/datadog \
  --namespace datadog \
  --create-namespace \
  -f datadog-values.yaml
datadog-values.yaml
datadog:
  apiKey: "your-datadog-api-key"
  site: "datadoghq.com"

  # APM
  apm:
    portEnabled: true
    port: 8126

  # DogStatsD
  dogstatsd:
    port: 8125
    useHostPort: true
    nonLocalTraffic: true

  # Log collection
  logs:
    enabled: true
    containerCollectAll: true

  # Container and process monitoring
  processAgent:
    enabled: true
    processCollection: true

  # Kubernetes state metrics
  kubeStateMetricsEnabled: true

  # Tags applied to all metrics
  tags:
    - "service:purpleflea-trading-agent"
    - "team:trading"

clusterAgent:
  enabled: true
  replicas: 2

agents:
  containers:
    agent:
      resources:
        requests:
          cpu: 200m
          memory: 256Mi
        limits:
          cpu: 500m
          memory: 512Mi

Agent Pod Annotations

agent-deployment.yaml (annotations section)
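metadata:
  labels:
    # Opt this pod in to the Datadog Admission Controller (a label, not an annotation)
    admission.datadoghq.com/enabled: "true"
  annotations:
    # Enable log collection for the container named "agent"
    ad.datadoghq.com/agent.logs: |
      [{"source": "python", "service": "purpleflea-trading-agent"}]

    # Autodiscovery config for a custom Agent check named purpleflea_agent;
    # omit these three annotations if no such check is installed on the Agent
    ad.datadoghq.com/agent.check_names: '["purpleflea_agent"]'
    ad.datadoghq.com/agent.init_configs: '[{}]'
    ad.datadoghq.com/agent.instances: |
      [{"min_collection_interval": 15}]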

spec:
  containers:
    - name: agent
      env:
        # Inject Datadog Agent host via downward API
        - name: DD_AGENT_HOST
          valueFrom:
            fieldRef:
              fieldPath: status.hostIP
        - name: DD_DOGSTATSD_HOST
          valueFrom:
            fieldRef:
              fieldPath: status.hostIP
        - name: DD_TRACE_AGENT_PORT
          value: "8126"
        - name: DD_ENV
          value: "production"
        - name: DD_SERVICE
          value: "purpleflea-trading-agent"
        - name: DD_VERSION
          value: "1.0.0"
        - name: DD_LOGS_INJECTION
          value: "true"
        - name: AGENT_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
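
The manifest above injects `AGENT_ID`, `DD_ENV`, and `DD_SERVICE` into the container, so the metric tags can be derived from the environment instead of being hardcoded. A minimal sketch follows. `build_base_tags` is a hypothetical helper, the `STRATEGY` variable is not set by the manifest and is an assumption, and the fallback values are placeholders for local development.

```python
import os
from typing import Mapping, Optional


def build_base_tags(environ: Optional[Mapping[str, str]] = None) -> list[str]:
    """Derive metric tags from environment variables, with local-dev fallbacks."""
    env = os.environ if environ is None else environ
    return [
        f"agent_id:{env.get('AGENT_ID', 'momentum-agent-01')}",
        f"strategy:{env.get('STRATEGY', 'momentum')}",  # STRATEGY is an assumed variable
        f"env:{env.get('DD_ENV', 'development')}",
        f"service:{env.get('DD_SERVICE', 'purpleflea-trading-agent')}",
    ]


# Simulate the variables a pod would receive from the manifest.
pod_env = {"AGENT_ID": "trading-agent-7d9f-abc12", "DD_ENV": "production"}
print(build_base_tags(pod_env))
```

Because `AGENT_ID` comes from `metadata.name`, every pod replica reports under its own `agent_id` tag without any per-pod configuration.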

CI/CD: Datadog CI Visibility

Track test performance over time and catch performance regressions before they reach production agents. Integrate Datadog CI Visibility into your GitHub Actions pipeline.

.github/workflows/agent-ci.yml
name: Purple Flea Agent CI

on:
  push:
    branches: [main, develop]
  pull_request:

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"

      - name: Install dependencies
        run: pip install -r requirements.txt pytest pytest-asyncio ddtrace

      - name: Run tests with Datadog CI Visibility
        env:
          DD_API_KEY: ${{ secrets.DATADOG_API_KEY }}
          DD_SITE: datadoghq.com
          DD_ENV: ci
          DD_SERVICE: purpleflea-trading-agent
          DD_CIVISIBILITY_AGENTLESS_ENABLED: "true"
        run: |
          # --ddtrace enables the ddtrace pytest plugin that reports to CI Visibility
          pytest tests/ \
            --ddtrace \
            --junit-xml=test-results.xml \
            -v

      - name: Upload test results
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: test-results
          path: test-results.xml

  deploy:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Post deployment event to Datadog
        run: |
          curl -X POST "https://api.datadoghq.com/api/v1/events" \
            -H "DD-API-KEY: ${{ secrets.DATADOG_API_KEY }}" \
            -H "Content-Type: application/json" \
            -d '{
              "title": "Deployed purpleflea-trading-agent",
              "text": "Version ${{ github.sha }} from ${{ github.server_url }}/${{ github.repository }}",
              "tags": ["service:purpleflea-trading-agent", "env:production", "version:${{ github.sha }}"]
            }'

Next Steps

With Datadog fully integrated, extend your observability stack:

  • Datadog Watchdog: Enable automatic anomaly detection on P&L and latency metrics. Watchdog surfaces unexpected changes without manual threshold tuning.
  • Sensitive Data Scanner: Scrub API keys and wallet addresses from logs automatically before they reach Datadog storage.
  • Incident Management: Link monitors to Datadog Incidents for postmortem tracking when drawdown events or API outages occur.
  • Purple Flea Escrow: Instrument every agent-to-agent payment at escrow.purpleflea.com with record_escrow_transaction() and alert on failure spikes.
  • Purple Flea Faucet: Trigger an automatic faucet claim at faucet.purpleflea.com when the agent.wallet.balance_usd monitor alerts, for example through a monitor webhook.
  • Prometheus bridge: If you also use Prometheus, forward metrics to Datadog via the Datadog Agent's openmetrics check for unified storage.
  • Notebook reports: Create Datadog Notebooks for weekly P&L and latency reports that can be shared with the team without granting dashboard access.
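
As a complement to server-side scrubbing, secrets can also be masked before a log line ever leaves the process. The sketch below uses a standard `logging.Filter`. `RedactSecretsFilter` is a hypothetical name, and the key pattern is an assumption based on the `pf_live_` prefix used in the examples on this page, so adjust it to the real key format.

```python
import logging
import re

# Assumed key shape: the "pf_live_" prefix followed by letters, digits, or underscores.
API_KEY_PATTERN = re.compile(r"pf_live_[A-Za-z0-9_]+")


class RedactSecretsFilter(logging.Filter):
    """Mask anything that looks like a Purple Flea API key before it is emitted."""

    def filter(self, record: logging.LogRecord) -> bool:
        # Format the message first so keys passed as arguments are caught too.
        record.msg = API_KEY_PATTERN.sub("pf_live_[REDACTED]", record.getMessage())
        record.args = None  # the message is already fully formatted
        return True


record = logging.makeLogRecord(
    {"msg": "auth failed for key %s", "args": ("pf_live_abc123XYZ",)}
)
RedactSecretsFilter().filter(record)
print(record.getMessage())  # auth failed for key pf_live_[REDACTED]
```

Attaching the filter to a handler with `handler.addFilter(RedactSecretsFilter())` applies it to every record that handler emits. Note that this version only rewrites the message text and does not touch fields passed through `extra=`.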
New to Purple Flea? Claim free funds from the agent faucet at faucet.purpleflea.com and instrument your first trade with Datadog APM. The full trading API is documented at purpleflea.com/docs/trading.