Quickstart: Install and Configure
Install the Datadog Python libraries and configure your API key. Purple Flea agents typically run as FastAPI services — ddtrace auto-patches FastAPI, HTTPX, and asyncio out of the box.
pip install ddtrace datadog fastapi uvicorn httpx
# Start agent server with ddtrace auto-instrumentation
DD_API_KEY=your_datadog_api_key \
DD_SITE=datadoghq.com \
DD_ENV=production \
DD_SERVICE=purpleflea-trading-agent \
DD_VERSION=1.0.0 \
DD_AGENT_HOST=localhost \
DD_DOGSTATSD_PORT=8125 \
ddtrace-run uvicorn server:app --host 0.0.0.0 --port 8000
Or configure programmatically at the top of your entry point, before any other imports:
from ddtrace import config, patch_all, tracer
from datadog import initialize, statsd
# Initialize the DogStatsD client (call before other imports)
initialize(
    api_key="your_datadog_api_key",
    host_name="agent-host-01",
    statsd_host="localhost",
    statsd_port=8125,
)
# Patch all supported integrations automatically
patch_all()
# Unified service tagging lives on ddtrace's config object,
# not on tracer.configure()
config.env = "production"
config.service = "purpleflea-trading-agent"
config.version = "1.0.0"
# Point the tracer at the local Datadog Agent
tracer.configure(hostname="localhost", port=8126)
Custom Metrics via DogStatsD
Use datadog.statsd to emit financial metrics from your agent. All metrics are tagged with agent_id, strategy, and env for multi-dimensional analysis in Datadog Metrics Explorer.
import time
import httpx
from datadog import statsd
from functools import wraps
from typing import Callable
AGENT_ID = "momentum-agent-01"
STRATEGY = "momentum"
BASE_TAGS = [
f"agent_id:{AGENT_ID}",
f"strategy:{STRATEGY}",
"env:production",
"service:purpleflea-trading-agent",
]
# ---------------------------------------------------------------
# P&L METRICS
# ---------------------------------------------------------------
def record_pnl(market: str, pnl_usd: float, realized: bool = False):
    """Record current P&L as a gauge. Call after each position update."""
    tags = BASE_TAGS + [f"market:{market}", f"realized:{str(realized).lower()}"]
    statsd.gauge("agent.trade.pnl_usd", pnl_usd, tags=tags)

def record_drawdown(drawdown_pct: float):
    """Record current max drawdown percentage."""
    statsd.gauge("agent.risk.max_drawdown_pct", drawdown_pct, tags=BASE_TAGS)

def record_open_positions(market: str, count: int):
    """Record number of open positions per market."""
    tags = BASE_TAGS + [f"market:{market}"]
    statsd.gauge("agent.positions.open_count", count, tags=tags)
# ---------------------------------------------------------------
# EXECUTION METRICS
# ---------------------------------------------------------------
def record_order_latency(market: str, order_type: str, latency_ms: float):
    """Record order execution latency distribution."""
    tags = BASE_TAGS + [f"market:{market}", f"order_type:{order_type}"]
    statsd.histogram("agent.order.latency_ms", latency_ms, tags=tags)

def record_signal_compute_time(duration_seconds: float, model_version: str = "v1"):
    """Record how long signal computation took."""
    tags = BASE_TAGS + [f"model_version:{model_version}"]
    statsd.histogram("agent.signal.compute_seconds", duration_seconds, tags=tags)

def timed_order_execution(market: str, order_type: str = "market"):
    """Decorator: wraps an async order function and records latency."""
    def decorator(func: Callable):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.monotonic()
            try:
                result = await func(*args, **kwargs)
                elapsed_ms = (time.monotonic() - start) * 1000
                record_order_latency(market, order_type, elapsed_ms)
                statsd.increment(
                    "agent.order.executions_total",
                    tags=BASE_TAGS + [f"market:{market}", "status:success"],
                )
                return result
            except Exception as e:
                statsd.increment(
                    "agent.order.executions_total",
                    tags=BASE_TAGS + [f"market:{market}", "status:error", f"error_type:{type(e).__name__}"],
                )
                raise
        return wrapper
    return decorator
# ---------------------------------------------------------------
# WALLET METRICS
# ---------------------------------------------------------------
def record_wallet_balance(currency: str, balance_usd: float, wallet_type: str = "trading"):
    """Record wallet balance for balance trend graphs and threshold monitors."""
    tags = BASE_TAGS + [f"currency:{currency}", f"wallet_type:{wallet_type}"]
    statsd.gauge("agent.wallet.balance_usd", balance_usd, tags=tags)
# ---------------------------------------------------------------
# API / ESCROW METRICS
# ---------------------------------------------------------------
def record_api_call(endpoint: str, method: str, status_code: int):
    """Record every Purple Flea API call with status code."""
    tags = BASE_TAGS + [
        f"endpoint:{endpoint}",
        f"method:{method}",
        f"status_code:{status_code}",
        f"success:{'true' if 200 <= status_code < 300 else 'false'}",
    ]
    statsd.increment("agent.api.requests_total", tags=tags)

def record_escrow_transaction(counterparty: str, status: str, amount_usd: float = 0.0):
    """Record escrow transaction initiation and completion."""
    tags = BASE_TAGS + [f"counterparty:{counterparty}", f"status:{status}"]
    statsd.increment("agent.escrow.transactions_total", tags=tags)
    if status == "completed" and amount_usd > 0:
        statsd.histogram("agent.escrow.amount_usd", amount_usd, tags=tags)

def record_faucet_claim(status: str):
    """Record faucet claim attempt."""
    tags = BASE_TAGS + [f"status:{status}"]
    statsd.increment("agent.faucet.claims_total", tags=tags)
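A quick way to exercise the `timed_order_execution` pattern offline is to swap in a stub statsd client. This sketch condenses the decorator above and wires it to an in-memory recorder; `StubStatsd`, `place_order`, and the sample values are illustrative, not part of the Purple Flea API:

```python
import asyncio
import time
from functools import wraps

class StubStatsd:
    """No-op stand-in for datadog.statsd so the decorator runs offline."""
    def __init__(self):
        self.calls = []
    def increment(self, metric, tags=None):
        self.calls.append(("increment", metric))
    def histogram(self, metric, value, tags=None):
        self.calls.append(("histogram", metric))

statsd = StubStatsd()

def timed_order_execution(market: str, order_type: str = "market"):
    """Same shape as the decorator above, emitting to the stub client."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.monotonic()
            result = await func(*args, **kwargs)
            # Latency histogram plus a success counter, as in the real helper
            statsd.histogram("agent.order.latency_ms",
                             (time.monotonic() - start) * 1000,
                             tags=[f"market:{market}", f"order_type:{order_type}"])
            statsd.increment("agent.order.executions_total",
                             tags=[f"market:{market}", "status:success"])
            return result
        return wrapper
    return decorator

@timed_order_execution("BTC-USD", order_type="limit")
async def place_order(size: float) -> dict:
    await asyncio.sleep(0)  # stand-in for the real exchange call
    return {"status": "filled", "size": size}

result = asyncio.run(place_order(0.5))
print(result["status"], len(statsd.calls))  # filled 2
```

Swapping the stub for the real `datadog.statsd` changes nothing in the decorated code, which is the point of the decorator shape.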
Metric Reference Table
| Metric Name | Type | Key Tags | Description |
|---|---|---|---|
| agent.trade.pnl_usd | Gauge | market, realized | Current P&L in USD |
| agent.risk.max_drawdown_pct | Gauge | strategy | Max drawdown from peak |
| agent.positions.open_count | Gauge | market | Open positions count |
| agent.order.latency_ms | Histogram | market, order_type | Order execution latency |
| agent.signal.compute_seconds | Histogram | model_version | Signal compute time |
| agent.order.executions_total | Counter | market, status | Total order attempts |
| agent.wallet.balance_usd | Gauge | currency, wallet_type | Wallet balance USD equiv |
| agent.api.requests_total | Counter | endpoint, status_code | Purple Flea API calls |
| agent.escrow.transactions_total | Counter | counterparty, status | Escrow transaction count |
| agent.escrow.amount_usd | Histogram | counterparty, status | Escrow amount distribution |
| agent.faucet.claims_total | Counter | status | Faucet claim attempts |
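The `agent.risk.max_drawdown_pct` gauge assumes the agent tracks its own peak equity. A minimal sketch of that bookkeeping; the function name and sample equity curve are illustrative:

```python
def max_drawdown_pct(equity_curve: list[float]) -> float:
    """Largest peak-to-trough decline, as a percentage of the peak."""
    peak = float("-inf")
    worst = 0.0
    for equity in equity_curve:
        peak = max(peak, equity)       # running high-water mark
        if peak > 0:
            worst = max(worst, (peak - equity) / peak * 100)
    return worst

# Deepest drop is 120 -> 80: (120 - 80) / 120 = 33.33%
print(round(max_drawdown_pct([100, 120, 90, 110, 80]), 2))  # 33.33
```

Feed the return value straight into `record_drawdown()` after each equity update.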
APM: Distributed Tracing
Datadog APM creates traces that span the full request lifecycle — from signal computation through order submission to wallet settlement. Use custom spans to instrument critical business logic.
import asyncio
import os
import time
import httpx
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
from ddtrace import tracer
from agent_metrics import (
    record_pnl, record_drawdown, record_wallet_balance,
    record_api_call, record_escrow_transaction,
    timed_order_execution, record_signal_compute_time,
    AGENT_ID, STRATEGY,
)
# Read the API key from the environment rather than hardcoding secrets
API_KEY = os.environ.get("PURPLEFLEA_API_KEY", "pf_live_your_key_here")
BASE_URL = "https://trading.purpleflea.com"
WALLET_URL = "https://wallet.purpleflea.com"
ESCROW_URL = "https://escrow.purpleflea.com"
app = FastAPI(title="Purple Flea Trading Agent")
# FastAPI is auto-instrumented when the app runs under ddtrace-run
# (or after patch_all()), so no explicit trace middleware is needed
# ---------------------------------------------------------------
# SIGNAL COMPUTATION SPAN
# ---------------------------------------------------------------
async def compute_momentum_signal(prices: list[float], market: str) -> float:
    with tracer.trace(
        "agent.signal.compute",
        service="purpleflea-trading-agent",
        resource=f"momentum/{market}",
    ) as span:
        span.set_tag("market", market)
        span.set_tag("agent_id", AGENT_ID)
        span.set_tag("strategy", STRATEGY)
        span.set_tag("price_count", len(prices))
        start = time.monotonic()
        # --- Real momentum computation here ---
        if len(prices) < 10:
            signal = 0.0
        else:
            # 10-period rate of change
            signal = (prices[-1] - prices[-10]) / prices[-10]
        # -------------------------------------
        elapsed = time.monotonic() - start
        span.set_tag("signal_value", round(signal, 6))
        record_signal_compute_time(elapsed, model_version="v1.2.0")
        return signal
# ---------------------------------------------------------------
# ORDER EXECUTION WITH TRACE + METRICS
# ---------------------------------------------------------------
async def execute_order_traced(
    market: str,
    side: str,
    size: float,
    order_type: str = "market",
) -> dict:
    with tracer.trace(
        "agent.order.execute",
        service="purpleflea-trading-agent",
        resource=f"{order_type}/{market}/{side}",
    ) as span:
        span.set_tag("market", market)
        span.set_tag("side", side)
        span.set_tag("size", size)
        span.set_tag("order_type", order_type)
        span.set_tag("agent_id", AGENT_ID)
        start = time.monotonic()
        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                r = await client.post(
                    f"{BASE_URL}/v1/orders",
                    json={"market": market, "side": side, "size": size, "type": order_type},
                    headers={"Authorization": f"Bearer {API_KEY}"},
                )
                r.raise_for_status()
            elapsed_ms = (time.monotonic() - start) * 1000
            span.set_tag("latency_ms", round(elapsed_ms, 2))
            span.set_tag("status_code", r.status_code)
            record_api_call("/v1/orders", "POST", r.status_code)
            return r.json()
        except httpx.HTTPStatusError as e:
            span.set_tag("error", True)
            span.set_tag("error.type", "HTTPStatusError")
            span.set_tag("status_code", e.response.status_code)
            record_api_call("/v1/orders", "POST", e.response.status_code)
            raise HTTPException(status_code=502, detail=str(e))
# ---------------------------------------------------------------
# WALLET BALANCE COLLECTION
# ---------------------------------------------------------------
async def collect_wallet_balance():
    with tracer.trace("agent.wallet.balance_fetch", resource="GET /v1/balance") as span:
        try:
            async with httpx.AsyncClient(timeout=10.0) as client:
                r = await client.get(
                    f"{WALLET_URL}/v1/balance",
                    headers={"Authorization": f"Bearer {API_KEY}"},
                )
                r.raise_for_status()
            data = r.json()
            for currency, info in data.get("balances", {}).items():
                balance = info.get("usd_value", 0.0)
                record_wallet_balance(currency, balance)
                span.set_tag(f"balance_{currency}_usd", balance)
            record_api_call("/v1/balance", "GET", r.status_code)
        except Exception as e:
            span.set_tag("error", True)
            span.set_tag("error.message", str(e))
# ---------------------------------------------------------------
# ESCROW TRANSACTION
# ---------------------------------------------------------------
@app.post("/v1/escrow/create")
async def create_escrow(body: dict):
    counterparty = body.get("counterparty_id", "unknown")
    amount = body.get("amount_usd", 0.0)
    with tracer.trace("agent.escrow.create", resource="POST /v1/escrow") as span:
        span.set_tag("counterparty", counterparty)
        span.set_tag("amount_usd", amount)
        span.set_tag("agent_id", AGENT_ID)
        try:
            async with httpx.AsyncClient(timeout=15.0) as client:
                r = await client.post(
                    f"{ESCROW_URL}/v1/escrow",
                    json=body,
                    headers={"Authorization": f"Bearer {API_KEY}"},
                )
                r.raise_for_status()
            record_escrow_transaction(counterparty, "created", amount)
            record_api_call("/v1/escrow", "POST", r.status_code)
            return r.json()
        except httpx.HTTPStatusError as e:
            record_escrow_transaction(counterparty, "failed")
            span.set_tag("error", True)
            raise HTTPException(status_code=502, detail=str(e))
# ---------------------------------------------------------------
# TRADING ENDPOINT
# ---------------------------------------------------------------
@app.post("/v1/trade")
async def trade(body: dict):
    market = body.get("market", "BTC-USD")
    side = body.get("side", "buy")
    size = body.get("size", 0.01)
    return await execute_order_traced(market, side, size)
# ---------------------------------------------------------------
# BACKGROUND LOOP
# ---------------------------------------------------------------
async def collection_loop():
    import random  # simulated P&L only; replace with real position data
    peak_equity = 0.0
    while True:
        await collect_wallet_balance()
        # Simulate a P&L update per market
        for market in ["BTC-USD", "ETH-USD"]:
            pnl = random.uniform(-50, 150)
            record_pnl(market, pnl)
        equity = 1000 + random.uniform(-100, 100)
        peak_equity = max(peak_equity, equity)
        if peak_equity > 0:
            record_drawdown((peak_equity - equity) / peak_equity * 100)
        await asyncio.sleep(15)

@asynccontextmanager
async def lifespan(app: FastAPI):
    task = asyncio.create_task(collection_loop())
    yield
    task.cancel()

# Attach the lifespan after construction; for a fresh app, prefer
# FastAPI(lifespan=lifespan) at creation time
app.router.lifespan_context = lifespan
Log Management: Structured Trade Logs
Emit structured JSON logs from your agent and configure a Datadog log pipeline to parse trade events, extract facets, and build saved searches.
Structured Logger Setup
import logging
import json
import time
from ddtrace import tracer
class DDJsonFormatter(logging.Formatter):
    """JSON log formatter that injects Datadog trace context for log-trace correlation."""

    # Attributes present on every LogRecord; anything else came from `extra`
    _RESERVED = set(logging.makeLogRecord({}).__dict__) | {"message", "asctime"}

    def format(self, record: logging.LogRecord) -> str:
        span = tracer.current_span()
        log_dict = {
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "service": "purpleflea-trading-agent",
            "env": "production",
        }
        # Inject trace/span IDs for log-trace correlation in Datadog
        if span:
            log_dict["dd.trace_id"] = str(span.trace_id)
            log_dict["dd.span_id"] = str(span.span_id)
        # `extra` kwargs are merged into the record's attributes (there is
        # no record.extra), so copy every non-reserved attribute across
        for key, value in record.__dict__.items():
            if key not in self._RESERVED:
                log_dict[key] = value
        return json.dumps(log_dict, default=str)

def get_logger(name: str) -> logging.Logger:
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(DDJsonFormatter())
        logger.addHandler(handler)
    return logger
# Usage example
logger = get_logger("purpleflea.agent")
def log_trade_execution(market: str, side: str, size: float, price: float,
                        latency_ms: float, order_id: str):
    logger.info("Trade executed", extra={
        "event": "trade_executed",
        "market": market,
        "side": side,
        "size": size,
        "price": price,
        "latency_ms": latency_ms,
        "order_id": order_id,
    })

def log_signal_generated(market: str, signal: float, strategy: str):
    logger.info("Signal generated", extra={
        "event": "signal_generated",
        "market": market,
        "signal_value": round(signal, 6),
        "strategy": strategy,
    })

def log_escrow_event(counterparty: str, amount_usd: float, status: str, escrow_id: str):
    logger.info("Escrow event", extra={
        "event": "escrow_transaction",
        "counterparty": counterparty,
        "amount_usd": amount_usd,
        "status": status,
        "escrow_id": escrow_id,
    })

def log_balance_alert(currency: str, balance_usd: float, threshold_usd: float):
    logger.warning("Balance below threshold", extra={
        "event": "balance_alert",
        "currency": currency,
        "balance_usd": balance_usd,
        "threshold_usd": threshold_usd,
    })
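To sanity-check the payload shape outside the running service, you can format a record by hand. This stripped-down formatter drops the ddtrace import so it runs anywhere; the class name `JsonFormatter` and sample values are illustrative:

```python
import json
import logging
import time

class JsonFormatter(logging.Formatter):
    """Trace-free cousin of DDJsonFormatter, for verifying payload shape offline."""
    RESERVED = set(logging.makeLogRecord({}).__dict__) | {"message", "asctime"}

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "level": record.levelname,
            "message": record.getMessage(),
            "service": "purpleflea-trading-agent",
        }
        # `extra` kwargs land on the record as plain attributes, so copy them across
        payload.update({k: v for k, v in record.__dict__.items() if k not in self.RESERVED})
        return json.dumps(payload)

record = logging.LogRecord("demo", logging.INFO, __file__, 0, "Trade executed", (), None)
record.market = "BTC-USD"  # exactly what extra={"market": ...} produces
parsed = json.loads(JsonFormatter().format(record))
print(parsed["message"], parsed["market"])  # Trade executed BTC-USD
```

If `market` shows up as a top-level JSON key here, the Datadog pipeline's JSON parsing will surface it as an attribute.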
Datadog Log Pipeline (via UI or Terraform)
Configure a Datadog log pipeline to parse these JSON logs and extract searchable facets:
resource "datadog_logs_custom_pipeline" "purpleflea_agent" {
  name       = "Purple Flea Trading Agent Logs"
  is_enabled = true
  filter {
    query = "service:purpleflea-trading-agent"
  }
  # JSON log bodies are parsed automatically at intake, so no explicit
  # JSON parser processor is needed here.
  processor {
    date_remapper {
      name       = "Set timestamp"
      is_enabled = true
      sources    = ["timestamp"]
    }
  }
  processor {
    status_remapper {
      name       = "Remap log level"
      is_enabled = true
      sources    = ["level"]
    }
  }
  processor {
    message_remapper {
      name       = "Set message field"
      is_enabled = true
      sources    = ["message"]
    }
  }
  processor {
    trace_id_remapper {
      name       = "Map trace ID for log-trace correlation"
      is_enabled = true
      sources    = ["dd.trace_id"]
    }
  }
  # Facet: event type (trade_executed, escrow_transaction, etc.)
  processor {
    attribute_remapper {
      name            = "Map event to facet"
      is_enabled      = true
      sources         = ["event"]
      target          = "@event_type"
      target_type     = "attribute"
      preserve_source = true
    }
  }
}
# Dedicated index for agent logs
resource "datadog_logs_index" "agent_logs" {
  name           = "purpleflea-agents"
  daily_limit    = 5000000
  retention_days = 15
  filter {
    query = "service:purpleflea-trading-agent"
  }
}
Recommended Log Facets
| Facet Path | Type | Description |
|---|---|---|
| @event_type | String | Event category: trade_executed, signal_generated, escrow_transaction |
| @market | String | Trading market: BTC-USD, ETH-USD, SOL-USD |
| @side | String | Order direction: buy, sell |
| @latency_ms | Number (ms) | Order execution latency |
| @signal_value | Number | Momentum signal strength |
| @amount_usd | Number (USD) | Escrow transaction amount |
| @status | String | Transaction status: created, completed, failed |
| @balance_usd | Number (USD) | Wallet balance at alert time |
| dd.trace_id | String | Datadog trace ID for log-trace correlation |
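With these facets indexed, saved searches become one-liners in the Log Explorer. A few illustrative queries (thresholds are examples, not recommendations):

```
# Slow order executions
service:purpleflea-trading-agent @event_type:trade_executed @latency_ms:>500

# Failed escrow transactions above $100
service:purpleflea-trading-agent @event_type:escrow_transaction @status:failed @amount_usd:>100

# All warnings for a single market
service:purpleflea-trading-agent status:warn @market:BTC-USD
```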
Monitor Alerts for Financial Risk
Configure Datadog Monitors for the three critical alert categories: drawdown, API errors, and wallet balance.
Threshold monitor on agent.risk.max_drawdown_pct. Fires CRITICAL when drawdown exceeds 15% for 5 consecutive minutes. Routes to trading Slack channel and PagerDuty.
Threshold monitor on agent.wallet.balance_usd{currency:USDC}. Fires CRITICAL when balance drops below $50. Agent cannot fund new positions.
Error-rate monitor on agent.api.requests_total. Fires WARNING when the share of success:false requests exceeds 5% over a 5-minute window.
APM monitor on trace.agent.order.execute P95 duration. Fires WARNING when P95 exceeds 1,000ms, indicating exchange API degradation.
Log monitor on service:purpleflea-trading-agent @event_type:escrow_transaction @status:failed. Fires when more than 5 failures occur in a 10-minute window.
Composite monitor combining high error rate AND high latency: fires CRITICAL only when both conditions are true simultaneously, reducing false positives.
Monitor Terraform Config
resource "datadog_monitor" "agent_drawdown_critical" {
  name    = "Purple Flea Agent - High Drawdown (Critical)"
  type    = "metric alert"
  message = <<-EOT
    {{#is_alert}}
    Agent {{agent_id.name}} drawdown has exceeded 15%.
    Current drawdown: {{value}}%
    Review open positions immediately.
    @slack-trading-alerts @pagerduty
    {{/is_alert}}
    {{#is_recovery}}
    Agent {{agent_id.name}} drawdown has recovered below 8%.
    {{/is_recovery}}
  EOT
  query = "max(last_5m):max:agent.risk.max_drawdown_pct{env:production} by {agent_id} > 15"
  thresholds = {
    critical = 15
    warning  = 8
  }
  notify_no_data    = false
  evaluation_delay  = 60
  renotify_interval = 60
  tags = ["team:trading", "service:purpleflea-trading-agent", "env:production"]
}

resource "datadog_monitor" "wallet_balance_critical" {
  name    = "Purple Flea Agent - Wallet Balance Critical"
  type    = "metric alert"
  message = <<-EOT
    {{#is_alert}}
    Agent {{agent_id.name}} USDC balance is critically low: {{value}} USD
    Agent cannot fund new positions. Top up at https://wallet.purpleflea.com
    Or claim from faucet: https://faucet.purpleflea.com
    @slack-trading-alerts
    {{/is_alert}}
  EOT
  query = "min(last_2m):min:agent.wallet.balance_usd{currency:USDC,env:production} by {agent_id} < 50"
  thresholds = {
    critical = 50
    warning  = 200
  }
  notify_no_data = false
  tags           = ["team:trading", "service:purpleflea-trading-agent"]
}

resource "datadog_monitor" "api_error_rate" {
  name    = "Purple Flea Agent - API Error Rate High"
  type    = "metric alert"
  message = <<-EOT
    {{#is_alert}}
    API error rate for agent {{agent_id.name}} exceeded 5%.
    Current rate: {{value}}%
    Check Purple Flea API status and network connectivity.
    @slack-infra-alerts
    {{/is_alert}}
  EOT
  # Use .as_count() so the ratio compares raw counts over the window
  query = "sum(last_5m):sum:agent.api.requests_total{success:false,env:production} by {agent_id}.as_count() / sum:agent.api.requests_total{env:production} by {agent_id}.as_count() * 100 > 5"
  thresholds = {
    critical = 5
    warning  = 2
  }
  tags = ["team:infrastructure", "service:purpleflea-trading-agent"]
}

resource "datadog_monitor" "order_latency_p95" {
  name = "Purple Flea Agent - Order Latency P95 High"
  type = "metric alert"
  # DogStatsD histograms ship pre-aggregated series such as
  # agent.order.latency_ms.95percentile; alert on that series directly
  query   = "avg(last_5m):avg:agent.order.latency_ms.95percentile{env:production} by {agent_id} > 1000"
  message = <<-EOT
    {{#is_alert}}
    P95 order latency for {{agent_id.name}} exceeded 1000ms.
    Check Purple Flea trading API status.
    @slack-trading-alerts
    {{/is_alert}}
  EOT
  thresholds = {
    critical = 1000
    warning  = 500
  }
  tags = ["team:trading"]
}

resource "datadog_monitor" "escrow_failures" {
  name    = "Purple Flea Agent - Escrow Failure Spike"
  type    = "log alert"
  message = "@slack-trading-alerts Escrow failures spiking. Check https://escrow.purpleflea.com"
  query   = "logs(\"service:purpleflea-trading-agent @event_type:escrow_transaction @status:failed\").index(\"*\").rollup(\"count\").by(\"@agent_id\").last(\"10m\") > 5"
  tags    = ["team:trading"]
}
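The composite monitor from the list above can be expressed in Terraform by referencing the IDs of the two component monitors. A sketch, assuming the error-rate and latency monitors defined here (the resource name and message are illustrative):

```hcl
resource "datadog_monitor" "degraded_exchange_composite" {
  name    = "Purple Flea Agent - High Errors AND High Latency"
  type    = "composite"
  message = "@slack-trading-alerts API errors and order latency are both elevated. Likely exchange-side degradation."
  # Fires only when both component monitors are alerting simultaneously,
  # which cuts false positives from either signal alone
  query = "${datadog_monitor.api_error_rate.id} && ${datadog_monitor.order_latency_p95.id}"
  tags  = ["team:trading", "service:purpleflea-trading-agent"]
}
```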
Fleet Dashboard Configuration
Build a Datadog dashboard with template variables for $agent_id, $strategy, and $market. This lets you switch between fleet-wide and individual agent views without cloning dashboards.
Dashboard JSON (key widgets)
{
"title": "Purple Flea Agent Fleet",
"description": "Financial operations dashboard for AI trading agents",
"template_variables": [
{"name": "agent_id", "prefix": "agent_id", "default": "*"},
{"name": "strategy", "prefix": "strategy", "default": "*"},
{"name": "market", "prefix": "market", "default": "*"},
{"name": "env", "prefix": "env", "default": "production"}
],
"widgets": [
{
"definition": {
"type": "query_value",
"title": "Fleet P&L (USD)",
"requests": [
{
"q": "sum:agent.trade.pnl_usd{$agent_id,$strategy,$env}",
"aggregator": "last"
}
],
"custom_unit": "USD",
"precision": 2,
"color_preference": "background"
}
},
{
"definition": {
"type": "timeseries",
"title": "P&L by Market",
"requests": [
{
"q": "sum:agent.trade.pnl_usd{$agent_id,$env} by {market}",
"display_type": "line"
}
]
}
},
{
"definition": {
"type": "query_value",
"title": "Max Drawdown",
"requests": [
{
"q": "max:agent.risk.max_drawdown_pct{$agent_id,$env}",
"aggregator": "max"
}
],
"custom_unit": "%",
"color_preference": "background",
"autoscale": false
}
},
{
"definition": {
"type": "distribution",
"title": "Order Latency Distribution",
"requests": [
{
"q": "agent.order.latency_ms{$agent_id,$market,$env}",
"request_type": "histogram"
}
]
}
},
{
"definition": {
"type": "timeseries",
"title": "Wallet Balance (USDC)",
"requests": [
{
"q": "min:agent.wallet.balance_usd{currency:USDC,$agent_id,$env} by {agent_id}",
"display_type": "line"
}
]
}
},
{
"definition": {
"type": "toplist",
"title": "Error Rate by Endpoint",
"requests": [
{
"q": "top(sum:agent.api.requests_total{success:false,$agent_id,$env} by {endpoint}.as_rate(), 10, 'mean', 'desc')"
}
]
}
}
]
}
SLO Tracking
Define a metric-based SLO on agent.api.requests_total with a 99% success rate target over a 30-day rolling window. This gives you error budget burn rate alerts alongside your operational monitors.
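In Terraform, that SLO can be sketched with the metric-based `datadog_service_level_objective` resource; the resource name and warning threshold below are illustrative:

```hcl
resource "datadog_service_level_objective" "api_success_rate" {
  name = "Purple Flea Agent - API Success Rate"
  type = "metric"
  # Good events over total events, using the success tag set by record_api_call()
  query {
    numerator   = "sum:agent.api.requests_total{success:true,env:production}.as_count()"
    denominator = "sum:agent.api.requests_total{env:production}.as_count()"
  }
  thresholds {
    timeframe = "30d"
    target    = 99
    warning   = 99.5
  }
  tags = ["team:trading", "service:purpleflea-trading-agent"]
}
```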
Kubernetes: Datadog Agent DaemonSet
Deploy the Datadog Agent as a Kubernetes DaemonSet using the official Helm chart. Enable APM, DogStatsD, and log collection with a single values file.
helm repo add datadog https://helm.datadoghq.com
helm repo update
helm install datadog-agent datadog/datadog \
--namespace datadog \
--create-namespace \
-f datadog-values.yaml
datadog:
  apiKey: "your-datadog-api-key"
  site: "datadoghq.com"
  # APM
  apm:
    portEnabled: true
    port: 8126
  # DogStatsD
  dogstatsd:
    port: 8125
    useHostPort: true
    nonLocalTraffic: true
  # Log collection
  logs:
    enabled: true
    containerCollectAll: true
  # Container and process monitoring
  processAgent:
    enabled: true
    processCollection: true
  # Kubernetes state metrics
  kubeStateMetricsEnabled: true
  # Tags applied to all metrics
  tags:
    - "service:purpleflea-trading-agent"
    - "team:trading"

clusterAgent:
  enabled: true
  replicas: 2

agents:
  containers:
    agent:
      resources:
        requests:
          cpu: 200m
          memory: 256Mi
        limits:
          cpu: 500m
          memory: 512Mi
Agent Pod Annotations
metadata:
  annotations:
    # Enable log collection for this pod
    ad.datadoghq.com/agent.logs: |
      [{"source": "python", "service": "purpleflea-trading-agent"}]
    # APM injection
    admission.datadoghq.com/enabled: "true"
    # DogStatsD origin detection
    ad.datadoghq.com/agent.check_names: '["purpleflea_agent"]'
    ad.datadoghq.com/agent.init_configs: '[{}]'
    ad.datadoghq.com/agent.instances: |
      [{"min_collection_interval": 15}]
spec:
  containers:
    - name: agent
      env:
        # Inject the Datadog Agent host via the downward API
        - name: DD_AGENT_HOST
          valueFrom:
            fieldRef:
              fieldPath: status.hostIP
        - name: DD_DOGSTATSD_HOST
          valueFrom:
            fieldRef:
              fieldPath: status.hostIP
        - name: DD_TRACE_AGENT_PORT
          value: "8126"
        - name: DD_ENV
          value: "production"
        - name: DD_SERVICE
          value: "purpleflea-trading-agent"
        - name: DD_VERSION
          value: "1.0.0"
        - name: DD_LOGS_INJECTION
          value: "true"
        - name: AGENT_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
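To confirm the hostPort wiring from inside a pod, you can hand-craft a DogStatsD datagram and fire it at the node-local Agent. The metric name below is illustrative, and `nc` must be present in the container image:

```shell
# DogStatsD datagram format: <metric>:<value>|<type>|#<tag1>,<tag2>
datagram='purpleflea.smoke_test:1|c|#env:production,service:purpleflea-trading-agent'
echo "$datagram"
# Send over UDP to the Agent resolved via the downward API
# (skipped when DD_AGENT_HOST is unset, e.g. outside the cluster)
if [ -n "${DD_AGENT_HOST:-}" ]; then
  printf '%s' "$datagram" | nc -u -w1 "$DD_AGENT_HOST" 8125
fi
```

A matching `purpleflea.smoke_test` series appearing in Metrics Explorer within a minute or two confirms the DogStatsD path end to end.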
CI/CD: Datadog CI Visibility
Track test performance over time and catch performance regressions before they reach production agents. Integrate Datadog CI Visibility into your GitHub Actions pipeline.
name: Purple Flea Agent CI
on:
  push:
    branches: [main, develop]
  pull_request:
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - name: Install dependencies
        run: pip install -r requirements.txt pytest pytest-asyncio ddtrace
      - name: Run tests with Datadog CI Visibility
        env:
          DD_API_KEY: ${{ secrets.DATADOG_API_KEY }}
          DD_SITE: datadoghq.com
          DD_ENV: ci
          DD_SERVICE: purpleflea-trading-agent
          DD_CIVISIBILITY_AGENTLESS_ENABLED: "1"
        # ddtrace ships a pytest plugin; enable it with --ddtrace
        # rather than wrapping pytest in ddtrace-run
        run: |
          pytest --ddtrace tests/ \
            --junit-xml=test-results.xml \
            -v
      - name: Upload test results
        uses: actions/upload-artifact@v4
        if: always()
        with:
          name: test-results
          path: test-results.xml
  deploy:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Post deployment event to Datadog
        # Mark the deployment via the Events API so it overlays on
        # dashboards and monitor timelines
        run: |
          curl -X POST "https://api.datadoghq.com/api/v1/events" \
            -H "DD-API-KEY: ${{ secrets.DATADOG_API_KEY }}" \
            -H "Content-Type: application/json" \
            -d '{
              "title": "Deployment: purpleflea-trading-agent",
              "text": "Deployed ${{ github.sha }} from ${{ github.server_url }}/${{ github.repository }}",
              "tags": ["service:purpleflea-trading-agent", "env:production", "version:${{ github.sha }}"],
              "alert_type": "info"
            }'
Next Steps
With Datadog fully integrated, extend your observability stack:
- Datadog Watchdog — Enable automatic anomaly detection on P&L and latency metrics. Watchdog surfaces unexpected changes without manual threshold tuning.
- Sensitive Data Scanner — Ensure API keys and wallet addresses are automatically scrubbed from logs before they reach Datadog storage.
- Incident Management — Link monitors to Datadog Incidents for postmortem tracking when drawdown events or API outages occur.
- Purple Flea Escrow — Instrument every agent-to-agent payment at escrow.purpleflea.com with record_escrow_transaction() and alert on failure spikes.
- Purple Flea Faucet — Tie the agent.wallet.balance_usd monitor's resolution to automatic faucet claims at faucet.purpleflea.com.
- Prometheus bridge — If you also use Prometheus, forward metrics to Datadog via the Datadog Agent's openmetrics check for unified storage.
- Notebook reports — Create Datadog Notebooks for weekly P&L and latency reports, shareable with the team without dashboard access.