Connect Sentry's error tracking and performance monitoring to Purple Flea's AI agent infrastructure. Catch silent failures, trace slow order paths, and ship more reliable trading strategies.
Autonomous trading agents run 24/7, make thousands of decisions per day, and handle real money. A single uncaught exception during order placement, a silent API timeout, or a miscalculated position size can compound into significant losses before any human notices. Sentry closes that gap.
Install the Sentry Python SDK alongside your Purple Flea agent. The integration is designed to capture all unhandled exceptions automatically, with trading-specific context attached to every event.
pip install sentry-sdk[httpx] httpx
The httpx extra enables automatic instrumentation of outbound HTTP calls — including all Purple Flea API requests.
import sentry_sdk
from sentry_sdk.integrations.httpx import HttpxIntegration
from sentry_sdk.integrations.logging import LoggingIntegration
import logging
def init_sentry(
dsn: str,
agent_id: str,
strategy_name: str,
environment: str = "production",
release: str = "1.0.0"
):
"""Initialize Sentry with Purple Flea agent context."""
sentry_sdk.init(
dsn=dsn,
environment=environment,
release=release,
traces_sample_rate=1.0, # 100% tracing for agents
profiles_sample_rate=0.1, # 10% profiling
integrations=[
HttpxIntegration(),
LoggingIntegration(
level=logging.INFO,
event_level=logging.ERROR
),
],
# Set agent-wide context on every event
before_send=lambda event, hint: enrich_event(event, hint),
)
# Permanent tags for this agent process
sentry_sdk.set_tag("agent_id", agent_id)
sentry_sdk.set_tag("strategy", strategy_name)
sentry_sdk.set_tag("platform", "purple_flea")
# Identify the agent as a "user" for grouping
sentry_sdk.set_user({
"id": agent_id,
"username": f"agent-{agent_id}"
})
def enrich_event(event: dict, hint: dict) -> dict:
"""Inject Purple Flea trading state into every Sentry event."""
from agent_state import get_current_state # your state singleton
state = get_current_state()
if state:
event.setdefault("extra", {}).update({
"current_asset": state.asset,
"open_positions": state.position_count,
"portfolio_usd": state.portfolio_value_usd,
"last_order_id": state.last_order_id,
"last_signal_z": state.last_signal_z,
})
return event
# Usage — call this at agent startup before anything else
init_sentry(
dsn="https://your-sentry-dsn@sentry.io/0",
agent_id="pf_live_your_agent_id",
strategy_name="vol-arb-btc",
environment="production",
release="agent-vol-arb@2.1.0"
)
import asyncio
import sentry_sdk
async def run_agent():
with sentry_sdk.start_transaction(
op="agent.run",
name="Purple Flea Agent Main Loop"
) as transaction:
transaction.set_tag("agent.phase", "startup")
try:
await initialize_agent()
transaction.set_tag("agent.phase", "running")
await main_loop()
except Exception as e:
sentry_sdk.capture_exception(e)
raise
if __name__ == "__main__":
asyncio.run(run_agent())
By default, Sentry groups errors by stack trace. For trading agents, you want smarter grouping: the same InsufficientMarginError on BTC and ETH should group separately, and a NetworkTimeoutError during order placement is different from one during balance polling.
from sentry_sdk import configure_scope
import sentry_sdk
class TradingError(Exception):
"""Base class for all Purple Flea agent errors."""
def __init__(self, message: str, asset: str = "",
phase: str = "", order_id: str = ""):
super().__init__(message)
self.asset = asset
self.phase = phase
self.order_id = order_id
class InsufficientMarginError(TradingError): pass
class OrderRejectedError(TradingError): pass
class RiskLimitBreachError(TradingError): pass
class SlippageLimitError(TradingError): pass
def capture_trade_error(exc: TradingError, extra: dict = None):
"""Capture trading error with rich context and custom fingerprint."""
with configure_scope() as scope:
# Custom fingerprint: group by error type + asset + phase
scope.fingerprint = [
"{{ default }}",
type(exc).__name__,
exc.asset or "unknown_asset",
exc.phase or "unknown_phase",
]
# Trading-specific tags
scope.set_tag("trade.asset", exc.asset)
scope.set_tag("trade.phase", exc.phase)
scope.set_tag("trade.error_type", type(exc).__name__)
if exc.order_id:
scope.set_tag("trade.order_id", exc.order_id)
if extra:
scope.set_extra("trade_context", extra)
sentry_sdk.capture_exception(exc)
# Example usage in order execution
async def place_order(asset: str, side: str, size: float, api_key: str):
try:
resp = await purple_flea_client.post("/v1/orders/spot", json={
"asset": asset, "direction": side, "quantity": size
})
if resp.status_code == 400:
body = resp.json()
if body.get("code") == "INSUFFICIENT_MARGIN":
raise InsufficientMarginError(
f"Margin too low for {side} {size} {asset}",
asset=asset, phase="order_placement"
)
return resp.json()
except InsufficientMarginError as e:
capture_trade_error(e, extra={"size": size, "side": side})
raise
def before_send(event: dict, hint: dict) -> dict:
"""Filter benign errors, enrich critical ones."""
exc_info = hint.get("exc_info")
if exc_info:
exc_type, exc_value, _ = exc_info
# Drop expected transient errors
if exc_type.__name__ in {"asyncio.TimeoutError", "httpx.RemoteProtocolError"}:
if event.get("tags", {}).get("trade.phase") != "order_placement":
return None # suppress non-critical timeouts
# Escalate any error during order placement
if event.get("tags", {}).get("trade.phase") == "order_placement":
event["level"] = "fatal"
return event
A trading agent's order pipeline has several distinct stages: signal generation, risk check, API serialization, network round-trip, confirmation parsing. Sentry performance tracing makes each stage visible as a span within a transaction.
If your agent's P99 order latency spikes from 50ms to 800ms, you need to know whether it's the risk calculation, the network, or the Purple Flea API's response time — not just that something is slow.
import sentry_sdk
from sentry_sdk import start_span
import time
class InstrumentedOrderPipeline:
"""Order pipeline with Sentry performance tracing at every stage."""
def __init__(self, api_key: str):
self.api_key = api_key # pf_live_... key
async def execute(self, signal: dict) -> dict:
with sentry_sdk.start_transaction(
op="order.pipeline",
name=f"Order Pipeline: {signal['asset']} {signal['side']}"
) as txn:
txn.set_data("signal", signal)
result = {}
# Stage 1: Signal validation
with start_span(op="signal.validate", description="Validate signal") as span:
validated = self._validate_signal(signal)
span.set_data("z_score", signal.get("z_score"))
span.set_data("valid", validated)
if not validated:
txn.set_tag("outcome", "rejected_validation")
return {"status": "rejected"}
# Stage 2: Risk check
with start_span(op="risk.check", description="Portfolio risk check") as span:
t0 = time.perf_counter()
risk_ok = await self._risk_check(signal)
span.set_data("duration_ms", (time.perf_counter() - t0) * 1000)
span.set_data("risk_pass", risk_ok)
if not risk_ok:
txn.set_tag("outcome", "rejected_risk")
return {"status": "rejected"}
# Stage 3: Size calculation
with start_span(op="order.size", description="Compute position size") as span:
size = self._compute_size(signal)
span.set_data("size_usd", size)
# Stage 4: Purple Flea API call
with start_span(op="http.client", description="POST /v1/orders/spot") as span:
t0 = time.perf_counter()
try:
order = await self._submit_order(signal, size)
latency_ms = (time.perf_counter() - t0) * 1000
span.set_data("latency_ms", latency_ms)
span.set_data("order_id", order.get("order_id"))
span.set_data("fill_price", order.get("fill_price"))
txn.set_tag("order.id", order.get("order_id"))
result = order
except Exception as e:
span.set_data("error", str(e))
txn.set_tag("outcome", "api_error")
sentry_sdk.capture_exception(e)
raise
# Stage 5: Confirmation
with start_span(op="order.confirm", description="Verify fill") as span:
filled = await self._confirm_fill(result.get("order_id"))
span.set_data("fill_confirmed", filled)
txn.set_tag("outcome", "filled" if filled else "partial")
return result
def _validate_signal(self, signal: dict) -> bool:
return abs(signal.get("z_score", 0)) >= 1.5
async def _risk_check(self, signal: dict) -> bool:
# Call your risk manager
return True
def _compute_size(self, signal: dict) -> float:
return 1000.0 # simplified
async def _submit_order(self, signal: dict, size: float) -> dict:
# httpx call auto-instrumented by HttpxIntegration
import httpx
async with httpx.AsyncClient() as client:
resp = await client.post(
"https://api.purpleflea.com/v1/orders/spot",
headers={"Authorization": f"Bearer {self.api_key}"},
json={"asset": signal["asset"], "direction": signal["side"], "quantity": size}
)
resp.raise_for_status()
return resp.json()
async def _confirm_fill(self, order_id: str) -> bool:
return True # poll order status in real implementation
Breadcrumbs capture the sequence of events leading up to an error. For a trading agent, this means logging every signal read, risk check outcome, model inference, and API response — so that when something goes wrong, you can replay exactly what the agent was thinking.
import sentry_sdk
from datetime import datetime
def log_signal(asset: str, z_score: float, iv: float, rv: float, action: str):
"""Record a trading signal as a Sentry breadcrumb."""
sentry_sdk.add_breadcrumb(
category="signal",
message=f"Signal: {action} {asset} (z={z_score:.2f}, IV={iv:.1%}, RV={rv:.1%})",
data={"asset": asset, "z_score": z_score, "iv": iv, "rv": rv},
level="info",
type="info"
)
def log_risk_check(check_name: str, passed: bool, value: float, limit: float):
"""Record each risk check outcome."""
sentry_sdk.add_breadcrumb(
category="risk",
message=f"Risk check '{check_name}': {'PASS' if passed else 'FAIL'} ({value:.2f} vs limit {limit:.2f})",
data={"check": check_name, "passed": passed, "value": value, "limit": limit},
level="info" if passed else "warning"
)
def log_api_response(endpoint: str, status: int, latency_ms: float, order_id: str = ""):
"""Record Purple Flea API response."""
sentry_sdk.add_breadcrumb(
category="http",
message=f"Purple Flea API: {endpoint} → {status} ({latency_ms:.0f}ms)",
data={"endpoint": endpoint, "status_code": status,
"latency_ms": latency_ms, "order_id": order_id},
level="info" if status < 400 else "error"
)
def log_position_change(asset: str, side: str, size: float,
price: float, rationale: str):
"""Log a position entry or exit decision."""
sentry_sdk.add_breadcrumb(
category="trade",
message=f"Position: {side} {size} {asset} @ {price:.2f} — {rationale}",
data={"asset": asset, "side": side, "size": size,
"price": price, "rationale": rationale},
level="info"
)
# Example usage in your agent loop
async def agent_decision_cycle(asset: str, pipeline: InstrumentedOrderPipeline):
try:
# Log every step before any errors can occur
signal = await get_signal(asset)
log_signal(asset, signal["z_score"], signal["iv"], signal["rv"], signal["action"])
risk_ok = check_portfolio_risk()
log_risk_check("net_vega", risk_ok["vega_ok"], risk_ok["vega"], 50000)
log_risk_check("daily_loss", risk_ok["loss_ok"], risk_ok["daily_loss"], 3000)
order = await pipeline.execute(signal)
log_api_response("/v1/orders/spot", 201, order["latency_ms"], order["order_id"])
log_position_change(asset, signal["side"], order["size"],
order["fill_price"], f"z={signal['z_score']:.2f}")
except Exception as e:
# Breadcrumbs are automatically attached to the error event
sentry_sdk.capture_exception(e)
raise
Configure Sentry alerts that are tuned for the unique failure modes of autonomous trading agents — not generic web app error rates. These are the alerts that matter most for Purple Flea agents.
OrderRejectedError or InsufficientMarginError. Threshold: 1 occurrence. Immediate PagerDuty/Slack alert.order.pipeline P99 duration exceeds 2 seconds. At this latency, fill quality degrades and slippage materializes.import httpx
SENTRY_ORG = "your-sentry-org"
SENTRY_AUTH_TOKEN = "your-sentry-token"
PROJECT = "purple-flea-agent"
def create_alert_rule(name: str, conditions: list, actions: list,
triggers: list, frequency: int = 5):
"""Programmatically create Sentry alert rule via REST API."""
with httpx.Client() as client:
resp = client.post(
f"https://sentry.io/api/0/projects/{SENTRY_ORG}/{PROJECT}/alert-rules/",
headers={"Authorization": f"Bearer {SENTRY_AUTH_TOKEN}"},
json={
"name": name,
"environment": "production",
"dataset": "transactions",
"query": "",
"timeWindow": frequency,
"thresholdType": 0,
"resolveThreshold": None,
"triggers": triggers,
"actions": actions
}
)
return resp.json()
# Create P99 latency alert
create_alert_rule(
name="Purple Flea Agent: P99 Order Latency > 2s",
conditions=[],
triggers=[{
"label": "critical",
"alertThreshold": 2000,
"resolveThreshold": 1000,
"thresholdType": 0,
"actions": [{
"type": "slack",
"targetType": "specific",
"targetIdentifier": "#trading-alerts"
}]
}],
actions=[],
frequency=5
)
Every time you deploy a new strategy version, parameter update, or model checkpoint, tag it as a Sentry release. This makes it trivial to correlate performance regressions with specific code changes.
# Create a release before deployment
sentry-cli releases new agent-vol-arb@2.1.0 --project purple-flea-agent
# Associate commits
sentry-cli releases set-commits agent-vol-arb@2.1.0 --auto
# Mark deployment live
sentry-cli releases deploys agent-vol-arb@2.1.0 new -e production
# Finalize after health checks pass
sentry-cli releases finalize agent-vol-arb@2.1.0
import os
import subprocess
import sentry_sdk
def get_agent_version() -> str:
"""Derive version from git commit hash at startup."""
try:
commit = subprocess.check_output(
["git", "rev-parse", "--short", "HEAD"],
stderr=subprocess.DEVNULL
).decode().strip()
return f"agent-vol-arb@{commit}"
except Exception:
return os.getenv("AGENT_VERSION", "unknown")
def mark_release_healthy():
"""Called after startup health checks pass."""
import httpx
version = get_agent_version()
httpx.post(
"https://sentry.io/api/0/organizations/your-org/releases/",
headers={"Authorization": "Bearer YOUR_SENTRY_TOKEN"},
json={
"version": version,
"projects": ["purple-flea-agent"],
"dateReleased": datetime.utcnow().isoformat() + "Z"
}
)
# Tag all subsequent events with the active release
sentry_sdk.set_tag("release", version)
Purple Flea agents run indefinitely — they don't have a concept of individual "requests" the way web servers do. Sentry's session monitoring tracks the health of these long-lived processes and alerts when they crash or go silent.
An agent that crashes silently at 3am can incur hours of drift, missed trades, or unhedged positions before anyone notices. Session monitoring closes this gap with a heartbeat pattern.
import sentry_sdk
from sentry_sdk.sessions import auto_session_tracking
import asyncio
from datetime import datetime
class AgentSessionMonitor:
"""
Manages Sentry session lifecycle for a long-running trading agent.
Emits heartbeats so Sentry knows the agent is alive.
"""
def __init__(self, agent_id: str, heartbeat_interval: int = 60):
self.agent_id = agent_id
self.heartbeat_interval = heartbeat_interval
self.start_time = datetime.utcnow()
self._running = True
async def start_session(self):
"""Signal session start to Sentry."""
sentry_sdk.set_tag("session.agent_id", self.agent_id)
sentry_sdk.set_tag("session.start", self.start_time.isoformat())
sentry_sdk.add_breadcrumb(
category="session",
message=f"Agent session started: {self.agent_id}",
level="info"
)
asyncio.create_task(self._heartbeat_loop())
async def _heartbeat_loop(self):
"""Emit periodic breadcrumbs as heartbeats."""
while self._running:
uptime_s = (datetime.utcnow() - self.start_time).total_seconds()
sentry_sdk.add_breadcrumb(
category="heartbeat",
message=f"Agent alive: uptime {uptime_s/3600:.1f}h",
level="debug"
)
sentry_sdk.set_measurement("agent.uptime_hours", uptime_s / 3600)
await asyncio.sleep(self.heartbeat_interval)
def end_session(self, status: str = "ok"):
"""Signal clean session end (ok, abnormal_exit, crashed)."""
self._running = False
sentry_sdk.add_breadcrumb(
category="session",
message=f"Agent session ended: {status}",
level="info" if status == "ok" else "error"
)
async def __aenter__(self):
await self.start_session()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
status = "crashed" if exc_type else "ok"
self.end_session(status)
if exc_type:
sentry_sdk.capture_exception(exc_val)
return False # don't suppress exceptions
# Usage
async def main():
async with AgentSessionMonitor(agent_id="pf_live_agent_abc") as session:
pipeline = InstrumentedOrderPipeline(api_key="pf_live_your_key_here")
while True:
await agent_decision_cycle("BTC", pipeline)
await asyncio.sleep(60)
If your agent has periodic rebalancing jobs — daily parameter recalibration, weekly position rollout — use Sentry Cron Monitors to alert when they don't run on schedule.
import sentry_sdk
async def daily_recalibrate():
"""Daily model recalibration with Sentry cron monitoring."""
monitor_config = {
"schedule": {"type": "crontab", "value": "0 0 * * *"},
"checkin_margin": 10, # 10 min window
"max_runtime": 30, # must complete within 30 min
"failure_issue_threshold": 1,
"recovery_threshold": 2,
}
with sentry_sdk.monitor(monitor_slug="agent-daily-recalibration",
monitor_config=monitor_config):
# Sentry marks the check-in as "in_progress" here
await recalibrate_vol_estimators()
await update_signal_history()
await retrain_iv_model()
# On exit, Sentry marks "ok" or "error"
| Capability | Plain Logging | Sentry + Purple Flea |
|---|---|---|
| Error capture with stack trace | Yes (manual) | Automatic, with context |
| Trading context on every error | No | Asset, order ID, z-score, position size |
| Error grouping by trade type | No | Custom fingerprinting by asset + phase |
| Per-stage latency visibility | No | Full transaction tracing with spans |
| Decision chain replay | Manual log parsing | Structured breadcrumbs attached to error |
| Real-time alerts | Manual log monitoring | Error rate, P99, new regressions, session crash |
| Deployment correlation | No | Release tracking + regression detection |
| Long-running session health | No | Heartbeats + cron monitors |
Every Purple Flea service emits structured errors that map cleanly to Sentry tags. Here's the full coverage matrix for the six-service stack.
| Service | URL | Key Sentry Tags | Critical Error Classes |
|---|---|---|---|
| Casino | purpleflea.com | game, bet_size, outcome | GameError, InsufficientBalanceError |
| Faucet | faucet.purpleflea.com | claim_id, agent_id | AlreadyClaimedError, RegistrationError |
| Escrow | escrow.purpleflea.com | escrow_id, counterparty, amount | EscrowTimeoutError, DisputeError |
| Trading | api.purpleflea.com/v1/orders | asset, side, order_id, fill_price | OrderRejectedError, SlippageLimitError |
| Wallet | api.purpleflea.com/v1/wallet | wallet_type, network, tx_hash | InsufficientFundsError, NetworkError |
| Domains | api.purpleflea.com/v1/domains | domain_name, tld, auction_id | BidRejectedError, AuctionExpiredError |
Register a Purple Flea agent, grab free credits from the faucet, and connect Sentry before you trade a single dollar. Zero blind spots from day one.