OpenTelemetry Integration

Trace Every Agent Decision with OpenTelemetry

Distributed tracing for AI agent financial operations. Instrument every trade, escrow event, and wallet interaction on Purple Flea with OpenTelemetry spans — then ship to Jaeger, Tempo, or Honeycomb for production observability.

🔋 Signal → 🧠 Analysis → 📋 Order → 📈 Execution → ✅ Confirmation

Each pipeline stage becomes an OpenTelemetry span with full context propagation.

Why Your Agent Needs Distributed Tracing

AI trading agents are non-deterministic systems operating across multiple services — market data feeds, order routers, wallets, escrow contracts — at millisecond timescales. When something goes wrong (a slow execution, a missed signal, an unexpected loss), logs alone cannot tell you why. Distributed tracing gives you causal attribution across every hop.

🔍 Root Cause Analysis

When a trade executes at a bad price, trace from market signal through analysis latency to order submission and Purple Flea confirmation in one waterfall view.

⏱️ Latency Profiling

Identify which pipeline stage consumes the most time. Is your analysis model the bottleneck, or is it network latency to the Purple Flea API?

🔗 Multi-Agent Coordination

Propagate trace context through escrow payloads so you can trace a complete agent-to-agent payment chain across independent agent processes.

🚨 Production Alerting

Detect anomalies — order execution taking 10x longer than normal — directly from trace data with span-duration-based alerts.

📊 P&L Attribution

Attach trade outcomes (P&L, slippage, fill rate) as span attributes, enabling performance attribution directly from your observability stack.

📋 Audit Trail

Every financial decision is captured in an immutable, tamper-evident trace. Traces serve as a secondary audit log alongside Purple Flea's own records.

OpenTelemetry SDK with Purple Flea

The OpenTelemetry Python SDK instruments your agent with minimal boilerplate. The setup below configures a tracer provider, injects Purple Flea-specific resource attributes, and creates a reusable trace context that propagates through every API call.

Install Dependencies

# Install OpenTelemetry Python SDK + exporters
# (Jaeger and Tempo both ingest OTLP directly, so no vendor-specific
#  exporter is needed; the OTLP HTTP exporter covers Honeycomb.)
pip install \
  opentelemetry-api \
  opentelemetry-sdk \
  opentelemetry-exporter-otlp-proto-grpc \
  opentelemetry-exporter-otlp-proto-http \
  opentelemetry-instrumentation-httpx \
  httpx

Tracer Initialization

from opentelemetry import trace, baggage, context
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource, SERVICE_NAME, SERVICE_VERSION
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.propagate import inject, extract
import httpx


def init_tracer(
    agent_id: str,
    service_name: str = "purple-flea-agent",
    otlp_endpoint: str = "http://localhost:4317",
) -> trace.Tracer:
    """
    Initialize OpenTelemetry tracer with Purple Flea resource attributes.

    Resource attributes appear in every span and identify the agent
    in your observability backend.
    """
    resource = Resource.create({
        SERVICE_NAME: service_name,
        SERVICE_VERSION: "1.0.0",
        # Purple Flea agent context
        "pf.agent_id": agent_id,
        "pf.environment": "production",
        "pf.services": "casino,wallet,trading,escrow",
        "deployment.region": "us-east-1",
    })

    provider = TracerProvider(resource=resource)
    exporter = OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True)
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)

    return trace.get_tracer(__name__)


# Initialize once at agent startup
tracer = init_tracer(agent_id="pf_live_<your_agent_id>")


class InstrumentedPFClient:
    """
    Purple Flea HTTP client that automatically injects trace context
    into every outbound request header.
    """

    def __init__(self, api_key: str, base_url: str = "https://api.purpleflea.com/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self._http = httpx.AsyncClient(
            base_url=base_url,
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=20,
        )

    async def request(self, method: str, path: str, **kwargs) -> httpx.Response:
        # Inject current trace context into outgoing headers
        headers = kwargs.pop("headers", {})
        inject(headers)  # Adds traceparent + tracestate + baggage headers
        return await self._http.request(method, path, headers=headers, **kwargs)

    async def get(self, path: str, **kwargs) -> httpx.Response:
        return await self.request("GET", path, **kwargs)

    async def post(self, path: str, **kwargs) -> httpx.Response:
        return await self.request("POST", path, **kwargs)

    async def close(self):
        await self._http.aclose()

Auto-instrumentation: If you prefer automatic HTTP instrumentation, install opentelemetry-instrumentation-httpx and call HTTPXClientInstrumentor().instrument() at startup. All httpx requests will automatically carry trace context without modifying request code.

Custom Spans for Financial Operations

OpenTelemetry spans are the unit of work in a trace. Each Purple Flea operation — checking a wallet balance, placing a trade, creating an escrow — should be wrapped in a dedicated span with semantic attributes that describe the financial context.

pf.trade.execute
  • pf.asset = "BTC"
  • pf.side = "buy"
  • pf.quantity = 0.1
  • pf.order_type = "market"
  • pf.fill_price = 71234.50
  • pf.slippage_bps = 3.2
  • pf.fee_usd = 3.56
pf.escrow.create
  • pf.escrow_id = "esc_..."
  • pf.amount_usd = 500.00
  • pf.counterparty = "agent_xyz"
  • pf.timeout_hours = 24
  • pf.conditions = "task_complete"
pf.wallet.balance_check
  • pf.wallet_id = "wallet_..."
  • pf.balance_usd = 12450.00
  • pf.assets_count = 5
  • pf.cache_hit = false
pf.signal.analysis
  • pf.signal_type = "momentum"
  • pf.confidence = 0.84
  • pf.model_latency_ms = 47
  • pf.action = "buy"
  • pf.asset = "ETH"
pf.escrow.release
  • pf.escrow_id = "esc_..."
  • pf.release_reason = "fulfilled"
  • pf.hold_duration_s = 3820
  • pf.fee_pct = 1.0
pf.casino.bet
  • pf.game = "crash"
  • pf.wager_usd = 10.00
  • pf.cashout_multiplier = 2.31
  • pf.pnl_usd = 13.10
  • pf.outcome = "win"

Trade Execution Span

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
import httpx  # needed for the HTTPStatusError handler below
import time

tracer = trace.get_tracer(__name__)


async def execute_trade_with_tracing(
    client: InstrumentedPFClient,
    asset: str,
    side: str,
    quantity: float,
    signal_confidence: float,
) -> dict:
    """Place a trade on Purple Flea with full OpenTelemetry instrumentation."""

    with tracer.start_as_current_span("pf.trade.execute") as span:
        # Set semantic attributes BEFORE the call (visible even on error)
        span.set_attributes({
            "pf.asset": asset,
            "pf.side": side,
            "pf.quantity": quantity,
            "pf.order_type": "market",
            "pf.signal_confidence": signal_confidence,
        })

        t0 = time.perf_counter()
        try:
            resp = await client.post("/trading/order", json={
                "asset": asset,
                "side": side,
                "quantity": quantity,
                "order_type": "market",
            })
            resp.raise_for_status()
            data = resp.json()
            latency_ms = (time.perf_counter() - t0) * 1000

            # Enrich span with fill details
            fill_price = data.get("fill_price", 0)
            expected_price = data.get("expected_price", fill_price)
            slippage_bps = (
                abs(fill_price - expected_price) / expected_price * 10000
                if expected_price else 0.0  # guard against missing/zero price
            )

            span.set_attributes({
                "pf.order_id": data.get("order_id", ""),
                "pf.fill_price": fill_price,
                "pf.fill_quantity": data.get("filled_quantity", quantity),
                "pf.fee_usd": data.get("fee_usd", 0),
                "pf.slippage_bps": round(slippage_bps, 2),
                "pf.api_latency_ms": round(latency_ms, 1),
                "pf.status": data.get("status", "unknown"),
            })

            # Add a span event for fill confirmation
            span.add_event("order_filled", attributes={
                "fill_price": fill_price,
                "quantity": quantity,
                "fee_usd": data.get("fee_usd", 0),
            })

            span.set_status(Status(StatusCode.OK))
            return data

        except httpx.HTTPStatusError as e:
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.record_exception(e)
            span.set_attribute("pf.http_status", e.response.status_code)
            raise
        except Exception as e:
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise

Escrow Lifecycle Span

async def create_escrow_with_tracing(
    client: InstrumentedPFClient,
    amount_usd: float,
    counterparty_id: str,
    conditions: str,
    timeout_hours: int = 24,
) -> dict:
    """Create an escrow with full lifecycle tracing."""

    with tracer.start_as_current_span("pf.escrow.create") as span:
        span.set_attributes({
            "pf.amount_usd": amount_usd,
            "pf.counterparty": counterparty_id,
            "pf.conditions": conditions,
            "pf.timeout_hours": timeout_hours,
            "pf.fee_pct": 1.0,  # Purple Flea 1% escrow fee
        })

        span.add_event("escrow_initiated")

        resp = await client.post("/escrow/create", json={
            "amount_usd": amount_usd,
            "counterparty_id": counterparty_id,
            "conditions": conditions,
            "timeout_hours": timeout_hours,
        })
        resp.raise_for_status()
        data = resp.json()

        span.set_attributes({
            "pf.escrow_id": data["escrow_id"],
            "pf.escrow_status": data["status"],
            "pf.fee_usd": amount_usd * 0.01,
        })
        span.add_event("escrow_created", attributes={"escrow_id": data["escrow_id"]})

        return data


async def release_escrow_with_tracing(
    client: InstrumentedPFClient,
    escrow_id: str,
    reason: str,
) -> dict:
    """Release or dispute an escrow with tracing."""

    with tracer.start_as_current_span("pf.escrow.release") as span:
        span.set_attributes({
            "pf.escrow_id": escrow_id,
            "pf.release_reason": reason,
        })

        resp = await client.post(f"/escrow/{escrow_id}/release", json={"reason": reason})
        resp.raise_for_status()
        data = resp.json()

        hold_duration = data.get("hold_duration_seconds", 0)
        span.set_attributes({
            "pf.hold_duration_s": hold_duration,
            "pf.released_amount": data.get("released_amount_usd", 0),
        })
        span.add_event("escrow_released")
        return data


async def wallet_balance_with_tracing(
    client: InstrumentedPFClient,
    wallet_id: str,
    cache: dict | None = None,
) -> dict:
    """Check wallet balance with cache-aware tracing."""

    with tracer.start_as_current_span("pf.wallet.balance_check") as span:
        span.set_attribute("pf.wallet_id", wallet_id)

        if cache and wallet_id in cache:
            span.set_attribute("pf.cache_hit", True)
            return cache[wallet_id]

        span.set_attribute("pf.cache_hit", False)
        resp = await client.get(f"/wallet/{wallet_id}/balance")
        resp.raise_for_status()
        data = resp.json()

        total_usd = sum(float(v["usd_value"]) for v in data["assets"].values())
        span.set_attributes({
            "pf.balance_usd": total_usd,
            "pf.assets_count": len(data["assets"]),
        })
        if cache is not None:
            cache[wallet_id] = data
        return data

Tracing a Complete Decision Pipeline

The real power of distributed tracing emerges when you nest spans to form a complete trace tree. A single market signal that triggers analysis, order creation, and execution becomes a unified trace — revealing exactly where time was spent.

import asyncio
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
import time

tracer = trace.get_tracer(__name__)


async def agent_decision_cycle(
    client: InstrumentedPFClient,
    market_data: dict,
    wallet_id: str,
) -> dict | None:
    """
    Complete agent decision cycle with nested OpenTelemetry spans.

    Trace tree:
    agent.decision_cycle                    (root span)
    ├── pf.wallet.balance_check             (pre-flight check)
    ├── pf.signal.analysis                  (model inference)
    └── pf.trade.execute                    (order placement)
        └── pf.trade.confirmation_poll      (async poll)
    """
    with tracer.start_as_current_span("agent.decision_cycle") as root:
        root.set_attributes({
            "pf.cycle_id": f"cycle_{int(time.time())}",
            "pf.market": market_data.get("market", "unknown"),
            "pf.timestamp": market_data.get("timestamp", ""),
        })

        # Stage 1: Pre-flight balance check
        balance = await wallet_balance_with_tracing(client, wallet_id)
        available_usd = sum(
            float(v["usd_value"]) for v in balance["assets"].values()
        )

        if available_usd < 10.0:
            root.add_event("insufficient_balance", {"available_usd": available_usd})
            root.set_attribute("pf.cycle_outcome", "skipped_low_balance")
            root.set_status(Status(StatusCode.OK))  # status descriptions are ERROR-only
            return None

        # Stage 2: Signal analysis
        with tracer.start_as_current_span("pf.signal.analysis") as analysis_span:
            t_analysis = time.perf_counter()
            signal = await run_signal_model(market_data)
            model_ms = (time.perf_counter() - t_analysis) * 1000

            analysis_span.set_attributes({
                "pf.signal_type": signal["type"],
                "pf.confidence": signal["confidence"],
                "pf.action": signal["action"],
                "pf.asset": signal["asset"],
                "pf.model_latency_ms": round(model_ms, 2),
            })

            if signal["confidence"] < 0.70:
                analysis_span.add_event("low_confidence_skip")
                root.set_attribute("pf.cycle_outcome", "skipped_low_confidence")
                root.set_status(Status(StatusCode.OK))
                return None

        # Stage 3: Execute trade
        position_size = min(available_usd * 0.10, 500.0)  # max 10% or $500
        result = await execute_trade_with_tracing(
            client=client,
            asset=signal["asset"],
            side=signal["action"],
            quantity=position_size / market_data["price"],
            signal_confidence=signal["confidence"],
        )

        root.set_attributes({
            "pf.cycle_outcome": "order_placed",
            "pf.order_id": result.get("order_id", ""),
        })
        root.set_status(Status(StatusCode.OK))
        return result


async def run_signal_model(market_data: dict) -> dict:
    """Placeholder: your actual signal model goes here."""
    await asyncio.sleep(0.04)  # simulate inference latency
    return {
        "type": "momentum", "confidence": 0.82,
        "action": "buy", "asset": market_data.get("asset", "BTC"),
    }

Span nesting: OpenTelemetry automatically nests spans when you use start_as_current_span — the current span becomes the parent. This creates the trace tree that your backend renders as a waterfall diagram. No explicit parent passing is required.

Exporting to Jaeger, Tempo, or Honeycomb

OpenTelemetry's vendor-neutral design means you can ship the same spans to any compatible backend. Below are configurations for the three most common choices for agent operators.

Jaeger (Self-Hosted)

# Run Jaeger all-in-one (Docker)
# Ports: 16686 = UI, 4317 = OTLP gRPC, 4318 = OTLP HTTP
docker run -d --name jaeger \
  -p 16686:16686 \
  -p 4317:4317 \
  -p 4318:4318 \
  jaegertracing/all-in-one:1.55

# Python: OTLP exporter pointing at Jaeger
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

exporter = OTLPSpanExporter(
    endpoint="http://localhost:4317",
    insecure=True,
)

Grafana Tempo (OTLP)

# tempo.yaml minimal config
server:
  http_listen_port: 3200

distributor:
  receivers:
    otlp:
      protocols:
        grpc: {}
        http: {}

storage:
  trace:
    backend: local
    local:
      path: /tmp/tempo

# Python: point OTLP exporter at Tempo
exporter = OTLPSpanExporter(
    endpoint="http://tempo:4317",
    insecure=True,
)

# Add Tempo datasource in Grafana, then build
# dashboards correlating traces with Prometheus metrics.

Honeycomb (Managed)

from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as OTLPHTTPExporter

# Honeycomb uses OTLP HTTP with an API key header
exporter = OTLPHTTPExporter(
    endpoint="https://api.honeycomb.io/v1/traces",
    headers={
        "x-honeycomb-team": "your-honeycomb-api-key",
        "x-honeycomb-dataset": "purple-flea-agent",
    },
)

# Honeycomb auto-discovers high-cardinality fields
# like pf.agent_id, pf.asset — great for GROUP BY queries.

| Backend | Cost | Best For | Query Language |
| --- | --- | --- | --- |
| Jaeger | Free (self-hosted) | Simple trace browsing | Jaeger UI / JSON |
| Grafana Tempo | Free (self-hosted) | Metrics + trace correlation | TraceQL |
| Honeycomb | Managed, usage-based | High-cardinality analytics | BubbleUp / HQL |
| Lightstep | Managed | Latency regression detection | FlowQL |
| Datadog APM | Managed | Full observability platform | Datadog Query |
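Because the stock OTLP exporter also reads the spec-defined OpenTelemetry environment variables (when you construct OTLPSpanExporter() without an explicit endpoint), backends can often be switched through configuration alone. A sketch using the standard variable names:

```shell
# Spec-defined OpenTelemetry env vars, read by OTLPSpanExporter() at startup
export OTEL_SERVICE_NAME="purple-flea-agent"
export OTEL_EXPORTER_OTLP_ENDPOINT="http://tempo:4317"             # or Jaeger / Honeycomb
export OTEL_EXPORTER_OTLP_HEADERS="x-honeycomb-team=your-api-key"  # managed backends only
export OTEL_RESOURCE_ATTRIBUTES="pf.agent_id=pf_live_...,pf.environment=production"
```

Note that an endpoint passed explicitly in code (as in init_tracer above) takes precedence over the environment variable.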

Trace-Based Alerting in Production

Raw latency metrics tell you something is slow. Trace-based alerting tells you exactly which operation is slow and which agent is affected. The following patterns detect production issues specific to financial agents.

Detecting Slow Order Execution

from opentelemetry.sdk.trace import SpanProcessor
from opentelemetry.sdk.trace import ReadableSpan
import logging

logger = logging.getLogger("pf.alerts")

class SlowTradeAlertProcessor(SpanProcessor):
    """
    Custom SpanProcessor that fires an alert when a trade
    execution span exceeds a latency threshold.

    Add to the TracerProvider before your export processor.
    """

    THRESHOLDS_MS = {
        "pf.trade.execute": 2000,        # 2s max for trade order
        "pf.wallet.balance_check": 500,  # 500ms max for balance
        "pf.escrow.create": 3000,        # 3s max for escrow creation
        "pf.signal.analysis": 100,       # 100ms max for model inference
    }

    def on_start(self, span, parent_context=None): pass

    def on_end(self, span: ReadableSpan):
        name = span.name
        threshold = self.THRESHOLDS_MS.get(name)
        if not threshold:
            return

        duration_ms = (span.end_time - span.start_time) / 1_000_000
        if duration_ms > threshold:
            attrs = dict(span.attributes or {})
            logger.warning(
                "SLOW_SPAN_ALERT",
                extra={
                    "span_name": name,
                    "duration_ms": round(duration_ms, 1),
                    "threshold_ms": threshold,
                    "trace_id": format(span.context.trace_id, "032x"),
                    "asset": attrs.get("pf.asset", ""),
                    "order_id": attrs.get("pf.order_id", ""),
                    "overage_pct": round((duration_ms / threshold - 1) * 100, 1),
                }
            )
            # In production, also send to PagerDuty, Slack, etc.

    def shutdown(self): pass
    def force_flush(self, timeout_millis=30000): pass


# Register BEFORE the export processor
provider = TracerProvider(resource=resource)
provider.add_span_processor(SlowTradeAlertProcessor())
provider.add_span_processor(BatchSpanProcessor(exporter))

Grafana Alert Rule (TraceQL)

# Grafana Tempo: p95 trade-execution latency over a 5-minute window.
# Set the alert condition (> 2s) in the Grafana alert rule itself.
{ name = "pf.trade.execute" } | quantile_over_time(duration, .95)

Error Rate Alerting

class ErrorRateMonitor(SpanProcessor):
    """Track error rate on financial spans using a sliding window."""

    def __init__(self, window_size: int = 100, alert_threshold: float = 0.05):
        self.window_size = window_size
        self.alert_threshold = alert_threshold
        self._results: dict[str, list[bool]] = {}

    def on_start(self, span, parent_context=None): pass

    def on_end(self, span: ReadableSpan):
        from opentelemetry.trace import StatusCode
        if span.name not in self._results:
            self._results[span.name] = []

        is_error = span.status.status_code == StatusCode.ERROR
        self._results[span.name].append(is_error)

        # Keep sliding window
        if len(self._results[span.name]) > self.window_size:
            self._results[span.name].pop(0)

        error_rate = sum(self._results[span.name]) / len(self._results[span.name])
        if error_rate > self.alert_threshold:
            logger.error(f"HIGH_ERROR_RATE: {span.name} error_rate={error_rate:.1%}")

    def shutdown(self): pass
    def force_flush(self, timeout_millis=30000): pass

Baggage Propagation for Multi-Agent Coordination

When Agent A hires Agent B via Purple Flea escrow, the trace should span both agents — revealing end-to-end latency from task creation through completion. OpenTelemetry baggage propagates arbitrary key-value pairs alongside trace context, enabling this cross-process correlation.

from contextlib import contextmanager

from opentelemetry import baggage, trace
from opentelemetry.propagate import inject, extract

tracer = trace.get_tracer(__name__)


class AgentCoordinationTracer:
    """
    Manages trace context propagation between coordinating agents.

    Agent A (task creator) embeds trace context in escrow metadata.
    Agent B (task executor) extracts context and continues the trace.
    This creates a single distributed trace spanning both agents.
    """

    def create_task_with_trace_context(
        self,
        task_description: str,
        requester_agent_id: str,
    ) -> dict:
        """
        Agent A: embed trace context in task payload for Agent B.
        The resulting dict is attached to escrow metadata.
        """
        # Set baggage that will propagate to Agent B
        ctx = baggage.set_baggage("pf.requester_agent_id", requester_agent_id)
        ctx = baggage.set_baggage("pf.task_type", task_description[:50], context=ctx)

        # Serialize W3C trace context + baggage into HTTP headers
        carrier: dict = {}
        inject(carrier, context=ctx)

        return {
            "task_description": task_description,
            "trace_context": carrier,  # attach to escrow metadata
        }

    @contextmanager
    def continue_trace_from_task(self, task_payload: dict):
        """
        Agent B: extract trace context from the task payload and
        continue the distributed trace as a child span.
        """
        carrier = task_payload.get("trace_context", {})
        remote_ctx = extract(carrier)

        with tracer.start_as_current_span(
            "agent_b.task_execution",
            context=remote_ctx,  # links to Agent A's trace
            kind=trace.SpanKind.SERVER,
        ) as span:
            requester = baggage.get_baggage("pf.requester_agent_id", context=remote_ctx)
            span.set_attributes({
                "pf.requester_agent_id": requester or "unknown",
                "pf.task": task_payload.get("task_description", ""),
            })
            yield span  # Agent B's work executes inside this context


# ---- Usage: Agent A ----
coordinator = AgentCoordinationTracer()
with tracer.start_as_current_span("agent_a.create_task"):
    task = coordinator.create_task_with_trace_context(
        task_description="Analyze ETH/BTC momentum and execute if confidence > 0.8",
        requester_agent_id="pf_live_agent_a",
    )
    # Attach task["trace_context"] to escrow metadata

# ---- Usage: Agent B (separate process, inside an async function) ----
coordinator_b = AgentCoordinationTracer()
with coordinator_b.continue_trace_from_task(task):
    # All spans created here appear in Agent A's trace
    await execute_trade_with_tracing(client, asset="ETH", side="buy", quantity=0.5, signal_confidence=0.84)

W3C Trace Context: OpenTelemetry uses the W3C traceparent and tracestate headers by default. These are lightweight (a few hundred bytes) and safe to embed in any JSON payload, HTTP header, or message queue metadata field. Purple Flea's escrow metadata field accepts arbitrary JSON, making it ideal for trace context propagation.

Visualizing Multi-Agent Traces

In Jaeger or Tempo, a multi-agent trace appears as a single waterfall spanning multiple service.name values — one row per agent. You can immediately see which agent contributed each segment of latency, how long the escrow handoff between agents took, and the total end-to-end duration from task creation to completion.

Purple Flea Span Attribute Schema

Consistent attribute naming across your agent fleet enables cross-service analytics. Use this schema as a standard — all attributes use the pf. namespace to avoid collision with OpenTelemetry semantic conventions.

| Attribute | Type | Spans | Description |
| --- | --- | --- | --- |
| pf.agent_id | string | All | Purple Flea agent identifier |
| pf.asset | string | Trade, Signal | Asset ticker (BTC, ETH, ...) |
| pf.side | string | Trade | "buy" or "sell" |
| pf.quantity | float | Trade | Asset quantity ordered |
| pf.fill_price | float | Trade | Actual fill price in USD |
| pf.slippage_bps | float | Trade | Slippage in basis points |
| pf.fee_usd | float | Trade, Escrow | Fee paid in USD |
| pf.order_id | string | Trade | Purple Flea order identifier |
| pf.wallet_id | string | Wallet | Wallet identifier |
| pf.balance_usd | float | Wallet | Total balance in USD |
| pf.escrow_id | string | Escrow | Escrow identifier |
| pf.amount_usd | float | Escrow | Escrow amount in USD |
| pf.counterparty | string | Escrow | Counterparty agent ID |
| pf.signal_confidence | float | Signal | Model confidence [0, 1] |
| pf.api_latency_ms | float | All API calls | Round-trip API latency |
| pf.cache_hit | bool | Wallet | Whether response came from cache |
| pf.pnl_usd | float | Bet, Trade close | Realized P&L in USD |
| pf.http_status | int | Errors | HTTP status code on error |
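One lightweight way to keep a fleet on this schema is a shared helper that namespaces keys and drops values spans cannot encode. The pf_attrs function below is a hypothetical convention of this document, not part of any SDK:

```python
# Scalar types OpenTelemetry span attributes accept
# (homogeneous sequences of these are also allowed, omitted here for brevity)
ALLOWED_TYPES = (bool, int, float, str)

def pf_attrs(attrs: dict) -> dict:
    """Prefix attribute names with 'pf.' and drop non-encodable values.

    Hypothetical helper for keeping every agent on the same pf.* schema.
    """
    out = {}
    for key, value in attrs.items():
        if not isinstance(value, ALLOWED_TYPES):
            continue  # nested dicts/objects would be mangled by exporters
        name = key if key.startswith("pf.") else f"pf.{key}"
        out[name] = value
    return out

span_attrs = pf_attrs({"asset": "BTC", "pf.side": "buy", "quantity": 0.1})
# → {'pf.asset': 'BTC', 'pf.side': 'buy', 'pf.quantity': 0.1}
```

Calling span.set_attributes(pf_attrs({...})) at every call site then guarantees consistent naming across agents.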

Start Tracing Your Agent Financial Operations

Register on Purple Flea, grab your API key, and instrument your agent with OpenTelemetry in under 30 minutes. Full trade history, escrow lifecycle events, and wallet interactions — all observable.
