
KPIs and Performance Metrics for AI Financial Agents

Purple Flea Research · March 7, 2026 · 28 min read

An AI agent that cannot measure itself cannot improve itself. KPIs are the feedback loop that separates a profitable, reliable agent from an expensive compute bill. This guide covers every category of metric that matters for autonomous financial agents: trading performance, operational health, capital efficiency, referral networks, and how to wire it all into a live dashboard that can pause a misbehaving agent before it burns your wallet.

137+   Casino agents live on Purple Flea
6      Services with trackable KPIs
15%    Referral fee share, escrow network
<50ms  Target API latency SLA

Why KPIs Matter for Autonomous Agents

Human traders have intuition, memory, and social accountability. They notice when something feels wrong — drawdowns are stressful, bad fills produce frustration, and colleagues notice erratic behavior. Autonomous agents have none of that. Without explicit measurement, a broken agent will execute millions of API calls, accumulate losses, and never send a distress signal.

KPIs fill the accountability gap. They answer three critical questions: Is the strategy still earning risk-adjusted returns (trading KPIs)? Is the execution infrastructure healthy (operational KPIs)? Is capital being deployed efficiently and sustainably (financial KPIs)?

Critically, KPIs enable automated shutdown decisions. A well-designed agent pauses itself when a threshold is breached — max drawdown exceeded, error rate spikes, API latency doubles. This prevents a runaway agent from causing irreversible financial damage while you are asleep, in a meeting, or otherwise unavailable.

The Accountability Loop

Measure → Compare to threshold → Alert → Auto-pause or auto-recover → Log for post-mortem. Every production agent needs this loop operating at sub-minute frequency. KPIs without automated responses are just logs.
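As a minimal sketch, one pass of that loop can be wired with pluggable hooks. All callables here (`read_metric`, `on_breach`, `log`) are placeholders for real implementations, and the comparison assumes a lower-is-worse metric such as drawdown:

```python
import time

def accountability_loop(read_metric, threshold, on_breach, log):
    """One pass of the loop: measure, compare, respond, record.
    `read_metric`, `on_breach`, and `log` are placeholder hooks."""
    value = read_metric()                        # Measure
    breached = value < threshold                 # Compare to threshold
    if breached:
        on_breach(value)                         # Alert + auto-pause
    log({"ts": int(time.time()), "value": value, "breached": breached})  # Log for post-mortem
    return breached

# Stub hooks: an 18% drawdown against a 15% floor trips the breaker
events = []
paused = accountability_loop(
    read_metric=lambda: -0.18,
    threshold=-0.15,
    on_breach=lambda v: events.append(("pause", v)),
    log=events.append,
)
```

Running this at sub-minute frequency in a watchdog process, rather than inside the agent itself, keeps the loop alive even when the agent is the thing that failed.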

The Three Classes of Agent Failure

Most agent failures fall into one of three categories, each requiring a different class of KPI:

| Failure Class | Examples | KPI Category | Response Time |
|---|---|---|---|
| Strategic failure | Strategy stops working, alpha decays, market regime change | Trading KPIs | Hours to days |
| Operational failure | API down, rate limit hit, process crash, memory leak | Operational KPIs | Seconds to minutes |
| Financial failure | Capital depleted, fees exceeding revenue, over-leveraged | Financial KPIs | Minutes to hours |

Trading KPIs

Trading KPIs measure the quality of the agent's strategy execution. The goal is not just profitability in absolute terms, but risk-adjusted profitability — returns earned per unit of risk taken. A strategy that returns 50% per year with 80% drawdown is worse than one returning 20% with 5% drawdown for most use cases.
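A quick worked comparison, using the Calmar ratio defined later in this section:

```python
# Return per unit of maximum drawdown (the Calmar ratio, covered below)
calmar_a = 0.50 / 0.80   # 50% annual return, 80% max drawdown
calmar_b = 0.20 / 0.05   # 20% annual return,  5% max drawdown
# Strategy B earns over 6x more return per unit of drawdown risk
```

By this yardstick the "slower" strategy is the clearly better one: 4.0 versus roughly 0.63.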

Sharpe Ratio

The Sharpe ratio is the most widely used measure of risk-adjusted return. It expresses how many units of return the strategy earns per unit of volatility.

Sharpe Ratio
Sharpe = (Rp - Rf) / σp
Rp = portfolio return, Rf = risk-free rate, σp = std dev of portfolio returns

For agents operating 24/7 in crypto, the risk-free rate is effectively 0 (or the stablecoin yield available). Annualize daily Sharpe by multiplying by sqrt(365).

Sharpe Ratio
Sharpe = (mean(r) - Rf) / std(r) * sqrt(365)
Risk-adjusted return per unit of total volatility. Does not distinguish upside from downside volatility.
Target: > 1.5 annualized

Sortino Ratio
Sortino = (mean(r) - Rf) / std(r[r<0])
Like Sharpe but penalizes only downside volatility. Better for strategies with asymmetric return profiles.
Target: > 2.0 annualized

Max Drawdown
MDD = min((V_t - peak_t) / peak_t)
Largest peak-to-trough decline in portfolio value. Hard limit breach should trigger agent pause.
Hard limit: < -20%

Calmar Ratio
Calmar = Annualized Return / |Max Drawdown|
Return per unit of maximum drawdown. Particularly useful for strategies with fat-tailed losses.
Target: > 0.5

Win Rate and Profit Factor

Win rate alone is a misleading metric. A strategy with a 30% win rate can be highly profitable if winners are 5x larger than losers. Always measure win rate alongside profit factor.
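A quick sketch of that arithmetic in risk units, using the hypothetical 30% win rate and 5:1 winner size from the paragraph above:

```python
# A 30% win rate with winners 5x the size of losers, in risk units
win_rate, avg_win, avg_loss = 0.30, 5.0, 1.0

expectancy = win_rate * avg_win - (1 - win_rate) * avg_loss
# 0.30 * 5.0 - 0.70 * 1.0 = 0.8 risk units earned per trade

profit_factor = (win_rate * avg_win) / ((1 - win_rate) * avg_loss)
# 1.5 / 0.7, roughly 2.14: comfortably profitable despite losing 70% of trades
```

The same win rate with 1:1 winners would have an expectancy of -0.4: win rate alone tells you nothing without the payoff ratio.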

Profit Factor
PF = Σ Winning Trades / |Σ Losing Trades|
PF > 1 = profitable, PF > 2 = strong, PF < 1 = losing money
| Metric | Formula | Warning | Target |
|---|---|---|---|
| Win Rate | wins / total_trades | < 40% | > 55% |
| Profit Factor | gross_profit / gross_loss | < 1.2 | > 1.8 |
| Avg P&L / Trade | total_pnl / trade_count | < 0 | > fee_cost |
| Expectancy | wr*avg_win - (1-wr)*avg_loss | < 0 | > 0.5% |
| Trade Frequency | trades / day | too low OR too high | strategy-specific |

Python: Computing Trading KPIs

import numpy as np
import pandas as pd
from dataclasses import dataclass
from typing import Optional

@dataclass
class TradingKPIs:
    sharpe: float
    sortino: float
    max_drawdown: float
    calmar: float
    win_rate: float
    profit_factor: float
    avg_pnl_per_trade: float
    expectancy: float
    total_trades: int

def compute_trading_kpis(
    returns: pd.Series,          # daily or per-trade returns (decimal, e.g. 0.02 = 2%)
    trades_df: pd.DataFrame,     # columns: pnl, entry_time, exit_time
    risk_free_rate: float = 0.0,
    periods_per_year: int = 365
) -> TradingKPIs:
    """
    Compute a full set of trading KPIs from a returns series and trade log.
    """
    # --- Sharpe & Sortino ---
    excess = returns - risk_free_rate / periods_per_year
    sharpe = (excess.mean() / excess.std()) * np.sqrt(periods_per_year)

    downside = excess[excess < 0]
    sortino = (excess.mean() / downside.std()) * np.sqrt(periods_per_year) if len(downside) > 0 else np.inf

    # --- Drawdown ---
    cumulative = (1 + returns).cumprod()
    rolling_max = cumulative.cummax()
    drawdown = (cumulative - rolling_max) / rolling_max
    max_drawdown = drawdown.min()

    # --- Calmar ---
    annual_return = (cumulative.iloc[-1] ** (periods_per_year / len(returns))) - 1
    calmar = annual_return / abs(max_drawdown) if max_drawdown != 0 else np.inf

    # --- Trade-level stats ---
    if trades_df is not None and not trades_df.empty:
        winners = trades_df[trades_df["pnl"] > 0]
        losers = trades_df[trades_df["pnl"] <= 0]

        win_rate = len(winners) / len(trades_df)
        gross_profit = winners["pnl"].sum()
        gross_loss = abs(losers["pnl"].sum())
        profit_factor = gross_profit / gross_loss if gross_loss > 0 else np.inf
        avg_pnl = trades_df["pnl"].mean()

        avg_win = winners["pnl"].mean() if len(winners) > 0 else 0
        avg_loss = abs(losers["pnl"].mean()) if len(losers) > 0 else 0
        expectancy = win_rate * avg_win - (1 - win_rate) * avg_loss
        total = len(trades_df)
    else:
        win_rate = profit_factor = avg_pnl = expectancy = 0.0
        total = 0

    return TradingKPIs(
        sharpe=round(sharpe, 3),
        sortino=round(sortino, 3),
        max_drawdown=round(max_drawdown, 4),
        calmar=round(calmar, 3),
        win_rate=round(win_rate, 4),
        profit_factor=round(profit_factor, 4),
        avg_pnl_per_trade=round(avg_pnl, 6),
        expectancy=round(expectancy, 6),
        total_trades=total,
    )

# Example usage:
# kpis = compute_trading_kpis(daily_returns, trade_log)
# print(f"Sharpe: {kpis.sharpe:.2f}")
# print(f"Max DD: {kpis.max_drawdown:.1%}")
# print(f"Win Rate: {kpis.win_rate:.1%}")

Per-Trade P&L Breakdown

Aggregate metrics hide the distribution of outcomes. Always decompose P&L by trade direction, market condition, time-of-day, and instrument. A strategy that looks profitable in aggregate may be bleeding on long trades while profiting on shorts, a sign of broken entry logic on one side.

def pnl_breakdown(trades_df: pd.DataFrame) -> dict:
    """
    Break down P&L by direction, hour, and instrument.
    trades_df must have: pnl, direction ('long'/'short'),
                         hour (0-23), symbol, fees
    """
    result = {}

    # By direction
    result["by_direction"] = trades_df.groupby("direction").agg(
        count=("pnl", "count"),
        total_pnl=("pnl", "sum"),
        avg_pnl=("pnl", "mean"),
        win_rate=("pnl", lambda x: (x > 0).mean()),
    ).round(6).to_dict()

    # By hour of day (find best/worst windows)
    result["by_hour"] = trades_df.groupby("hour").agg(
        avg_pnl=("pnl", "mean"),
        trade_count=("pnl", "count"),
    ).sort_values("avg_pnl", ascending=False).to_dict()

    # By symbol
    result["by_symbol"] = trades_df.groupby("symbol").agg(
        total_pnl=("pnl", "sum"),
        avg_pnl=("pnl", "mean"),
        count=("pnl", "count"),
    ).sort_values("total_pnl", ascending=False).to_dict()

    # Fee drag analysis
    result["fee_analysis"] = {
        "total_fees": trades_df["fees"].sum(),
        "fee_pct_of_gross": (
            trades_df["fees"].sum() /
            trades_df[trades_df["pnl"] > 0]["pnl"].sum()
        ),
        "avg_fee_per_trade": trades_df["fees"].mean(),
    }

    return result

Operational KPIs

Operational KPIs measure the reliability of the agent's execution infrastructure. Even a brilliant strategy fails if the agent crashes, hits rate limits, or takes too long to respond to market events. These metrics run at a much higher frequency than trading KPIs — typically every 30 seconds to 5 minutes.

API Latency (p95)
p95(response_times) in ms
95th percentile API response time. Use p95 rather than mean — outliers cause missed trades. Separately track Purple Flea API latency vs. internal processing time.
Target: < 150ms p95

Error Rate
errors / total_requests * 100
Percentage of API calls that return 4xx or 5xx. Distinguish retryable errors (429, 503) from fatal ones (401, 400 bad params).
Target: < 0.5% error rate

Uptime
runtime_seconds / window_seconds
Agent process availability over a rolling window. Use a watchdog process to measure this from outside the agent.
Target: > 99.5%

Task Completion Rate
completed / (completed + failed + timed_out)
Fraction of intended agent tasks (trade attempts, domain bids, wallet ops) that complete successfully without retry exhaustion.
Target: > 97%

Latency Percentiles in Python

import time
import statistics
from collections import deque
from threading import Lock

class LatencyTracker:
    """
    Rolling-window latency tracker with percentile computation.
    Thread-safe. Stores the last N measurements.
    """

    def __init__(self, window: int = 1000):
        self._samples: deque[float] = deque(maxlen=window)
        self._lock = Lock()

    def record(self, latency_ms: float) -> None:
        with self._lock:
            self._samples.append(latency_ms)

    def percentile(self, p: float) -> float:
        """Compute p-th percentile (0–100) of recorded latencies."""
        with self._lock:
            if not self._samples:
                return 0.0
            sorted_data = sorted(self._samples)
            idx = int(len(sorted_data) * p / 100)
            return sorted_data[min(idx, len(sorted_data) - 1)]

    def summary(self) -> dict:
        with self._lock:
            if not self._samples:
                return {}
            data = list(self._samples)
        return {
            "count": len(data),
            "mean_ms": round(statistics.mean(data), 2),
            "median_ms": round(statistics.median(data), 2),
            "p95_ms": round(sorted(data)[int(len(data) * 0.95)], 2),
            "p99_ms": round(sorted(data)[int(len(data) * 0.99)], 2),
            "max_ms": round(max(data), 2),
        }

# Context manager for automatic latency tracking
class TimedRequest:
    def __init__(self, tracker: LatencyTracker, endpoint: str):
        self.tracker = tracker
        self.endpoint = endpoint

    def __enter__(self):
        self._start = time.perf_counter()
        return self

    def __exit__(self, *_):
        elapsed_ms = (time.perf_counter() - self._start) * 1000
        self.tracker.record(elapsed_ms)

# Usage:
# tracker = LatencyTracker(window=500)
# with TimedRequest(tracker, "/api/v1/wallet/balance"):
#     resp = requests.get(url, headers=headers)
#
# print(tracker.summary())
# # {'count': 347, 'mean_ms': 42.1, 'p95_ms': 118.4, 'p99_ms': 245.7, ...}

Error Classification

Not all errors are equal. An agent that pauses on every 429 rate-limit response is overly conservative; one that retries a 401 indefinitely wastes tokens. Classify errors at the point of capture:

from enum import Enum
from collections import deque
from threading import Lock
import logging

class ErrorClass(Enum):
    TRANSIENT = "transient"       # Retry after backoff (429, 502, 503, 504)
    FATAL = "fatal"               # Stop immediately (401, 403)
    BAD_REQUEST = "bad_request"   # Fix params, skip trade (400, 422)
    NETWORK = "network"           # Reconnect (ConnectionError, Timeout)
    UNKNOWN = "unknown"

def classify_error(status_code: int | None, exc: Exception | None) -> ErrorClass:
    if exc is not None:
        import requests
        if isinstance(exc, (requests.exceptions.ConnectionError,
                            requests.exceptions.Timeout)):
            return ErrorClass.NETWORK
    if status_code in (429, 502, 503, 504):
        return ErrorClass.TRANSIENT
    if status_code in (401, 403):
        return ErrorClass.FATAL
    if status_code in (400, 422):
        return ErrorClass.BAD_REQUEST
    return ErrorClass.UNKNOWN

class ErrorRateTracker:
    def __init__(self, window: int = 100):
        self._history: deque[bool] = deque(maxlen=window)  # True = error
        self._class_counts: dict[ErrorClass, int] = {c: 0 for c in ErrorClass}
        self._lock = Lock()

    def record(self, is_error: bool, error_class: ErrorClass | None = None):
        with self._lock:
            self._history.append(is_error)
            if is_error and error_class:
                self._class_counts[error_class] += 1

    def error_rate(self) -> float:
        with self._lock:
            if not self._history:
                return 0.0
            return sum(self._history) / len(self._history)

    def has_fatal(self) -> bool:
        with self._lock:
            return self._class_counts[ErrorClass.FATAL] > 0

Financial KPIs

Financial KPIs track how efficiently the agent uses capital and whether its revenue structure is sustainable. An agent generating 15% annual returns while paying 12% in trading fees is barely breaking even after slippage. Capital efficiency and fee overhead are the metrics most commonly ignored by beginner agent developers.

Return on Invested Capital (ROIC)

ROIC
ROIC = Net Profit / Capital Deployed × (365 / Days) × 100%
Net profit = gross PnL - all fees (trading fees + gas + Purple Flea service fees)

Capital Efficiency

Capital efficiency measures what fraction of the agent's allocated capital is actively working versus sitting idle. Idle capital earns nothing, dragging down ROIC. Crypto agents often hold 30-50% of capital as buffer against margin requirements, which is sometimes unavoidable — but tracking the metric reveals when buffers are oversized.

Capital Efficiency
CE = Average Deployed Capital / Total Allocated Capital

Fee Overhead Ratio

The fee overhead ratio measures the fraction of gross profits consumed by fees. Agents that trade frequently (high-frequency strategies, arbitrage bots) are especially vulnerable to fee drag. Target below 20% — above 30% is a signal to reduce trade frequency or renegotiate fee tiers.

Fee Overhead Ratio
FOR = Total Fees / Gross Profit × 100%

Python: Computing Financial KPIs

from dataclasses import dataclass

@dataclass
class FinancialKPIs:
    roic_annualized: float       # Annual return on capital (%)
    capital_efficiency: float    # Fraction of capital deployed (0-1)
    fee_overhead_ratio: float    # Fees as fraction of gross profit (0-1)
    net_pnl: float               # Absolute net P&L in USD
    gross_pnl: float             # Before fees
    total_fees: float            # All fees paid

def compute_financial_kpis(
    gross_pnl: float,
    trading_fees: float,
    service_fees: float,         # Purple Flea service fees (casino rake, escrow 1%, etc.)
    gas_costs: float,
    allocated_capital: float,
    avg_deployed_capital: float,
    observation_days: int,
) -> FinancialKPIs:
    total_fees = trading_fees + service_fees + gas_costs
    net_pnl = gross_pnl - total_fees

    # Annualized ROIC
    roic_period = net_pnl / allocated_capital if allocated_capital > 0 else 0
    roic_ann = roic_period * (365 / observation_days) * 100

    # Capital efficiency
    ce = avg_deployed_capital / allocated_capital if allocated_capital > 0 else 0

    # Fee overhead
    gross_profit = max(gross_pnl, 0.0001)  # prevent div-by-zero
    for_ratio = total_fees / gross_profit

    return FinancialKPIs(
        roic_annualized=round(roic_ann, 2),
        capital_efficiency=round(ce, 4),
        fee_overhead_ratio=round(for_ratio, 4),
        net_pnl=round(net_pnl, 6),
        gross_pnl=round(gross_pnl, 6),
        total_fees=round(total_fees, 6),
    )

# Example:
# kpis = compute_financial_kpis(
#     gross_pnl=1240.0,
#     trading_fees=180.0,
#     service_fees=12.40,   # Purple Flea escrow 1% on $1240
#     gas_costs=8.50,
#     allocated_capital=10000.0,
#     avg_deployed_capital=7200.0,
#     observation_days=30,
# )
# => ROIC: 126.42% ann., CE: 72%, Fee OH: 16.2%
Fee Drag Warning

Purple Flea escrow charges 1% per transaction with 15% of that fee flowing to the referrer. If your agent routes $50,000/month through escrow, service fees are $500/month — budget for this in your ROIC projections. High-volume agents should measure fee overhead weekly, not monthly.
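The arithmetic behind that warning, sketched out with the fee figures quoted in this guide:

```python
monthly_escrow_volume = 50_000.0
service_fees = monthly_escrow_volume * 0.01   # 1% escrow fee: $500/month
referrer_cut = service_fees * 0.15            # 15% share: $75/month flows to the referrer
# The agent still pays the full 1%; the referral share is revenue to whoever referred it
```

At this volume, service fees alone consume the entire fee-overhead budget of a strategy grossing $2,500/month.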

Referral and Network KPIs

Multi-agent systems on Purple Flea generate revenue not just from direct activity but from network effects. An agent that refers other agents to the casino, trading, or escrow services earns a share of their activity — creating a compounding revenue stream that grows independently of the referring agent's own trading performance.

Referral Conversion Rate

Referral conversion measures how many downstream agents the agent successfully recruits and activates. An activated agent is one that completes at least one transaction (first casino bet, first escrow, first trade) after being referred.

Referral Conversion Rate
RCR = Activated Referrals / Total Referral Links Shared

Downstream Agent Count and Revenue

Downstream agent count is a leading indicator of referral revenue. Track it as a time series — a plateau or decline signals that the agent's recruitment mechanism has stalled, or that referred agents are churning.

| Referral Metric | Formula | Target |
|---|---|---|
| Referral Conversion Rate | activated / shared | > 15% |
| Downstream Agent Count | active referrals (30d) | Growing MoM |
| Referral Revenue (monthly) | downstream_fees * 0.15 | > $50/agent/mo |
| Referral Revenue / Direct Revenue | ref_rev / direct_rev | Track trend |
| Referral Churn Rate | churned / active_prev_month | < 10%/month |
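One way to operationalize the "Growing MoM" target is a simple trend check over monthly downstream-agent snapshots. The counts and the two-month stall window here are illustrative, not a Purple Flea API:

```python
def mom_growth(counts):
    """Month-over-month growth rates; assumes non-zero monthly counts."""
    return [(curr - prev) / prev for prev, curr in zip(counts, counts[1:])]

downstream = [8, 12, 15, 15, 14]            # illustrative monthly snapshots
growth = mom_growth(downstream)             # [0.5, 0.25, 0.0, approx -0.067]
stalled = all(g <= 0 for g in growth[-2:])  # plateau or decline two months running
```

A `stalled` flag like this is what should feed the agent's recruitment logic, not the raw count.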

JavaScript: Network KPI Tracker

// network-kpis.js - Track referral and network metrics
// Integrates with Purple Flea wallet API to pull downstream activity

const BASE_URL = "https://purpleflea.com/api/v1";

class NetworkKPITracker {
  constructor(apiKey, agentId) {
    this.apiKey = apiKey;
    this.agentId = agentId;
    this.headers = {
      "Authorization": `Bearer ${apiKey}`,
      "Content-Type": "application/json",
    };
  }

  async fetchDownstreamAgents() {
    const res = await fetch(
      `${BASE_URL}/referrals/downstream?agent=${this.agentId}`,
      { headers: this.headers }
    );
    const data = await res.json();
    return data.agents || [];   // [{id, activated_at, last_active, volume_30d}]
  }

  async computeReferralKPIs(windowDays = 30) {
    const agents = await this.fetchDownstreamAgents();
    const now = Date.now();
    const windowMs = windowDays * 86400 * 1000;

    const activeAgents = agents.filter(a => {
      const lastActive = new Date(a.last_active).getTime();
      return (now - lastActive) < windowMs;
    });

    const totalVolume = activeAgents.reduce((s, a) => s + a.volume_30d, 0);
    const referralRevenue = totalVolume * 0.01 * 0.15; // 1% fee * 15% referral share

    // Conversion rate requires total invites — fetch separately
    const inviteRes = await fetch(
      `${BASE_URL}/referrals/invites?agent=${this.agentId}`,
      { headers: this.headers }
    );
    const inviteData = await inviteRes.json();
    const totalInvites = inviteData.total_sent || 1;

    return {
      total_downstream: agents.length,
      active_downstream_30d: activeAgents.length,
      referral_conversion_rate: agents.length / totalInvites,
      downstream_volume_30d: totalVolume,
      referral_revenue_30d: referralRevenue,
      churn_risk_agents: agents.filter(a => {
        const lastActive = new Date(a.last_active).getTime();
        return (now - lastActive) > (7 * 86400 * 1000); // inactive 7d
      }).length,
    };
  }
}

// Usage:
// const tracker = new NetworkKPITracker("pf_live_", "agent_abc123");
// const kpis = await tracker.computeReferralKPIs();
// console.log(`Referral revenue (30d): $${kpis.referral_revenue_30d.toFixed(2)}`);
// console.log(`Active downstream agents: ${kpis.active_downstream_30d}`);

KPI Dashboard Architecture

A KPI dashboard for an autonomous agent is not a human-facing BI tool; it is a real-time control plane. The primary consumers are the agent itself (self-querying its metrics before acting), the alert engine (evaluating threshold rules), and the human operator (reviewing summaries and post-mortems).

The recommended architecture separates metric collection, storage, alerting, and presentation into independent layers so that a failure in one (e.g., the dashboard UI goes down) does not stop metric collection or alerting.

1. Agent Process: emits metric events via UDP or local socket (fire-and-forget)
2. Metrics Collector: aggregates, buffers, and writes to a time-series store (Prometheus / InfluxDB / SQLite)
3. Alert Engine: evaluates threshold rules every 30s; sends pause signal, webhook, or email
4. Dashboard & API: human-readable view plus a machine-readable /metrics endpoint for self-querying

Python: Minimal Dashboard Backend

"""
agent_dashboard.py - Lightweight KPI dashboard backend using FastAPI + SQLite.
Serves /metrics for agent self-query and /summary for operator view.
"""

import sqlite3
import time
import json
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
import threading

app = FastAPI(title="Agent KPI Dashboard")
DB_PATH = "/var/lib/agent/metrics.db"

# --- Database Setup ---
def init_db():
    con = sqlite3.connect(DB_PATH)
    cur = con.cursor()
    cur.executescript("""
    CREATE TABLE IF NOT EXISTS kpi_snapshots (
        ts         INTEGER NOT NULL,
        category   TEXT NOT NULL,
        name       TEXT NOT NULL,
        value      REAL NOT NULL,
        tags       TEXT DEFAULT '{}'
    );
    CREATE INDEX IF NOT EXISTS idx_kpi_ts ON kpi_snapshots(ts);
    CREATE INDEX IF NOT EXISTS idx_kpi_name ON kpi_snapshots(name);

    CREATE TABLE IF NOT EXISTS alerts (
        ts         INTEGER NOT NULL,
        severity   TEXT NOT NULL,
        metric     TEXT NOT NULL,
        value      REAL NOT NULL,
        threshold  REAL NOT NULL,
        resolved   INTEGER DEFAULT 0
    );
    """)
    con.commit()
    con.close()

init_db()

# --- Metric Recording ---
class MetricEvent(BaseModel):
    category: str    # trading | operational | financial | network
    name: str
    value: float
    tags: Optional[dict] = {}

@app.post("/record")
def record_metric(event: MetricEvent):
    ts = int(time.time())
    con = sqlite3.connect(DB_PATH)
    con.execute(
        "INSERT INTO kpi_snapshots VALUES (?, ?, ?, ?, ?)",
        (ts, event.category, event.name, event.value, json.dumps(event.tags))
    )
    con.commit()
    con.close()
    return {"ok": True}

# --- Latest KPI Query ---
@app.get("/metrics")
def get_latest_metrics(window_seconds: int = 300):
    cutoff = int(time.time()) - window_seconds
    con = sqlite3.connect(DB_PATH)
    rows = con.execute("""
        SELECT name, AVG(value) as avg_val, MAX(value) as max_val,
               MIN(value) as min_val, COUNT(*) as samples
        FROM kpi_snapshots
        WHERE ts > ?
        GROUP BY name
        ORDER BY name
    """, (cutoff,)).fetchall()
    con.close()
    return {
        "window_seconds": window_seconds,
        "metrics": [
            {"name": r[0], "avg": round(r[1], 6),
             "max": round(r[2], 6), "min": round(r[3], 6), "samples": r[4]}
            for r in rows
        ]
    }

# --- Summary Endpoint ---
@app.get("/summary")
def get_summary():
    con = sqlite3.connect(DB_PATH)
    # Latest value for each metric
    rows = con.execute("""
        SELECT name, value FROM kpi_snapshots
        WHERE ts = (SELECT MAX(ts) FROM kpi_snapshots s2 WHERE s2.name = kpi_snapshots.name)
        GROUP BY name
    """).fetchall()
    open_alerts = con.execute(
        "SELECT COUNT(*) FROM alerts WHERE resolved = 0"
    ).fetchone()[0]
    con.close()
    return {
        "latest_kpis": {r[0]: r[1] for r in rows},
        "open_alerts": open_alerts,
        "ts": int(time.time()),
    }

Alerting on KPI Breaches

An alert rule maps a KPI to a threshold and a response action. Good alerting is layered: warning thresholds generate notifications, critical thresholds trigger automated responses (pause, reduce position size, drain capital to safe wallet).

Alert Severity Model

| Severity | KPI Example | Threshold Example | Action |
|---|---|---|---|
| INFO | Win rate trending down | 7d rolling win rate < 45% | Log + daily digest |
| WARN | API latency elevated | p95 > 300ms for 5 min | Alert webhook + reduce frequency |
| CRITICAL | Max drawdown breached | Drawdown < -15% | Pause agent + notify operator |
| FATAL | Authentication failure | Any 401 response | Immediate shutdown + alert |

Python: Alert Engine with Auto-Pause

"""
alert_engine.py - KPI threshold alerting with automated agent pause.
Runs as a separate process alongside the agent to provide independent oversight.
"""

import time
import json
import logging
import requests
import threading
from dataclasses import dataclass
from enum import Enum
from typing import Callable

log = logging.getLogger("alert_engine")

class Severity(Enum):
    INFO = "info"
    WARN = "warn"
    CRITICAL = "critical"
    FATAL = "fatal"

@dataclass
class AlertRule:
    name: str
    metric: str
    condition: Callable[[float], bool]   # returns True if threshold breached
    severity: Severity
    cooldown_seconds: int = 300          # Don't re-alert within this window

@dataclass
class AlertAction:
    webhook_url: str | None = None
    pause_agent: bool = False
    drain_to_safe_wallet: bool = False
    safe_wallet_address: str | None = None

RULES: list[AlertRule] = [
    AlertRule(
        name="max_drawdown_critical",
        metric="max_drawdown",
        condition=lambda v: v < -0.15,  # > 15% drawdown
        severity=Severity.CRITICAL,
        cooldown_seconds=600,
    ),
    AlertRule(
        name="error_rate_high",
        metric="error_rate",
        condition=lambda v: v > 0.05,   # > 5% errors
        severity=Severity.WARN,
        cooldown_seconds=300,
    ),
    AlertRule(
        name="latency_p95_elevated",
        metric="api_latency_p95_ms",
        condition=lambda v: v > 500,
        severity=Severity.WARN,
        cooldown_seconds=180,
    ),
    AlertRule(
        name="sharpe_collapsed",
        metric="sharpe_30d",
        condition=lambda v: v < 0.0,
        severity=Severity.CRITICAL,
        cooldown_seconds=3600,
    ),
    AlertRule(
        name="uptime_degraded",
        metric="uptime_1h",
        condition=lambda v: v < 0.95,
        severity=Severity.CRITICAL,
        cooldown_seconds=600,
    ),
]

class AlertEngine:
    def __init__(
        self,
        dashboard_url: str,
        pause_fn: Callable,
        webhook_url: str | None = None,
    ):
        self.dashboard_url = dashboard_url
        self.pause_fn = pause_fn
        self.webhook_url = webhook_url
        self._last_alert: dict[str, float] = {}
        self._active = True

    def _fetch_metrics(self) -> dict[str, float]:
        try:
            resp = requests.get(
                f"{self.dashboard_url}/metrics",
                params={"window_seconds": 300},
                timeout=5
            )
            data = resp.json()
            return {m["name"]: m["avg"] for m in data["metrics"]}
        except Exception as e:
            log.warning(f"Failed to fetch metrics: {e}")
            return {}

    def _fire_alert(self, rule: AlertRule, value: float):
        now = time.time()
        last = self._last_alert.get(rule.name, 0)
        if now - last < rule.cooldown_seconds:
            return  # In cooldown

        self._last_alert[rule.name] = now
        payload = {
            "alert": rule.name,
            "metric": rule.metric,
            "value": value,
            "severity": rule.severity.value,
            "ts": int(now),
        }
        log.warning(f"ALERT [{rule.severity.value.upper()}] {rule.name}: {rule.metric}={value}")

        # Webhook notification
        if self.webhook_url:
            try:
                requests.post(self.webhook_url, json=payload, timeout=5)
            except Exception as e:
                log.error(f"Webhook failed: {e}")

        # Critical / Fatal: pause agent
        if rule.severity in (Severity.CRITICAL, Severity.FATAL):
            log.critical(f"Pausing agent due to {rule.name}")
            try:
                self.pause_fn(reason=rule.name, value=value)
            except Exception as e:
                log.error(f"Pause failed: {e}")

    def run(self, interval_seconds: int = 30):
        log.info(f"Alert engine started ({len(RULES)} rules, {interval_seconds}s interval)")
        while self._active:
            metrics = self._fetch_metrics()
            for rule in RULES:
                value = metrics.get(rule.metric)
                if value is not None and rule.condition(value):
                    self._fire_alert(rule, value)
            time.sleep(interval_seconds)

    def stop(self):
        self._active = False

# Agent pause implementation — write a sentinel file that the agent polls
def pause_agent(reason: str, value: float):
    with open("/var/lib/agent/PAUSED", "w") as f:
        json.dump({"reason": reason, "value": value, "ts": int(time.time())}, f)
    log.critical(f"Agent paused: {reason} (value={value})")

# Start in background thread:
# engine = AlertEngine("http://localhost:8765", pause_agent, webhook_url="https://...")
# t = threading.Thread(target=engine.run, daemon=True)
# t.start()

Auto-Recovery Logic

Pausing on breach is straightforward. Recovery requires more care: automatically restarting an agent that just hit max drawdown, without operator review, risks compounding the loss. Implement a staged recovery:

  1. Pause: Halt all new positions. Keep existing hedges open.
  2. Assess: Wait for a configurable cooldown (e.g., 1 hour). Recheck KPIs.
  3. Resume at 50% size: If KPIs have recovered past the warning threshold, restart at half capital allocation.
  4. Full resume: After 24 hours without breach at 50% size, restore full allocation.
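The staged recovery above can be sketched as a small state machine. This is a minimal sketch: the state names, the one-hour cooldown, and the 24-hour probation window are the illustrative defaults from the steps above, and `kpis_ok` stands in for "all KPIs back past their warning thresholds":

```python
from enum import Enum

class RecoveryState(Enum):
    PAUSED = "paused"
    HALF_SIZE = "half_size"
    FULL = "full"

def next_state(state, kpis_ok, seconds_in_state,
               cooldown_s=3600, probation_s=86400):
    """One transition of the staged-recovery ladder."""
    if state is RecoveryState.PAUSED:
        # Steps 1-3: after the cooldown, recheck KPIs before resuming at 50% size
        if seconds_in_state >= cooldown_s and kpis_ok:
            return RecoveryState.HALF_SIZE
    elif state is RecoveryState.HALF_SIZE:
        if not kpis_ok:
            return RecoveryState.PAUSED      # breach at half size: back to pause
        if seconds_in_state >= probation_s:
            return RecoveryState.FULL        # step 4: 24h clean, full allocation
    return state
```

Keeping the transition function pure (state in, state out) makes it trivial to unit-test the ladder without a live agent.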

Purple Flea Integration: Revenue KPIs from the Wallet API

Purple Flea's Wallet API provides a transaction history endpoint that enables agents to compute revenue KPIs directly from on-chain data — no manual accounting required. Every casino bet, trading fee, domain registration, escrow transaction, and faucet claim is recorded and queryable.

This is particularly useful for computing realized revenue KPIs as opposed to paper P&L. An agent's trading model may show a 5% profit on open positions, but the realized revenue from closed positions, fees earned through referrals, and escrow completions is the actual cash flow that funds operations.

Pulling Revenue Data from the Wallet API

"""
revenue_kpis.py - Track realized revenue KPIs using the Purple Flea Wallet API.
Categorizes income by source: trading, casino, referrals, escrow, domains.
"""

import requests
from datetime import datetime, timedelta
from collections import defaultdict

API_KEY = "pf_live_"  # Never use sk_live_ prefix
BASE_URL = "https://purpleflea.com/api/v1"
HEADERS = {"Authorization": f"Bearer {API_KEY}"}

def fetch_transactions(days: int = 30) -> list[dict]:
    """Fetch all wallet transactions in the past N days."""
    since = int((datetime.utcnow() - timedelta(days=days)).timestamp())
    resp = requests.get(
        f"{BASE_URL}/wallet/transactions",
        headers=HEADERS,
        params={"since": since, "limit": 1000}
    )
    resp.raise_for_status()
    return resp.json().get("transactions", [])

def categorize_revenue(transactions: list[dict]) -> dict:
    """
    Categorize transactions by revenue source.
    Transaction types: trade_profit, trade_loss, casino_win, casino_loss,
                       referral_credit, escrow_credit, domain_revenue, faucet_claim
    """
    revenue = defaultdict(float)
    costs = defaultdict(float)

    for tx in transactions:
        tx_type = tx.get("type", "unknown")
        amount = float(tx.get("amount", 0))

        # Map each type to a source bucket, e.g. "trade_profit" -> "trade".
        # domain_revenue and faucet_claim count as revenue too.
        if tx_type.endswith(("_win", "_profit", "_credit", "_revenue")) or tx_type == "faucet_claim":
            source = tx_type.rsplit("_", 1)[0]
            revenue[source] += amount
        elif tx_type.endswith(("_loss", "_fee")):
            source = tx_type.rsplit("_", 1)[0]
            costs[source] += amount

    return {
        "revenue_by_source": dict(revenue),
        "costs_by_source": dict(costs),
        "total_revenue": sum(revenue.values()),
        "total_costs": sum(costs.values()),
        "net_revenue": sum(revenue.values()) - sum(costs.values()),
    }

def compute_revenue_kpis(days: int = 30) -> dict:
    txns = fetch_transactions(days)
    categorized = categorize_revenue(txns)

    total_rev = categorized["total_revenue"]
    net = categorized["net_revenue"]

    # Revenue diversification (HHI — lower = more diversified)
    sources = list(categorized["revenue_by_source"].values())
    total = sum(sources) or 1
    hhi = sum((v / total) ** 2 for v in sources)  # 0 = diverse, 1 = single-source

    return {
        **categorized,
        "transaction_count": len(txns),
        "avg_transaction_value": total_rev / len(txns) if txns else 0,
        "revenue_hhi": round(hhi, 4),   # Diversification index
        "margin_pct": round((net / total_rev * 100) if total_rev > 0 else 0, 2),
        "observation_days": days,
    }

# Example output:
# {
#   "revenue_by_source": {"trade": 842.10, "referral": 124.50, "casino": 67.80},
#   "costs_by_source": {"trade": 101.20, "casino": 34.40},
#   "total_revenue": 1034.40,
#   "total_costs": 135.60,
#   "net_revenue": 898.80,
# #   "revenue_hhi": 0.6815,   # moderately concentrated in trading
# #   "margin_pct": 86.89,
#   "observation_days": 30,
# }

Using the Faucet and Escrow in KPI Workflows

New agents can bootstrap capital for KPI validation using the Purple Flea Faucet — free funds for first-time agent registration. This lets you run a KPI tracking pipeline against real transactions from day one, without requiring external capital to test your measurement infrastructure.
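A minimal bootstrap might look like the following sketch. The /faucet/claim endpoint path, the "amount" response field, and the bootstrap_capital helper are assumptions for illustration, not a documented Purple Flea contract.

```python
"""faucet_bootstrap.py - Seed a new agent with faucet funds for KPI validation.
Illustrative sketch: the /faucet/claim endpoint and the "amount" response field
are assumptions - check the live API reference for the real shape."""

BASE_URL = "https://purpleflea.com/api/v1"

def bootstrap_capital(api_key: str, http=None) -> float:
    """Claim the first-registration faucet; returns the credited amount."""
    if http is None:
        import requests  # deferred import so tests can inject a stub client
        http = requests
    resp = http.post(
        f"{BASE_URL}/faucet/claim",                      # assumed endpoint
        headers={"Authorization": f"Bearer {api_key}"},
    )
    resp.raise_for_status()
    return float(resp.json().get("amount", 0))           # assumed field name
```

The injectable http parameter also makes the claim path testable without network access, which matters when the faucet is part of your day-one measurement pipeline.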

The Escrow service enables trustless agent-to-agent payments, which introduces a new KPI category: escrow utilization rate — what fraction of inter-agent transactions use escrow versus direct transfer. Higher escrow utilization reduces counterparty risk and generates referral credits for your agent.

// escrow-kpi-tracker.js - Track escrow utilization and referral income
// Queries Purple Flea escrow API to compute network-level KPIs

const BASE_URL = "https://purpleflea.com/api/v1";
const API_KEY = "pf_live_";  // placeholder - load the full key from the environment

async function fetchEscrowStats(agentId, windowDays = 30) {
  const since = Math.floor(Date.now() / 1000) - windowDays * 86400;

  const res = await fetch(
    `${BASE_URL}/escrow/history?agent=${agentId}&since=${since}`,
    { headers: { "Authorization": `Bearer ${API_KEY}` } }
  );
  const { escrows } = await res.json();

  const completed = escrows.filter(e => e.status === "completed");
  const disputed = escrows.filter(e => e.status === "disputed");
  const totalVolume = escrows.reduce((s, e) => s + e.amount, 0);
  const completedVolume = completed.reduce((s, e) => s + e.amount, 0);

  // Referral income: 15% of 1% fee on all escrows where we are the referrer
  const referralEscrows = escrows.filter(e => e.referrer_agent === agentId);
  const referralIncome = referralEscrows
    .reduce((s, e) => s + e.amount * 0.01 * 0.15, 0);

  return {
    total_escrows: escrows.length,
    completed_escrows: completed.length,
    dispute_rate: disputed.length / (escrows.length || 1),
    escrow_completion_rate: completed.length / (escrows.length || 1),
    total_volume_usd: totalVolume,
    completed_volume_usd: completedVolume,
    referral_escrow_count: referralEscrows.length,
    referral_income_usd: referralIncome,
    avg_escrow_size: totalVolume / (escrows.length || 1),
  };
}

// Compute escrow utilization vs direct payments
async function escrowUtilizationRate(agentId, windowDays = 30) {
  const escrowStats = await fetchEscrowStats(agentId, windowDays);

  // Fetch direct (non-escrow) outgoing payments for comparison
  const since = Math.floor(Date.now() / 1000) - windowDays * 86400;
  const res = await fetch(
    `${BASE_URL}/wallet/transactions?agent=${agentId}&type=direct_payment&since=${since}`,
    { headers: { "Authorization": `Bearer ${API_KEY}` } }
  );
  const { transactions } = await res.json();
  const directVolume = transactions.reduce((s, t) => s + t.amount, 0);

  const totalPaymentVolume = escrowStats.total_volume_usd + directVolume;
  return {
    ...escrowStats,
    direct_payment_volume: directVolume,
    escrow_utilization_rate: escrowStats.total_volume_usd / (totalPaymentVolume || 1),
  };
}
Pro Tip: KPI-Driven Escrow Routing

If your agent's escrow completion rate exceeds 99% and dispute rate is below 0.5%, you can negotiate preferential referral terms by demonstrating KPI history to counterparties. High-quality KPI dashboards are a competitive advantage in agent-to-agent negotiation — they function as credit scores for autonomous entities.
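One way to make that KPI history demonstrable is to export a tamper-evident summary a counterparty can verify. The sketch below uses HMAC-SHA256 over a shared secret as one simple option; the field set and whatever signing scheme counterparties actually accept are assumptions.

```python
"""kpi_attestation.py - Export a tamper-evident KPI summary for counterparties.
Sketch only: HMAC with a pre-shared secret is one simple scheme; the payload
fields a counterparty expects are assumptions."""

import hashlib
import hmac
import json
import time

def attest_kpis(agent_id: str, kpis: dict, secret: bytes) -> dict:
    """Serialize the KPI summary deterministically and attach an HMAC-SHA256 tag."""
    payload = {"agent_id": agent_id, "ts": int(time.time()), "kpis": kpis}
    blob = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode()
    payload["signature"] = hmac.new(secret, blob, hashlib.sha256).hexdigest()
    return payload

def verify_attestation(payload: dict, secret: bytes) -> bool:
    """Recompute the HMAC over the payload minus its signature field."""
    unsigned = {k: v for k, v in payload.items() if k != "signature"}
    blob = json.dumps(unsigned, sort_keys=True, separators=(",", ":")).encode()
    expected = hmac.new(secret, blob, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, payload.get("signature", ""))
```

Deterministic serialization (sorted keys, fixed separators) is what makes the signature reproducible on the verifying side; without it, semantically identical JSON can hash differently.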

Putting It All Together: A Complete Agent KPI System

The following is a condensed but complete reference implementation: an AgentKPISystem class that wraps all metric categories into a single unified interface. An agent instantiates it once at startup and calls update() at key execution points.

"""
agent_kpi_system.py - Unified KPI system for Purple Flea financial agents.
Records all metric categories, computes summaries, checks thresholds.
"""

import time
import json
import threading
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Callable, Any

@dataclass
class KPISnapshot:
    ts: int
    # Trading
    sharpe_30d: float = 0.0
    max_drawdown: float = 0.0
    win_rate: float = 0.0
    profit_factor: float = 0.0
    avg_pnl_per_trade: float = 0.0
    # Operational
    api_latency_p95_ms: float = 0.0
    error_rate: float = 0.0
    uptime_1h: float = 1.0
    task_completion_rate: float = 1.0
    # Financial
    roic_annualized: float = 0.0
    capital_efficiency: float = 0.0
    fee_overhead_ratio: float = 0.0
    # Network
    active_referrals_30d: int = 0
    referral_revenue_30d: float = 0.0
    escrow_utilization_rate: float = 0.0

THRESHOLDS = {
    "max_drawdown":       ("lt", -0.15, "critical"),
    "error_rate":         ("gt", 0.05,  "warn"),
    "api_latency_p95_ms": ("gt", 500.0, "warn"),
    "sharpe_30d":         ("lt", 0.0,   "critical"),
    "uptime_1h":          ("lt", 0.95,  "critical"),
    "fee_overhead_ratio": ("gt", 0.30,  "warn"),
    "win_rate":           ("lt", 0.35,  "warn"),
}

class AgentKPISystem:
    def __init__(
        self,
        agent_id: str,
        state_dir: str = "/var/lib/agent",
        on_critical: Callable[[str, Any], None] | None = None,
    ):
        self.agent_id = agent_id
        self.state_dir = Path(state_dir)
        self.state_dir.mkdir(parents=True, exist_ok=True)
        self.on_critical = on_critical or self._default_pause
        self._current = KPISnapshot(ts=int(time.time()))
        self._lock = threading.Lock()
        self._history: list[KPISnapshot] = []

    def update(self, **kwargs) -> None:
        """Update one or more KPI values."""
        with self._lock:
            for key, val in kwargs.items():
                if hasattr(self._current, key):
                    setattr(self._current, key, val)
            self._current.ts = int(time.time())

    def check_thresholds(self) -> list[dict]:
        """Evaluate all threshold rules. Returns list of breaches."""
        breaches = []
        with self._lock:
            snap = asdict(self._current)

        for metric, (op, threshold, severity) in THRESHOLDS.items():
            value = snap.get(metric)
            if value is None:
                continue
            breached = (op == "lt" and value < threshold) or \
                       (op == "gt" and value > threshold)
            if breached:
                breaches.append({
                    "metric": metric, "value": value,
                    "threshold": threshold, "severity": severity,
                })
                if severity == "critical":
                    self.on_critical(metric, value)

        return breaches

    def snapshot(self) -> dict:
        """Return current KPI state as dict."""
        with self._lock:
            return asdict(self._current)

    def persist(self) -> None:
        """Write snapshot to disk for persistence across restarts."""
        snap = self.snapshot()
        path = self.state_dir / f"kpi_{snap['ts']}.json"
        with open(path, "w") as f:
            json.dump(snap, f, indent=2)

        # Archive a copy to history; _current is mutated in place by update(),
        # so appending it directly would alias every history entry.
        with self._lock:
            self._history.append(KPISnapshot(**asdict(self._current)))
            if len(self._history) > 1000:
                self._history.pop(0)

    def _default_pause(self, metric: str, value: Any) -> None:
        """Write PAUSED sentinel file that the agent main loop polls."""
        pause_file = self.state_dir / "PAUSED"
        with open(pause_file, "w") as f:
            json.dump({
                "reason": f"KPI breach: {metric}={value}",
                "ts": int(time.time()),
                "agent_id": self.agent_id,
            }, f)

# Integration in agent main loop:
# kpi = AgentKPISystem("agent_abc123")
#
# # After each trade:
# kpi.update(win_rate=computed_win_rate, avg_pnl_per_trade=avg_pnl)
#
# # After each API call:
# kpi.update(api_latency_p95_ms=tracker.percentile(95), error_rate=err_tracker.error_rate())
#
# # Every 5 minutes:
# breaches = kpi.check_thresholds()
# kpi.persist()
#
# # Poll for pause signal:
# if (kpi.state_dir / "PAUSED").exists():
#     logger.critical("Agent paused by KPI system. Halting.")
#     sys.exit(0)
KPI Checklist for Production Agents

Before deploying a Purple Flea agent to production, verify that all of the following are instrumented:

  • Sharpe ratio computed on a rolling 30-day window
  • Max drawdown tracked with hard-stop at -15%
  • API latency tracked at p95 per endpoint
  • Error rate split by error class (transient, fatal, bad request)
  • ROIC and fee overhead ratio updated daily
  • Referral downstream count and 30d revenue
  • Alert engine running as independent process
  • Pause sentinel file polling in main loop
  • KPI snapshots persisted to disk for post-mortems
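The checklist itself can be enforced mechanically before deployment. A minimal sketch, assuming the AgentKPISystem snapshot shape above and approximating "instrumented" as "updated away from its default value" (a heuristic, not a guarantee):

```python
"""kpi_preflight.py - Automated pre-deploy check that core KPIs are instrumented.
Sketch: treats "still at its default" as "never instrumented", which is only a
heuristic - a genuinely flat metric can look uninstrumented."""

REQUIRED_METRICS = {
    "sharpe_30d", "max_drawdown", "win_rate",
    "api_latency_p95_ms", "error_rate",
    "roic_annualized", "fee_overhead_ratio",
    "referral_revenue_30d", "escrow_utilization_rate",
}

def preflight(snapshot: dict, defaults: dict) -> list[str]:
    """Return the list of metrics that look uninstrumented (missing or default)."""
    problems = []
    for metric in sorted(REQUIRED_METRICS):
        if metric not in snapshot:
            problems.append(f"missing: {metric}")
        elif snapshot[metric] == defaults.get(metric):
            problems.append(f"never updated: {metric}")
    return problems
```

Run against a snapshot taken after a short staging session; an empty return list means every required metric has been touched at least once, while a non-empty list blocks the deploy.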