KPIs and Performance Metrics for AI Financial Agents
An AI agent that cannot measure itself cannot improve itself. KPIs are the feedback loop that separates a profitable, reliable agent from an expensive compute bill. This guide covers every category of metric that matters for autonomous financial agents: trading performance, operational health, capital efficiency, referral networks, and how to wire it all into a live dashboard that can pause a misbehaving agent before it burns your wallet.
Why KPIs Matter for Autonomous Agents
Human traders have intuition, memory, and social accountability. They notice when something feels wrong — drawdowns are stressful, bad fills produce frustration, and colleagues notice erratic behavior. Autonomous agents have none of that. Without explicit measurement, a broken agent will execute millions of API calls, accumulate losses, and never send a distress signal.
KPIs fill the accountability gap. They answer three critical questions:
- Is the agent healthy? Operational metrics tell you if the agent is running correctly: low error rates, acceptable latency, high uptime.
- Is the agent profitable? Trading and financial metrics measure whether the strategy is working and capital is being deployed efficiently.
- Is the agent scaling? Network and referral metrics show whether the agent is growing its downstream revenue, compounding returns through multi-agent coordination.
Critically, KPIs enable automated shutdown decisions. A well-designed agent pauses itself when a threshold is breached — max drawdown exceeded, error rate spikes, API latency doubles. This prevents a runaway agent from causing irreversible financial damage while you are asleep, in a meeting, or otherwise unavailable.
Measure → Compare to threshold → Alert → Auto-pause or auto-recover → Log for post-mortem. Every production agent needs this loop operating at sub-minute frequency. KPIs without automated responses are just logs.
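That loop can be sketched as a minimal supervisor. The callback names here (read_kpis, pause, alert) are illustrative placeholders for whatever your agent runtime provides, not a Purple Flea API:

```python
import time

def kpi_supervisor(read_kpis, thresholds, pause, alert,
                   interval_s=30.0, max_cycles=None):
    """Minimal measure -> compare -> alert -> auto-pause loop.

    read_kpis()  -> dict of {metric_name: value}
    thresholds   -> {metric_name: (breach_predicate, severity)}
    pause/alert  -> callbacks supplied by the agent runtime
    max_cycles   -> run forever when None (set a number for testing)
    """
    cycle = 0
    while max_cycles is None or cycle < max_cycles:
        kpis = read_kpis()                                # measure
        for name, (breached, severity) in thresholds.items():
            value = kpis.get(name)
            if value is not None and breached(value):     # compare
                alert(name, value, severity)              # always notify
                if severity == "critical":
                    pause(reason=name, value=value)       # stop before more damage
        cycle += 1
        if max_cycles is None or cycle < max_cycles:
            time.sleep(interval_s)
```

The post-mortem log falls out of the `alert` callback; the key property is that the pause path needs no human in the loop.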
The Three Classes of Agent Failure
Most agent failures fall into one of three categories, each requiring a different class of KPI:
| Failure Class | Examples | KPI Category | Response Time |
|---|---|---|---|
| Strategic failure | Strategy stops working, alpha decays, market regime change | Trading KPIs | Hours to days |
| Operational failure | API down, rate limit hit, process crash, memory leak | Operational KPIs | Seconds to minutes |
| Financial failure | Capital depleted, fees exceeding revenue, over-leveraged | Financial KPIs | Minutes to hours |
Trading KPIs
Trading KPIs measure the quality of the agent's strategy execution. The goal is not just profitability in absolute terms, but risk-adjusted profitability — returns earned per unit of risk taken. A strategy that returns 50% per year with 80% drawdown is worse than one returning 20% with 5% drawdown for most use cases.
Sharpe Ratio
The Sharpe ratio is the most widely used measure of risk-adjusted return. It expresses how many units of return the strategy earns per unit of volatility.
Sharpe = (Rp - Rf) / σp, where Rp = portfolio return, Rf = risk-free rate, and σp = std dev of portfolio returns
For agents operating 24/7 in crypto, the risk-free rate is effectively 0 (or the stablecoin yield available). Annualize daily Sharpe by multiplying by sqrt(365).
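As a minimal standalone sketch of that annualization (stdlib only; the function name is illustrative):

```python
import math
import statistics

def annualized_sharpe(daily_returns: list[float],
                      risk_free_daily: float = 0.0,
                      periods_per_year: int = 365) -> float:
    """Sharpe = mean(excess return) / stdev(excess return), scaled by sqrt(periods)."""
    excess = [r - risk_free_daily for r in daily_returns]
    return (statistics.mean(excess) / statistics.stdev(excess)
            * math.sqrt(periods_per_year))
```

With a risk-free rate of 0, this reduces to mean daily return over daily volatility, times sqrt(365) for a 24/7 market.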
Win Rate and Profit Factor
Win rate alone is a misleading metric. A strategy with a 30% win rate can be highly profitable if winners are 5x larger than losers. Always measure win rate alongside profit factor.
Profit Factor = gross profit / gross loss. PF > 1 = profitable, PF > 2 = strong, PF < 1 = losing money.
| Metric | Formula | Warning | Target |
|---|---|---|---|
| Win Rate | wins / total_trades | < 40% | > 55% |
| Profit Factor | gross_profit / gross_loss | < 1.2 | > 1.8 |
| Avg P&L / Trade | total_pnl / trade_count | < 0 | > fee_cost |
| Expectancy | wr*avg_win - (1-wr)*avg_loss | < 0 | > 0.5% |
| Trade Frequency | trades / day | too low OR too high | strategy-specific |
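To make the point concrete, a quick check of the table's expectancy formula: a 30% win rate with 5x winners beats a 60% win rate with symmetric outcomes:

```python
def expectancy(win_rate: float, avg_win: float, avg_loss: float) -> float:
    """Expected P&L per trade: wr * avg_win - (1 - wr) * avg_loss."""
    return win_rate * avg_win - (1 - win_rate) * avg_loss

# 30% win rate, winners 5x the size of losers (5% wins vs 1% losses)
low_wr = expectancy(0.30, 0.05, 0.01)   # +0.8% per trade
# 60% win rate, winners the same size as losers (1% each)
high_wr = expectancy(0.60, 0.01, 0.01)  # +0.2% per trade
```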
Python: Computing Trading KPIs
import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import Optional
@dataclass
class TradingKPIs:
sharpe: float
sortino: float
max_drawdown: float
calmar: float
win_rate: float
profit_factor: float
avg_pnl_per_trade: float
expectancy: float
total_trades: int
def compute_trading_kpis(
returns: pd.Series, # daily or per-trade returns (decimal, e.g. 0.02 = 2%)
trades_df: pd.DataFrame, # columns: pnl, entry_time, exit_time
risk_free_rate: float = 0.0,
periods_per_year: int = 365
) -> TradingKPIs:
"""
Compute a full set of trading KPIs from a returns series and trade log.
"""
# --- Sharpe & Sortino ---
excess = returns - risk_free_rate / periods_per_year
sharpe = (excess.mean() / excess.std()) * np.sqrt(periods_per_year)
downside = excess[excess < 0]
sortino = (excess.mean() / downside.std()) * np.sqrt(periods_per_year) if len(downside) > 0 else np.inf
# --- Drawdown ---
cumulative = (1 + returns).cumprod()
rolling_max = cumulative.cummax()
drawdown = (cumulative - rolling_max) / rolling_max
max_drawdown = drawdown.min()
# --- Calmar ---
annual_return = (cumulative.iloc[-1] ** (periods_per_year / len(returns))) - 1
calmar = annual_return / abs(max_drawdown) if max_drawdown != 0 else np.inf
# --- Trade-level stats ---
if trades_df is not None and not trades_df.empty:
winners = trades_df[trades_df["pnl"] > 0]
losers = trades_df[trades_df["pnl"] <= 0]
win_rate = len(winners) / len(trades_df)
gross_profit = winners["pnl"].sum()
gross_loss = abs(losers["pnl"].sum())
profit_factor = gross_profit / gross_loss if gross_loss > 0 else np.inf
avg_pnl = trades_df["pnl"].mean()
avg_win = winners["pnl"].mean() if len(winners) > 0 else 0
avg_loss = abs(losers["pnl"].mean()) if len(losers) > 0 else 0
expectancy = win_rate * avg_win - (1 - win_rate) * avg_loss
total = len(trades_df)
else:
win_rate = profit_factor = avg_pnl = expectancy = 0.0
total = 0
return TradingKPIs(
sharpe=round(sharpe, 3),
sortino=round(sortino, 3),
max_drawdown=round(max_drawdown, 4),
calmar=round(calmar, 3),
win_rate=round(win_rate, 4),
profit_factor=round(profit_factor, 4),
avg_pnl_per_trade=round(avg_pnl, 6),
expectancy=round(expectancy, 6),
total_trades=total,
)
# Example usage:
# kpis = compute_trading_kpis(daily_returns, trade_log)
# print(f"Sharpe: {kpis.sharpe:.2f}")
# print(f"Max DD: {kpis.max_drawdown:.1%}")
# print(f"Win Rate: {kpis.win_rate:.1%}")
Per-Trade P&L Breakdown
Aggregate metrics hide the distribution of outcomes. Always decompose P&L by trade direction, market condition, time-of-day, and instrument. A strategy that looks profitable in aggregate may be bleeding on long trades while profiting on shorts, signaling a broken entry logic on one side.
def pnl_breakdown(trades_df: pd.DataFrame) -> dict:
"""
Break down P&L by direction, hour, and instrument.
trades_df must have: pnl, direction ('long'/'short'),
hour (0-23), symbol, fees
"""
result = {}
# By direction
result["by_direction"] = trades_df.groupby("direction").agg(
count=("pnl", "count"),
total_pnl=("pnl", "sum"),
avg_pnl=("pnl", "mean"),
win_rate=("pnl", lambda x: (x > 0).mean()),
).round(6).to_dict()
# By hour of day (find best/worst windows)
result["by_hour"] = trades_df.groupby("hour").agg(
avg_pnl=("pnl", "mean"),
trade_count=("pnl", "count"),
).sort_values("avg_pnl", ascending=False).to_dict()
# By symbol
result["by_symbol"] = trades_df.groupby("symbol").agg(
total_pnl=("pnl", "sum"),
avg_pnl=("pnl", "mean"),
count=("pnl", "count"),
).sort_values("total_pnl", ascending=False).to_dict()
# Fee drag analysis
result["fee_analysis"] = {
"total_fees": trades_df["fees"].sum(),
"fee_pct_of_gross": (
trades_df["fees"].sum() /
trades_df[trades_df["pnl"] > 0]["pnl"].sum()
),
"avg_fee_per_trade": trades_df["fees"].mean(),
}
return result
Operational KPIs
Operational KPIs measure the reliability of the agent's execution infrastructure. Even a brilliant strategy fails if the agent crashes, hits rate limits, or takes too long to respond to market events. These metrics run at a much higher frequency than trading KPIs — typically every 30 seconds to 5 minutes.
Latency Percentiles in Python
import time
import statistics
from collections import deque
from threading import Lock
class LatencyTracker:
"""
Rolling-window latency tracker with percentile computation.
Thread-safe. Stores the last N measurements.
"""
def __init__(self, window: int = 1000):
self._samples: deque[float] = deque(maxlen=window)
self._lock = Lock()
def record(self, latency_ms: float) -> None:
with self._lock:
self._samples.append(latency_ms)
def percentile(self, p: float) -> float:
"""Compute p-th percentile (0–100) of recorded latencies."""
with self._lock:
if not self._samples:
return 0.0
sorted_data = sorted(self._samples)
idx = int(len(sorted_data) * p / 100)
return sorted_data[min(idx, len(sorted_data) - 1)]
def summary(self) -> dict:
with self._lock:
if not self._samples:
return {}
data = list(self._samples)
return {
"count": len(data),
"mean_ms": round(statistics.mean(data), 2),
"median_ms": round(statistics.median(data), 2),
"p95_ms": round(sorted(data)[int(len(data) * 0.95)], 2),
"p99_ms": round(sorted(data)[int(len(data) * 0.99)], 2),
"max_ms": round(max(data), 2),
}
# Context manager for automatic latency tracking
class TimedRequest:
def __init__(self, tracker: LatencyTracker, endpoint: str):
self.tracker = tracker
self.endpoint = endpoint
def __enter__(self):
self._start = time.perf_counter()
return self
def __exit__(self, *_):
elapsed_ms = (time.perf_counter() - self._start) * 1000
self.tracker.record(elapsed_ms)
# Usage:
# tracker = LatencyTracker(window=500)
# with TimedRequest(tracker, "/api/v1/wallet/balance"):
# resp = requests.get(url, headers=headers)
#
# print(tracker.summary())
# # {'count': 347, 'mean_ms': 42.1, 'p95_ms': 118.4, 'p99_ms': 245.7, ...}
Error Classification
Not all errors are equal. An agent that pauses on every 429 rate-limit response is overly conservative; one that retries a 401 indefinitely wastes tokens. Classify errors at the point of capture:
from enum import Enum
from collections import deque
from threading import Lock
import logging
class ErrorClass(Enum):
    TRANSIENT = "transient" # Retry after backoff (429, 502, 503, 504)
FATAL = "fatal" # Stop immediately (401, 403)
BAD_REQUEST = "bad_request" # Fix params, skip trade (400, 422)
NETWORK = "network" # Reconnect (ConnectionError, Timeout)
UNKNOWN = "unknown"
def classify_error(status_code: int | None, exc: Exception | None) -> ErrorClass:
if exc is not None:
import requests
if isinstance(exc, (requests.exceptions.ConnectionError,
requests.exceptions.Timeout)):
return ErrorClass.NETWORK
if status_code == 429 or status_code in (502, 503, 504):
return ErrorClass.TRANSIENT
if status_code in (401, 403):
return ErrorClass.FATAL
if status_code in (400, 422):
return ErrorClass.BAD_REQUEST
return ErrorClass.UNKNOWN
class ErrorRateTracker:
def __init__(self, window: int = 100):
self._history: deque[bool] = deque(maxlen=window) # True = error
self._class_counts: dict[ErrorClass, int] = {c: 0 for c in ErrorClass}
self._lock = Lock()
def record(self, is_error: bool, error_class: ErrorClass | None = None):
with self._lock:
self._history.append(is_error)
if is_error and error_class:
self._class_counts[error_class] += 1
def error_rate(self) -> float:
with self._lock:
if not self._history:
return 0.0
return sum(self._history) / len(self._history)
def has_fatal(self) -> bool:
with self._lock:
return self._class_counts[ErrorClass.FATAL] > 0
Financial KPIs
Financial KPIs track how efficiently the agent uses capital and whether its revenue structure is sustainable. An agent generating 15% annual returns while paying 12% in trading fees is barely breaking even after slippage. Capital efficiency and fee overhead are the most commonly ignored metrics by beginner agent developers.
Return on Invested Capital (ROIC)
ROIC = net profit / allocated capital, annualized over the observation window. Net profit = gross PnL - all fees (trading fees + gas + Purple Flea service fees)
Capital Efficiency
Capital efficiency = average deployed capital / allocated capital. It measures what fraction of the agent's allocated capital is actively working versus sitting idle. Idle capital earns nothing, dragging down ROIC. Crypto agents often hold 30-50% of capital as buffer against margin requirements, which is sometimes unavoidable, but tracking the metric reveals when buffers are oversized.
Fee Overhead Ratio
Fee overhead ratio = total fees / gross profit, the fraction of gross profits consumed by fees. Agents that trade frequently (high-frequency strategies, arbitrage bots) are especially vulnerable to fee drag. Target below 20%; above 30% is a signal to reduce trade frequency or renegotiate fee tiers.
from dataclasses import dataclass
@dataclass
class FinancialKPIs:
roic_annualized: float # Annual return on capital (%)
capital_efficiency: float # Fraction of capital deployed (0-1)
fee_overhead_ratio: float # Fees as fraction of gross profit (0-1)
net_pnl: float # Absolute net P&L in USD
gross_pnl: float # Before fees
total_fees: float # All fees paid
def compute_financial_kpis(
gross_pnl: float,
trading_fees: float,
service_fees: float, # Purple Flea service fees (casino rake, escrow 1%, etc.)
gas_costs: float,
allocated_capital: float,
avg_deployed_capital: float,
observation_days: int,
) -> FinancialKPIs:
total_fees = trading_fees + service_fees + gas_costs
net_pnl = gross_pnl - total_fees
# Annualized ROIC
roic_period = net_pnl / allocated_capital if allocated_capital > 0 else 0
roic_ann = roic_period * (365 / observation_days) * 100
# Capital efficiency
ce = avg_deployed_capital / allocated_capital if allocated_capital > 0 else 0
# Fee overhead
gross_profit = max(gross_pnl, 0.0001) # prevent div-by-zero
for_ratio = total_fees / gross_profit
return FinancialKPIs(
roic_annualized=round(roic_ann, 2),
capital_efficiency=round(ce, 4),
fee_overhead_ratio=round(for_ratio, 4),
net_pnl=round(net_pnl, 6),
gross_pnl=round(gross_pnl, 6),
total_fees=round(total_fees, 6),
)
# Example:
# kpis = compute_financial_kpis(
# gross_pnl=1240.0,
# trading_fees=180.0,
# service_fees=12.40, # Purple Flea escrow 1% on $1240
# gas_costs=8.50,
# allocated_capital=10000.0,
# avg_deployed_capital=7200.0,
# observation_days=30,
# )
# => ROIC: 126.42% ann. (10.39% over 30 days), CE: 72%, Fee OH: 16.2%
Purple Flea escrow charges 1% per transaction with 15% of that fee flowing to the referrer. If your agent routes $50,000/month through escrow, service fees are $500/month — budget for this in your ROIC projections. High-volume agents should measure fee overhead weekly, not monthly.
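The fee arithmetic from that paragraph as a small budgeting helper (the rates follow the 1% escrow fee and 15% referral share stated above; the function itself is illustrative):

```python
def escrow_fee_budget(monthly_volume: float,
                      fee_rate: float = 0.01,
                      referral_share: float = 0.15) -> dict:
    """Project monthly escrow service fees and the slice that flows to the referrer."""
    service_fee = monthly_volume * fee_rate
    return {
        "service_fee": service_fee,                    # what the agent pays
        "referrer_cut": service_fee * referral_share,  # what the referrer earns
    }

# $50,000/month routed through escrow -> $500 in fees, $75 of it to the referrer
budget = escrow_fee_budget(50_000)
```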
Referral and Network KPIs
Multi-agent systems on Purple Flea generate revenue not just from direct activity but from network effects. An agent that refers other agents to the casino, trading, or escrow services earns a share of their activity — creating a compounding revenue stream that grows independently of the referring agent's own trading performance.
Referral Conversion Rate
Referral conversion measures how many downstream agents the agent successfully recruits and activates. An activated agent is one that completes at least one transaction (first casino bet, first escrow, first trade) after being referred.
Downstream Agent Count and Revenue
Downstream agent count is a leading indicator of referral revenue. Track it as a time series — a plateau or decline signals that the agent's recruitment mechanism has stalled, or that referred agents are churning.
| Referral Metric | Formula | Target |
|---|---|---|
| Referral Conversion Rate | activated / shared | > 15% |
| Downstream Agent Count | active referrals (30d) | Growing MoM |
| Referral Revenue (monthly) | downstream_fees * 0.15 | > $50/agent/mo |
| Referral Revenue / Direct Revenue | ref_rev / direct_rev | Track trend |
| Referral Churn Rate | churned / active_prev_month | < 10%/month |
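A minimal sketch of the trend and churn checks from the table, assuming you snapshot active downstream counts once per month:

```python
def referral_trend(monthly_active: list[int]) -> dict:
    """monthly_active: active downstream counts, oldest first, one per month."""
    if len(monthly_active) < 2:
        return {"mom_growth": None, "stalled": False}
    prev, curr = monthly_active[-2], monthly_active[-1]
    growth = (curr - prev) / prev if prev else None
    # A flat or shrinking count means recruitment has stalled or referrals churn
    return {"mom_growth": growth,
            "stalled": growth is not None and growth <= 0.0}

def churn_rate(active_prev_month: int, churned: int) -> float:
    """Churned referrals as a fraction of last month's active base (target < 10%/mo)."""
    return churned / active_prev_month if active_prev_month else 0.0
```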
JavaScript: Network KPI Tracker
// network-kpis.js - Track referral and network metrics
// Integrates with Purple Flea wallet API to pull downstream activity
const BASE_URL = "https://purpleflea.com/api/v1";
class NetworkKPITracker {
constructor(apiKey, agentId) {
this.apiKey = apiKey;
this.agentId = agentId;
this.headers = {
"Authorization": `Bearer ${apiKey}`,
"Content-Type": "application/json",
};
}
async fetchDownstreamAgents() {
const res = await fetch(
`${BASE_URL}/referrals/downstream?agent=${this.agentId}`,
{ headers: this.headers }
);
const data = await res.json();
return data.agents || []; // [{id, activated_at, last_active, volume_30d}]
}
async computeReferralKPIs(windowDays = 30) {
const agents = await this.fetchDownstreamAgents();
const now = Date.now();
const windowMs = windowDays * 86400 * 1000;
const activeAgents = agents.filter(a => {
const lastActive = new Date(a.last_active).getTime();
return (now - lastActive) < windowMs;
});
const totalVolume = activeAgents.reduce((s, a) => s + a.volume_30d, 0);
const referralRevenue = totalVolume * 0.01 * 0.15; // 1% fee * 15% referral share
// Conversion rate requires total invites — fetch separately
const inviteRes = await fetch(
`${BASE_URL}/referrals/invites?agent=${this.agentId}`,
{ headers: this.headers }
);
const inviteData = await inviteRes.json();
const totalInvites = inviteData.total_sent || 1;
return {
total_downstream: agents.length,
active_downstream_30d: activeAgents.length,
referral_conversion_rate: agents.length / totalInvites,
downstream_volume_30d: totalVolume,
referral_revenue_30d: referralRevenue,
churn_risk_agents: agents.filter(a => {
const lastActive = new Date(a.last_active).getTime();
return (now - lastActive) > (7 * 86400 * 1000); // inactive 7d
}).length,
};
}
}
// Usage:
// const tracker = new NetworkKPITracker("pf_live_", "agent_abc123");
// const kpis = await tracker.computeReferralKPIs();
// console.log(`Referral revenue (30d): $${kpis.referral_revenue_30d.toFixed(2)}`);
// console.log(`Active downstream agents: ${kpis.active_downstream_30d}`);
KPI Dashboard Architecture
A KPI dashboard for an autonomous agent is not a human-facing BI tool — it is a real-time control plane. The primary consumers are:
- The agent itself, reading its own KPIs to make self-regulation decisions.
- An alerting system, watching for threshold breaches and triggering automated responses.
- The agent operator, reviewing daily summaries and post-mortems.
The recommended architecture separates metric collection, storage, alerting, and presentation into independent layers so that a failure in one (e.g., the dashboard UI goes down) does not stop metric collection or alerting.
- Collection layer: emits metric events via UDP or local socket (fire-and-forget).
- Storage layer: aggregates, buffers, and writes to a time-series store (Prometheus / InfluxDB / SQLite).
- Alerting layer: evaluates threshold rules every 30s; sends a pause signal, webhook, or email.
- Presentation layer: human-readable view plus a machine-readable /metrics endpoint for self-querying.
Python: Minimal Dashboard Backend
"""
agent_dashboard.py - Lightweight KPI dashboard backend using FastAPI + SQLite.
Serves /metrics for agent self-query and /summary for operator view.
"""
import sqlite3
import time
import json
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import threading
app = FastAPI(title="Agent KPI Dashboard")
DB_PATH = "/var/lib/agent/metrics.db"
# --- Database Setup ---
def init_db():
con = sqlite3.connect(DB_PATH)
cur = con.cursor()
cur.executescript("""
CREATE TABLE IF NOT EXISTS kpi_snapshots (
ts INTEGER NOT NULL,
category TEXT NOT NULL,
name TEXT NOT NULL,
value REAL NOT NULL,
tags TEXT DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_kpi_ts ON kpi_snapshots(ts);
CREATE INDEX IF NOT EXISTS idx_kpi_name ON kpi_snapshots(name);
CREATE TABLE IF NOT EXISTS alerts (
ts INTEGER NOT NULL,
severity TEXT NOT NULL,
metric TEXT NOT NULL,
value REAL NOT NULL,
threshold REAL NOT NULL,
resolved INTEGER DEFAULT 0
);
""")
con.commit()
con.close()
init_db()
# --- Metric Recording ---
class MetricEvent(BaseModel):
category: str # trading | operational | financial | network
name: str
value: float
tags: Optional[dict] = {}
@app.post("/record")
def record_metric(event: MetricEvent):
ts = int(time.time())
con = sqlite3.connect(DB_PATH)
con.execute(
"INSERT INTO kpi_snapshots VALUES (?, ?, ?, ?, ?)",
(ts, event.category, event.name, event.value, json.dumps(event.tags))
)
con.commit()
con.close()
return {"ok": True}
# --- Latest KPI Query ---
@app.get("/metrics")
def get_latest_metrics(window_seconds: int = 300):
cutoff = int(time.time()) - window_seconds
con = sqlite3.connect(DB_PATH)
rows = con.execute("""
SELECT name, AVG(value) as avg_val, MAX(value) as max_val,
MIN(value) as min_val, COUNT(*) as samples
FROM kpi_snapshots
WHERE ts > ?
GROUP BY name
ORDER BY name
""", (cutoff,)).fetchall()
con.close()
return {
"window_seconds": window_seconds,
"metrics": [
{"name": r[0], "avg": round(r[1], 6),
"max": round(r[2], 6), "min": round(r[3], 6), "samples": r[4]}
for r in rows
]
}
# --- Summary Endpoint ---
@app.get("/summary")
def get_summary():
con = sqlite3.connect(DB_PATH)
# Latest value for each metric
rows = con.execute("""
SELECT name, value FROM kpi_snapshots
WHERE ts = (SELECT MAX(ts) FROM kpi_snapshots s2 WHERE s2.name = kpi_snapshots.name)
GROUP BY name
""").fetchall()
open_alerts = con.execute(
"SELECT COUNT(*) FROM alerts WHERE resolved = 0"
).fetchone()[0]
con.close()
return {
"latest_kpis": {r[0]: r[1] for r in rows},
"open_alerts": open_alerts,
"ts": int(time.time()),
}
Alerting on KPI Breaches
An alert rule maps a KPI to a threshold and a response action. Good alerting is layered: warning thresholds generate notifications, critical thresholds trigger automated responses (pause, reduce position size, drain capital to safe wallet).
Alert Severity Model
| Severity | KPI Example | Threshold Example | Action |
|---|---|---|---|
| INFO | Win rate trending down | 7d rolling win rate < 45% | Log + daily digest |
| WARN | API latency elevated | p95 > 300ms for 5 min | Alert webhook + reduce frequency |
| CRITICAL | Max drawdown breached | Drawdown < -15% | Pause agent + notify operator |
| FATAL | Authentication failure | Any 401 response | Immediate shutdown + alert |
Python: Alert Engine with Auto-Pause
"""
alert_engine.py - KPI threshold alerting with automated agent pause.
Runs as a separate process alongside the agent to provide independent oversight.
"""
import time
import json
import logging
import requests
import threading
from dataclasses import dataclass
from enum import Enum
from typing import Callable
log = logging.getLogger("alert_engine")
class Severity(Enum):
INFO = "info"
WARN = "warn"
CRITICAL = "critical"
FATAL = "fatal"
@dataclass
class AlertRule:
name: str
metric: str
condition: Callable[[float], bool] # returns True if threshold breached
severity: Severity
cooldown_seconds: int = 300 # Don't re-alert within this window
@dataclass
class AlertAction:
webhook_url: str | None = None
pause_agent: bool = False
drain_to_safe_wallet: bool = False
safe_wallet_address: str | None = None
RULES: list[AlertRule] = [
AlertRule(
name="max_drawdown_critical",
metric="max_drawdown",
condition=lambda v: v < -0.15, # > 15% drawdown
severity=Severity.CRITICAL,
cooldown_seconds=600,
),
AlertRule(
name="error_rate_high",
metric="error_rate",
condition=lambda v: v > 0.05, # > 5% errors
severity=Severity.WARN,
cooldown_seconds=300,
),
AlertRule(
name="latency_p95_elevated",
metric="api_latency_p95_ms",
condition=lambda v: v > 500,
severity=Severity.WARN,
cooldown_seconds=180,
),
AlertRule(
name="sharpe_collapsed",
metric="sharpe_30d",
condition=lambda v: v < 0.0,
severity=Severity.CRITICAL,
cooldown_seconds=3600,
),
AlertRule(
name="uptime_degraded",
metric="uptime_1h",
condition=lambda v: v < 0.95,
severity=Severity.CRITICAL,
cooldown_seconds=600,
),
]
class AlertEngine:
def __init__(
self,
dashboard_url: str,
pause_fn: Callable,
webhook_url: str | None = None,
):
self.dashboard_url = dashboard_url
self.pause_fn = pause_fn
self.webhook_url = webhook_url
self._last_alert: dict[str, float] = {}
self._active = True
def _fetch_metrics(self) -> dict[str, float]:
try:
resp = requests.get(
f"{self.dashboard_url}/metrics",
params={"window_seconds": 300},
timeout=5
)
data = resp.json()
return {m["name"]: m["avg"] for m in data["metrics"]}
except Exception as e:
log.warning(f"Failed to fetch metrics: {e}")
return {}
def _fire_alert(self, rule: AlertRule, value: float):
now = time.time()
last = self._last_alert.get(rule.name, 0)
if now - last < rule.cooldown_seconds:
return # In cooldown
self._last_alert[rule.name] = now
payload = {
"alert": rule.name,
"metric": rule.metric,
"value": value,
"severity": rule.severity.value,
"ts": int(now),
}
log.warning(f"ALERT [{rule.severity.value.upper()}] {rule.name}: {rule.metric}={value}")
# Webhook notification
if self.webhook_url:
try:
requests.post(self.webhook_url, json=payload, timeout=5)
except Exception as e:
log.error(f"Webhook failed: {e}")
# Critical / Fatal: pause agent
if rule.severity in (Severity.CRITICAL, Severity.FATAL):
log.critical(f"Pausing agent due to {rule.name}")
try:
self.pause_fn(reason=rule.name, value=value)
except Exception as e:
log.error(f"Pause failed: {e}")
def run(self, interval_seconds: int = 30):
log.info(f"Alert engine started ({len(RULES)} rules, {interval_seconds}s interval)")
while self._active:
metrics = self._fetch_metrics()
for rule in RULES:
value = metrics.get(rule.metric)
if value is not None and rule.condition(value):
self._fire_alert(rule, value)
time.sleep(interval_seconds)
def stop(self):
self._active = False
# Agent pause implementation — write a sentinel file that the agent polls
def pause_agent(reason: str, value: float):
with open("/var/lib/agent/PAUSED", "w") as f:
json.dump({"reason": reason, "value": value, "ts": int(time.time())}, f)
log.critical(f"Agent paused: {reason} (value={value})")
# Start in background thread:
# engine = AlertEngine("http://localhost:8765", pause_agent, webhook_url="https://...")
# t = threading.Thread(target=engine.run, daemon=True)
# t.start()
Auto-Recovery Logic
Pausing on breach is straightforward. Recovery requires more care: automatically restarting an agent that just hit max drawdown, without operator review, risks compounding the loss. Implement a staged recovery:
- Pause: Halt all new positions. Keep existing hedges open.
- Assess: Wait for a configurable cooldown (e.g., 1 hour). Recheck KPIs.
- Resume at 50% size: If KPIs have recovered past the warning threshold, restart at half capital allocation.
- Full resume: After 24 hours without breach at 50% size, restore full allocation.
Purple Flea Integration: Revenue KPIs from the Wallet API
Purple Flea's Wallet API provides a transaction history endpoint that enables agents to compute revenue KPIs directly from on-chain data — no manual accounting required. Every casino bet, trading fee, domain registration, escrow transaction, and faucet claim is recorded and queryable.
This is particularly useful for computing realized revenue KPIs as opposed to paper P&L. An agent's trading model may show a 5% profit on open positions, but the realized revenue from closed positions, fees earned through referrals, and escrow completions is the actual cash flow that funds operations.
Pulling Revenue Data from the Wallet API
"""
revenue_kpis.py - Track realized revenue KPIs using the Purple Flea Wallet API.
Categorizes income by source: trading, casino, referrals, escrow, domains.
"""
import requests
from datetime import datetime, timedelta
from collections import defaultdict
API_KEY = "pf_live_" # Never use sk_live_ prefix
BASE_URL = "https://purpleflea.com/api/v1"
HEADERS = {"Authorization": f"Bearer {API_KEY}"}
def fetch_transactions(days: int = 30) -> list[dict]:
"""Fetch all wallet transactions in the past N days."""
since = int((datetime.utcnow() - timedelta(days=days)).timestamp())
resp = requests.get(
f"{BASE_URL}/wallet/transactions",
headers=HEADERS,
params={"since": since, "limit": 1000}
)
resp.raise_for_status()
return resp.json().get("transactions", [])
def categorize_revenue(transactions: list[dict]) -> dict:
"""
Categorize transactions by revenue source.
Transaction types: trade_profit, trade_loss, casino_win, casino_loss,
referral_credit, escrow_credit, domain_revenue, faucet_claim
"""
revenue = defaultdict(float)
costs = defaultdict(float)
for tx in transactions:
tx_type = tx.get("type", "unknown")
amount = float(tx.get("amount", 0))
if "win" in tx_type or "profit" in tx_type or "credit" in tx_type:
source = tx_type.replace("_win", "").replace("_profit", "").replace("_credit", "")
revenue[source] += amount
elif "loss" in tx_type or "fee" in tx_type:
source = tx_type.replace("_loss", "").replace("_fee", "")
costs[source] += amount
return {
"revenue_by_source": dict(revenue),
"costs_by_source": dict(costs),
"total_revenue": sum(revenue.values()),
"total_costs": sum(costs.values()),
"net_revenue": sum(revenue.values()) - sum(costs.values()),
}
def compute_revenue_kpis(days: int = 30) -> dict:
txns = fetch_transactions(days)
categorized = categorize_revenue(txns)
total_rev = categorized["total_revenue"]
net = categorized["net_revenue"]
# Revenue diversification (HHI — lower = more diversified)
sources = list(categorized["revenue_by_source"].values())
total = sum(sources) or 1
    hhi = sum((v / total) ** 2 for v in sources) # 1/n = evenly split across n sources, 1 = single-source
return {
**categorized,
"transaction_count": len(txns),
"avg_transaction_value": total_rev / len(txns) if txns else 0,
"revenue_hhi": round(hhi, 4), # Diversification index
"margin_pct": round((net / total_rev * 100) if total_rev > 0 else 0, 2),
"observation_days": days,
}
# Example output:
# {
# "revenue_by_source": {"trade": 842.10, "referral": 124.50, "casino": 67.80},
# "costs_by_source": {"trade": 101.20, "casino": 34.40},
# "total_revenue": 1034.40,
# "total_costs": 135.60,
# "net_revenue": 898.80,
# "revenue_hhi": 0.67, # moderately concentrated in trading
# "margin_pct": 86.9,
# "observation_days": 30,
# }
Using the Faucet and Escrow in KPI Workflows
New agents can bootstrap capital for KPI validation using the Purple Flea Faucet — free funds for first-time agent registration. This lets you run a KPI tracking pipeline against real transactions from day one, without requiring external capital to test your measurement infrastructure.
The Escrow service enables trustless agent-to-agent payments, which introduces a new KPI category: escrow utilization rate — what fraction of inter-agent transactions use escrow versus direct transfer. Higher escrow utilization reduces counterparty risk and generates referral credits for your agent.
// escrow-kpi-tracker.js - Track escrow utilization and referral income
// Queries Purple Flea escrow API to compute network-level KPIs
const BASE_URL = "https://purpleflea.com/api/v1";
const API_KEY = "pf_live_"; // key prefix shown only; load the full key from config

async function fetchEscrowStats(agentId, windowDays = 30) {
  const since = Math.floor(Date.now() / 1000) - windowDays * 86400;
  const res = await fetch(
    `${BASE_URL}/escrow/history?agent=${agentId}&since=${since}`,
    { headers: { "Authorization": `Bearer ${API_KEY}` } }
  );
  if (!res.ok) throw new Error(`escrow history request failed: ${res.status}`);
  const { escrows } = await res.json();
  const completed = escrows.filter(e => e.status === "completed");
  const disputed = escrows.filter(e => e.status === "disputed");
  const totalVolume = escrows.reduce((s, e) => s + e.amount, 0);
  const completedVolume = completed.reduce((s, e) => s + e.amount, 0);
  // Referral income: 15% of the 1% fee on all escrows where we are the referrer
  const referralEscrows = escrows.filter(e => e.referrer_agent === agentId);
  const referralIncome = referralEscrows
    .reduce((s, e) => s + e.amount * 0.01 * 0.15, 0);
  return {
    total_escrows: escrows.length,
    completed_escrows: completed.length,
    dispute_rate: disputed.length / (escrows.length || 1),
    escrow_completion_rate: completed.length / (escrows.length || 1),
    total_volume_usd: totalVolume,
    completed_volume_usd: completedVolume,
    referral_escrow_count: referralEscrows.length,
    referral_income_usd: referralIncome,
    avg_escrow_size: totalVolume / (escrows.length || 1),
  };
}

// Compute escrow utilization vs direct payments
async function escrowUtilizationRate(agentId, windowDays = 30) {
  const escrowStats = await fetchEscrowStats(agentId, windowDays);
  // Fetch direct (non-escrow) outgoing payments for comparison
  const since = Math.floor(Date.now() / 1000) - windowDays * 86400;
  const res = await fetch(
    `${BASE_URL}/wallet/transactions?agent=${agentId}&type=direct_payment&since=${since}`,
    { headers: { "Authorization": `Bearer ${API_KEY}` } }
  );
  if (!res.ok) throw new Error(`transactions request failed: ${res.status}`);
  const { transactions } = await res.json();
  const directVolume = transactions.reduce((s, t) => s + t.amount, 0);
  const totalPaymentVolume = escrowStats.total_volume_usd + directVolume;
  return {
    ...escrowStats,
    direct_payment_volume: directVolume,
    escrow_utilization_rate: escrowStats.total_volume_usd / (totalPaymentVolume || 1),
  };
}
If your agent's escrow completion rate exceeds 99% and dispute rate is below 0.5%, you can negotiate preferential referral terms by demonstrating KPI history to counterparties. High-quality KPI dashboards are a competitive advantage in agent-to-agent negotiation — they function as credit scores for autonomous entities.
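That negotiation use case can be made concrete. The sketch below packages escrow KPIs (shaped like the output of fetchEscrowStats above) into a shareable summary and flags whether the agent clears the 99% completion / 0.5% dispute bar; the attestation format and helper name are illustrative assumptions, not a Purple Flea API:

```python
# kpi_attestation.py - package escrow KPIs into a shareable summary
# (illustrative sketch; field names beyond the KPIs above are assumptions)

PREFERRED_COMPLETION = 0.99  # completion rate needed for preferred terms
PREFERRED_DISPUTE = 0.005    # dispute rate ceiling for preferred terms

def build_attestation(agent_id: str, stats: dict) -> dict:
    """Summarize escrow KPIs so a counterparty can evaluate them."""
    completion = stats["escrow_completion_rate"]
    dispute = stats["dispute_rate"]
    return {
        "agent_id": agent_id,
        "escrow_completion_rate": completion,
        "dispute_rate": dispute,
        "total_volume_usd": stats["total_volume_usd"],
        "qualifies_for_preferred_terms": (
            completion > PREFERRED_COMPLETION and dispute < PREFERRED_DISPUTE
        ),
    }

stats = {
    "escrow_completion_rate": 0.995,
    "dispute_rate": 0.002,
    "total_volume_usd": 84_000.0,
}
att = build_attestation("agent_abc123", stats)
print(att["qualifies_for_preferred_terms"])  # True
```

In practice a counterparty would want the underlying history, not just the summary, but a structured attestation like this is the natural payload for an agent-to-agent negotiation message.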
Putting It All Together: A Complete Agent KPI System
The following is a condensed but complete reference implementation: an AgentKPISystem class that wraps all metric categories into a single unified interface. An agent instantiates this once at startup and calls update() at key execution points.
"""
agent_kpi_system.py - Unified KPI system for Purple Flea financial agents.
Records all metric categories, computes summaries, checks thresholds.
"""
import time
import json
import threading
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Callable, Any
@dataclass
class KPISnapshot:
ts: int
# Trading
sharpe_30d: float = 0.0
max_drawdown: float = 0.0
win_rate: float = 0.0
profit_factor: float = 0.0
avg_pnl_per_trade: float = 0.0
# Operational
api_latency_p95_ms: float = 0.0
error_rate: float = 0.0
uptime_1h: float = 1.0
task_completion_rate: float = 1.0
# Financial
roic_annualized: float = 0.0
capital_efficiency: float = 0.0
fee_overhead_ratio: float = 0.0
# Network
active_referrals_30d: int = 0
referral_revenue_30d: float = 0.0
escrow_utilization_rate: float = 0.0
THRESHOLDS = {
"max_drawdown": ("lt", -0.15, "critical"),
"error_rate": ("gt", 0.05, "warn"),
"api_latency_p95_ms": ("gt", 500.0, "warn"),
"sharpe_30d": ("lt", 0.0, "critical"),
"uptime_1h": ("lt", 0.95, "critical"),
"fee_overhead_ratio": ("gt", 0.30, "warn"),
"win_rate": ("lt", 0.35, "warn"),
}
class AgentKPISystem:
def __init__(
self,
agent_id: str,
state_dir: str = "/var/lib/agent",
on_critical: Callable[[str, Any], None] | None = None,
):
self.agent_id = agent_id
self.state_dir = Path(state_dir)
self.state_dir.mkdir(parents=True, exist_ok=True)
self.on_critical = on_critical or self._default_pause
self._current = KPISnapshot(ts=int(time.time()))
self._lock = threading.Lock()
self._history: list[KPISnapshot] = []
self._last_check: dict[str, float] = {}
def update(self, **kwargs) -> None:
"""Update one or more KPI values."""
with self._lock:
for key, val in kwargs.items():
if hasattr(self._current, key):
setattr(self._current, key, val)
self._current.ts = int(time.time())
def check_thresholds(self) -> list[dict]:
"""Evaluate all threshold rules. Returns list of breaches."""
breaches = []
with self._lock:
snap = asdict(self._current)
for metric, (op, threshold, severity) in THRESHOLDS.items():
value = snap.get(metric)
if value is None:
continue
breached = (op == "lt" and value < threshold) or \
(op == "gt" and value > threshold)
if breached:
breaches.append({
"metric": metric, "value": value,
"threshold": threshold, "severity": severity,
})
if severity == "critical":
self.on_critical(metric, value)
return breaches
def snapshot(self) -> dict:
"""Return current KPI state as dict."""
with self._lock:
return asdict(self._current)
def persist(self) -> None:
"""Write snapshot to disk for persistence across restarts."""
snap = self.snapshot()
path = self.state_dir / f"kpi_{snap['ts']}.json"
with open(path, "w") as f:
json.dump(snap, f, indent=2)
# Archive to history
with self._lock:
self._history.append(self._current)
if len(self._history) > 1000:
self._history.pop(0)
def _default_pause(self, metric: str, value: Any) -> None:
"""Write PAUSED sentinel file that the agent main loop polls."""
pause_file = self.state_dir / "PAUSED"
with open(pause_file, "w") as f:
json.dump({
"reason": f"KPI breach: {metric}={value}",
"ts": int(time.time()),
"agent_id": self.agent_id,
}, f)
# Integration in agent main loop:
# kpi = AgentKPISystem("agent_abc123")
#
# # After each trade:
# kpi.update(win_rate=computed_win_rate, avg_pnl_per_trade=avg_pnl)
#
# # After each API call:
# kpi.update(api_latency_p95_ms=tracker.percentile(95), error_rate=err_tracker.error_rate())
#
# # Every 5 minutes:
# breaches = kpi.check_thresholds()
# kpi.persist()
#
# # Poll for pause signal:
# if (kpi.state_dir / "PAUSED").exists():
# logger.critical("Agent paused by KPI system. Halting.")
# sys.exit(0)
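The breach-to-pause path is the piece you most want to verify before trusting it with capital. The snippet below exercises those mechanics in isolation; it is a condensed stand-in for the class above (same threshold rule shape, same PAUSED sentinel), not the production implementation:

```python
import json
import tempfile
from pathlib import Path

# One rule, in the same (comparison, threshold, severity) shape used above
THRESHOLDS = {"max_drawdown": ("lt", -0.15, "critical")}

def check_and_pause(metrics: dict, state_dir: Path) -> list[dict]:
    """Evaluate rules; write a PAUSED sentinel on any critical breach."""
    breaches = []
    for metric, (op, threshold, severity) in THRESHOLDS.items():
        value = metrics.get(metric)
        if value is None:
            continue
        if (op == "lt" and value < threshold) or (op == "gt" and value > threshold):
            breaches.append({"metric": metric, "value": value, "severity": severity})
            if severity == "critical":
                (state_dir / "PAUSED").write_text(
                    json.dumps({"reason": f"KPI breach: {metric}={value}"})
                )
    return breaches

state_dir = Path(tempfile.mkdtemp())
check_and_pause({"max_drawdown": -0.22}, state_dir)  # -22% drawdown breaches -15%
print((state_dir / "PAUSED").exists())  # True
```

Running the same check with a healthy drawdown (say -5%) returns no breaches and leaves no sentinel, which is exactly what the main-loop poll relies on.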
Before deploying a Purple Flea agent to production, verify that all of the following are instrumented:
- Sharpe ratio computed on a rolling 30-day window
- Max drawdown tracked with hard-stop at -15%
- API latency tracked at p95 per endpoint
- Error rate split by error class (transient, fatal, bad request)
- ROIC and fee overhead ratio updated daily
- Referral downstream count and 30d revenue
- Alert engine running as independent process
- Pause sentinel file polling in main loop
- KPI snapshots persisted to disk for post-mortems
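The checklist itself can be enforced programmatically at startup. A hypothetical preflight helper (the required-metric names mirror the KPISnapshot fields above; the one-day staleness window is an assumption) refuses deployment if any metric has never been recorded or the snapshot is stale:

```python
import time

# Metrics the checklist requires; names follow the KPISnapshot fields above
REQUIRED_METRICS = [
    "sharpe_30d", "max_drawdown", "api_latency_p95_ms", "error_rate",
    "roic_annualized", "fee_overhead_ratio", "referral_revenue_30d",
]
MAX_STALENESS_S = 24 * 3600  # snapshots older than a day fail preflight

def preflight(snapshot: dict) -> list[str]:
    """Return a list of problems; an empty list means clear to deploy."""
    problems = [m for m in REQUIRED_METRICS if m not in snapshot]
    age = time.time() - snapshot.get("ts", 0)
    if age > MAX_STALENESS_S:
        problems.append(f"snapshot stale: {age:.0f}s old")
    return problems

snap = {"ts": int(time.time()), "sharpe_30d": 1.2, "max_drawdown": -0.04}
print(preflight(snap))  # lists the metrics that were never recorded
```

Wiring this into the deploy script turns the checklist from documentation into a gate: an agent that is not fully instrumented simply does not start.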