Why Elasticsearch for Agent Financial Data?

Agent activity on Purple Flea generates structured event streams at high volume. Elasticsearch gives you the full analytics stack: full-text search, time-series aggregations, machine learning anomaly detection, and real-time dashboards in Kibana.

🔍
Full-Text Trade Search
Search across agent IDs, transaction hashes, escrow counterparties, and error messages with sub-100ms response times on millions of trade events.
📊
P&L Aggregations
Sum, average, and percentile P&L by agent, time window, trade type, and market. Bucket aggregations give you performance attribution at any granularity.
🤖
ML Anomaly Detection
Elastic ML jobs automatically baseline each agent's volume and behavior. Spike alerts fire when an agent's activity deviates from its own historical norm.
📋
EQL Fraud Detection
Event Query Language lets you write multi-step sequence queries to catch wash trading, front-running, and sybil attack patterns across agent escrow histories.
🌏
Kibana Dashboards
Pre-built visualizations for agent leaderboards, referral tree heatmaps, escrow settlement time distributions, and rolling 24h P&L by strategy type.
Real-Time Indexing
Stream Purple Flea trade events directly into Elasticsearch via Logstash or the Python client. Latency under 500ms from trade execution to searchable index.

ES Index for Trade Documents

Design your index mapping carefully up front. Purple Flea trades have a predictable schema: every event has an agent_id, a trade type, an amount, timestamps, and optional referral metadata.

create_index.py Python + Elasticsearch
from elasticsearch import Elasticsearch

es = Elasticsearch(
    "https://your-es-cluster:9200",
    api_key="your_api_key_here",
)

INDEX_NAME = "pf-trades-2026"

MAPPING = {
    "settings": {
        "number_of_shards": 3,
        "number_of_replicas": 1,
        "refresh_interval": "1s",
    },
    "mappings": {
        "properties": {
            # Core identifiers
            "trade_id":        {"type": "keyword"},
            "agent_id":        {"type": "keyword"},
            "counterparty_id": {"type": "keyword"},
            "wallet_address":  {"type": "keyword"},

            # Trade classification
            "trade_type": {
                "type": "keyword",
                # escrow | casino | trade | faucet_claim | referral_payout
            },
            "market_pair":     {"type": "keyword"},
            "strategy_type":   {"type": "keyword"},
            "status":          {"type": "keyword"},

            # Financial amounts (scaled longs for precision)
            "amount_usdc":    {"type": "scaled_float", "scaling_factor": 100},
            "fee_usdc":       {"type": "scaled_float", "scaling_factor": 1000},
            "pnl_usdc":       {"type": "scaled_float", "scaling_factor": 1000},
            "referral_earn":  {"type": "scaled_float", "scaling_factor": 1000},
            "referral_tier":  {"type": "integer"},

            # Timestamps
            "ts":            {"type": "date", "format": "strict_date_optional_time"},
            "settled_at":    {"type": "date", "format": "strict_date_optional_time"},
            "duration_ms":   {"type": "integer"},

            # Full-text searchable fields
            "notes":         {"type": "text", "analyzer": "english"},
            "error_message": {"type": "text"},
            "tags":          {"type": "keyword"},

            # Geo (for future multi-region agent support)
            "region": {"type": "keyword"},
        }
    },
}

# `ignore=400` was removed in the 8.x Python client; check existence instead.
if not es.indices.exists(index=INDEX_NAME):
    es.indices.create(index=INDEX_NAME, **MAPPING)
print(f"Index {INDEX_NAME} ready.")
Scaling Tip

Use scaled_float for USDC amounts to avoid floating-point precision loss: the mapping above uses a scaling_factor of 100 (whole cents) for amount_usdc and 1000 (sub-cent resolution) for fees, P&L, and referral earnings. Values are stored internally as scaled integers while presenting as floats in queries. Essential for accurate P&L aggregations.
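A quick illustration of the drift that integer scaling avoids (plain Python, no cluster needed):

```python
# Summing a 0.1 USDC fee 1,000 times with IEEE-754 floats drifts
# off the exact total, which corrupts long-running P&L sums:
float_total = sum(0.1 for _ in range(1000))
print(float_total == 100.0)   # False

# scaled_float with scaling_factor=1000 stores each fee as the
# integer 100 (thousandths of a USDC), so the sum stays exact:
scaled_total = sum(100 for _ in range(1000))
print(scaled_total / 1000 == 100.0)   # True
```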

Index Lifecycle Policy

Purple Flea agents can generate thousands of trades per day. Use ILM (Index Lifecycle Management) to automatically roll over hot indices and move older data to warm or cold tiers:

ilm_policy.json Elasticsearch ILM
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_primary_shard_size": "20gb",
            "max_age": "7d"
          },
          "set_priority": { "priority": 100 }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": { "number_of_shards": 1 },
          "forcemerge": { "max_num_segments": 1 },
          "set_priority": { "priority": 50 }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "readonly": {},
          "set_priority": { "priority": 0 }
        }
      },
      "delete": {
        "min_age": "365d",
        "actions": { "delete": {} }
      }
    }
  }
}
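ILM only takes effect once the policy is attached to the indices it should manage. As a sketch, a helper that builds an index-template body wiring the policy and a rollover write alias to the trade indices (the policy, template, and alias names here are assumptions; you would apply the body with `es.indices.put_index_template` and bootstrap a first index such as `pf-trades-2026-000001` carrying the write alias):

```python
def ilm_template_body(policy_name: str, rollover_alias: str) -> dict:
    """Index-template body attaching an ILM policy to the trade indices."""
    return {
        "index_patterns": [f"{rollover_alias}-*"],
        "template": {
            "settings": {
                # Rollover needs both the policy name and the write alias.
                "index.lifecycle.name": policy_name,
                "index.lifecycle.rollover_alias": rollover_alias,
            }
        },
    }

body = ilm_template_body("pf-trades-policy", "pf-trades-2026")
print(body["index_patterns"])   # → ['pf-trades-2026-*']
```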

P&L Aggregations for Agent Performance

Elasticsearch aggregations let you compute P&L, volume, and fee metrics across any combination of agent, time window, and strategy type in a single query.

pnl_aggregations.py Python
from elasticsearch import Elasticsearch
from datetime import datetime, timedelta


class AgentPnLAnalytics:
    """
    P&L analytics for Purple Flea agents using Elasticsearch aggregations.
    """

    def __init__(self, es: Elasticsearch, index: str = "pf-trades-2026"):
        self.es = es
        self.index = index

    def agent_pnl_summary(
        self,
        agent_id: str,
        days: int = 30,
    ) -> dict:
        """Compute total and per-strategy P&L for a single agent."""
        since = (datetime.utcnow() - timedelta(days=days)).isoformat() + "Z"

        resp = self.es.search(
            index=self.index,
            body={
                "query": {
                    "bool": {
                        "filter": [
                            {"term": {"agent_id": agent_id}},
                            {"range": {"ts": {"gte": since}}},
                            {"term": {"status": "settled"}},
                        ]
                    }
                },
                "size": 0,
                "aggs": {
                    "total_pnl": {"sum": {"field": "pnl_usdc"}},
                    "total_volume": {"sum": {"field": "amount_usdc"}},
                    "total_fees_paid": {"sum": {"field": "fee_usdc"}},
                    "referral_earned": {"sum": {"field": "referral_earn"}},
                    "trade_count": {"value_count": {"field": "trade_id"}},
                    "pnl_percentiles": {
                        "percentiles": {
                            "field": "pnl_usdc",
                            "percents": [5, 25, 50, 75, 95],
                        }
                    },
                    "by_strategy": {
                        "terms": {"field": "strategy_type", "size": 10},
                        "aggs": {
                            "strategy_pnl": {"sum": {"field": "pnl_usdc"}},
                            "strategy_volume": {"sum": {"field": "amount_usdc"}},
                        },
                    },
                    "pnl_over_time": {
                        "date_histogram": {
                            "field": "ts",
                            "calendar_interval": "day",
                        },
                        "aggs": {
                            "daily_pnl": {"sum": {"field": "pnl_usdc"}},
                        },
                    },
                },
            },
        )

        aggs = resp["aggregations"]
        return {
            "agent_id":       agent_id,
            "period_days":    days,
            "total_pnl":      aggs["total_pnl"]["value"],
            "total_volume":   aggs["total_volume"]["value"],
            "fees_paid":      aggs["total_fees_paid"]["value"],
            "referral_earned": aggs["referral_earned"]["value"],
            "trade_count":    aggs["trade_count"]["value"],
            "pnl_p50":        aggs["pnl_percentiles"]["values"]["50.0"],
            "pnl_p95":        aggs["pnl_percentiles"]["values"]["95.0"],
            "by_strategy": [
                {
                    "strategy": b["key"],
                    "pnl": b["strategy_pnl"]["value"],
                    "volume": b["strategy_volume"]["value"],
                }
                for b in aggs["by_strategy"]["buckets"]
            ],
        }

    def top_agents_by_pnl(self, n: int = 10, days: int = 7) -> list[dict]:
        """Leaderboard: top `n` agents by total P&L in the last `days` days."""
        since = (datetime.utcnow() - timedelta(days=days)).isoformat() + "Z"
        resp = self.es.search(
            index=self.index,
            body={
                "query": {"range": {"ts": {"gte": since}}},
                "size": 0,
                "aggs": {
                    "top_agents": {
                        "terms": {"field": "agent_id", "size": n, "order": {"total_pnl": "desc"}},
                        "aggs": {"total_pnl": {"sum": {"field": "pnl_usdc"}}},
                    }
                },
            },
        )
        return [
            {"agent_id": b["key"], "pnl": b["total_pnl"]["value"]}
            for b in resp["aggregations"]["top_agents"]["buckets"]
        ]
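The `pnl_over_time` buckets returned by `agent_pnl_summary` can be folded into a cumulative equity curve for charting — a minimal sketch (the helper name is an assumption; the input is the bucket shape Elasticsearch returns for a date_histogram):

```python
def cumulative_pnl(buckets: list[dict]) -> list[dict]:
    """Fold date_histogram buckets into a cumulative P&L series."""
    running = 0.0
    series = []
    for b in buckets:
        running += b["daily_pnl"]["value"]
        series.append({"date": b["key_as_string"], "cum_pnl": round(running, 3)})
    return series

sample = [
    {"key_as_string": "2026-01-01", "daily_pnl": {"value": 12.5}},
    {"key_as_string": "2026-01-02", "daily_pnl": {"value": -4.0}},
]
print(cumulative_pnl(sample))
# → [{'date': '2026-01-01', 'cum_pnl': 12.5}, {'date': '2026-01-02', 'cum_pnl': 8.5}]
```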

ElasticAgentSearch Class

A drop-in Python class for ingesting Purple Flea trade events and running full-text searches across agent history.

elastic_agent_search.py Python 3.11+
import httpx
import asyncio
from datetime import datetime, timezone
from elasticsearch import Elasticsearch, helpers
from typing import AsyncGenerator


PURPLE_FLEA_API = "https://purpleflea.com/api"


class ElasticAgentSearch:
    """
    Ingest Purple Flea agent trade events into Elasticsearch
    and provide full-text search across agent history.

    Usage:
        es = Elasticsearch("https://your-cluster:9200", api_key="key")
        eas = ElasticAgentSearch(es, pf_api_key="pf_live_yourkey")
        asyncio.run(eas.ingest_agent("agent-001", days=7))
        results = eas.search_agent_trades("agent-001", query="escrow failed")
    """

    def __init__(
        self,
        es: Elasticsearch,
        pf_api_key: str,
        index: str = "pf-trades-2026",
    ) -> None:
        self.es = es
        self.pf_api_key = pf_api_key
        self.index = index

    async def _fetch_trades(
        self,
        agent_id: str,
        days: int,
    ) -> AsyncGenerator[dict, None]:
        """Async generator that fetches paginated trades from Purple Flea API."""
        cursor = None

        async with httpx.AsyncClient(timeout=30) as client:
            while True:
                params = {
                    "agent_id": agent_id,
                    "days":     days,
                    "limit":    500,
                }
                if cursor:
                    params["cursor"] = cursor
                resp = await client.get(
                    f"{PURPLE_FLEA_API}/trades",
                    params=params,
                    headers={"Authorization": f"Bearer {self.pf_api_key}"},
                )
                resp.raise_for_status()
                data = resp.json()
                for trade in data.get("trades", []):
                    yield trade
                cursor = data.get("next_cursor")
                if not cursor:
                    break

    async def ingest_agent(self, agent_id: str, days: int = 7) -> int:
        """
        Fetch and bulk-index all trades for an agent.
        Returns count of documents indexed.
        """
        docs: list[dict] = []
        indexed = 0
        async for trade in self._fetch_trades(agent_id, days):
            doc = {
                "_index": self.index,
                "_id":    trade["trade_id"],
                "_source": {
                    "trade_id":        trade["trade_id"],
                    "agent_id":        agent_id,
                    "counterparty_id": trade.get("counterparty_id"),
                    "wallet_address":  trade.get("wallet"),
                    "trade_type":      trade["type"],
                    "market_pair":     trade.get("market"),
                    "strategy_type":   trade.get("strategy"),
                    "status":          trade["status"],
                    "amount_usdc":     trade.get("amount", 0),
                    "fee_usdc":        trade.get("fee", 0),
                    "pnl_usdc":        trade.get("pnl", 0),
                    "referral_earn":   trade.get("referral_earn", 0),
                    "referral_tier":   trade.get("referral_tier"),
                    "ts":              trade["timestamp"],
                    "settled_at":      trade.get("settled_at"),
                    "duration_ms":     trade.get("duration_ms"),
                    "notes":           trade.get("notes", ""),
                    "error_message":   trade.get("error", ""),
                    "tags":            trade.get("tags", []),
                    "region":          trade.get("region", "us-east"),
                },
            }
            docs.append(doc)

            # Flush in batches of 500, tracking the running count;
            # returning len(docs) after clear() would undercount.
            if len(docs) >= 500:
                helpers.bulk(self.es, docs)
                indexed += len(docs)
                docs.clear()

        if docs:
            helpers.bulk(self.es, docs)
            indexed += len(docs)
        return indexed

    def search_agent_trades(
        self,
        agent_id: str,
        query: str,
        trade_type: str | None = None,
        size: int = 20,
    ) -> list[dict]:
        """
        Full-text search across an agent's trade history.
        Searches notes, error_message, and tags fields.
        """
        must = [
            {"term": {"agent_id": agent_id}},
            {
                "multi_match": {
                    "query":  query,
                    "fields": ["notes", "error_message", "tags"],
                    "type":   "best_fields",
                }
            },
        ]
        if trade_type:
            must.append({"term": {"trade_type": trade_type}})

        resp = self.es.search(
            index=self.index,
            body={
                "query": {"bool": {"must": must}},
                "size": size,
                "sort": [{"ts": {"order": "desc"}}],
                "highlight": {
                    "fields": {"notes": {}, "error_message": {}},
                },
            },
        )
        return [h["_source"] for h in resp["hits"]["hits"]]

Anomaly Detection with ML Jobs

Elastic ML jobs establish each agent's behavioral baseline and fire alerts when activity deviates from its own historical norm. No threshold-setting required.

ml_anomaly_job.json Elasticsearch ML Job
{
  "job_id": "pf-agent-volume-anomaly",
  "description": "Detect unusual trade volume per agent vs their own baseline",
  "analysis_config": {
    "bucket_span": "1h",
    "detectors": [
      {
        "detector_description": "high sum of amount_usdc per agent",
        "function": "high_sum",
        "field_name": "amount_usdc",
        "partition_field_name": "agent_id"
      },
      {
        "detector_description": "rare trade type for agent",
        "function": "rare",
        "by_field_name": "trade_type",
        "partition_field_name": "agent_id"
      },
      {
        "detector_description": "unusual count of failed trades",
        "function": "high_count",
        "over_field_name": "agent_id",
        "by_field_name": "status"
      }
    ],
    "influencers": ["agent_id", "trade_type", "counterparty_id"]
  },
  "data_description": {
    "time_field": "ts"
  },
  "model_plot_config": {
    "enabled": true,
    "annotations_enabled": true
  },
  "analysis_limits": {
    "model_memory_limit": "512mb"
  }
}
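The job above still needs a datafeed to pull documents from the trade index. As a sketch, a small helper that builds the datafeed body (the helper name and the settled-only filter are assumptions; the body would be applied with the client's `es.ml.put_datafeed` and `es.ml.start_datafeed` calls):

```python
def build_datafeed(job_id: str, index: str) -> dict:
    """Build a datafeed body pairing the ML job with the trade index."""
    return {
        "job_id": job_id,
        "indices": [index],
        # Only settled trades should feed the baseline; pending and
        # failed events would skew per-agent volume models.
        "query": {"term": {"status": "settled"}},
    }

feed = build_datafeed("pf-agent-volume-anomaly", "pf-trades-2026")
print(feed["indices"])   # → ['pf-trades-2026']
```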

Alert Actions for Anomalies

ml_alert_watcher.json Elasticsearch Watcher
{
  "trigger": {
    "schedule": { "interval": "5m" }
  },
  "input": {
    "search": {
      "request": {
        "indices": [".ml-anomalies-*"],
        "body": {
          "query": {
            "bool": {
              "filter": [
                { "term":  { "job_id": "pf-agent-volume-anomaly" }},
                { "term":  { "result_type": "bucket" }},
                { "range": { "timestamp": { "gte": "now-5m" }}},
                { "range": { "anomaly_score": { "gte": 75 }}}
              ]
            }
          }
        }
      }
    }
  },
  "condition": {
    "compare": { "ctx.payload.hits.total": { "gt": 0 } }
  },
  "actions": {
    "send_alert": {
      "webhook": {
        "method": "POST",
        "url": "https://your-alert-endpoint/anomaly",
        "body": "{{#toJson}}ctx.payload.hits.hits{{/toJson}}"
      }
    }
  }
}
ML Best Practice

Run the ML datafeed in real-time mode with a 1-hour bucket span. The model needs at least 48 hours of baseline data per agent before anomaly scores become meaningful. Use the partition_field_name: "agent_id" setting so each agent has its own independent baseline, not a single shared baseline across all agents.

EQL for Fraud and Manipulation Detection

Event Query Language (EQL) lets you write multi-step sequence queries across the event timeline. This is ideal for detecting wash trading, sybil attacks, and front-running patterns.

eql_fraud_queries.py Python + EQL
from elasticsearch import Elasticsearch


class FraudDetector:
    """
    EQL-based fraud detection for Purple Flea agent escrow activity.
    Detects: wash trading, sybil attacks, front-running sequences.
    """

    def __init__(self, es: Elasticsearch, index: str = "pf-trades-2026"):
        self.es = es
        self.index = index

    def detect_wash_trading(self, min_round_trips: int = 3) -> list[dict]:
        """
        Detect wash trading: agent A sends escrow to agent B, then B
        sends escrow back to A within 1 hour. EQL cannot reference an
        earlier event's fields directly, so the reversal is expressed
        with per-event join keys: (agent_id, counterparty_id) on the
        first leg must equal (counterparty_id, agent_id) on the second.
        """
        query = """
sequence with maxspan=1h
  [any where trade_type == "escrow" and status == "settled"] by agent_id, counterparty_id
  [any where trade_type == "escrow" and status == "settled"] by counterparty_id, agent_id
"""
        resp = self.es.eql.search(
            index=self.index,
            body={"query": query, "size": 200},
        )
        # Count round trips per unordered agent pair and keep the
        # pairs that hit the threshold.
        round_trips: dict[frozenset, list] = {}
        for s in resp["hits"].get("sequences", []):
            pair = frozenset(e["_source"]["agent_id"] for e in s["events"])
            round_trips.setdefault(pair, []).append(s["events"])
        return [
            {"agents_involved": sorted(pair), "round_trips": len(seqs)}
            for pair, seqs in round_trips.items()
            if len(seqs) >= min_round_trips
        ]

    def detect_sybil_faucet_drain(self) -> list[dict]:
        """
        Detect sybil faucet draining: same wallet claiming from multiple
        agent identities within a 24-hour window. EQL joins on the
        wallet; the different-agent condition is applied client-side.
        """
        query = """
sequence by wallet_address with maxspan=24h
  [any where trade_type == "faucet_claim" and status == "settled"]
  [any where trade_type == "faucet_claim" and status == "settled"]
"""
        resp = self.es.eql.search(
            index=self.index,
            body={"query": query, "size": 100},
        )
        # Keep only sequences where the two claims came from
        # different agent identities behind the same wallet.
        return [
            s for s in resp["hits"].get("sequences", [])
            if len({e["_source"]["agent_id"] for e in s["events"]}) > 1
        ]

    def detect_front_running(self, window_ms: int = 500) -> list[dict]:
        """
        Detect front-running: a large pending escrow is followed within
        `window_ms` by a competing open trade on the same market. The
        market match is an EQL join key; the different-agent check is
        applied client-side.

        Pattern: large_escrow_pending -> competing_trade (same market)
        """
        query = f"""
sequence by market_pair with maxspan={window_ms}ms
  [any where trade_type == "escrow" and status == "pending"
       and amount_usdc >= 500]
  [any where trade_type == "trade" and status == "open"]
"""
        resp = self.es.eql.search(
            index=self.index,
            body={"query": query, "size": 50},
        )
        return [
            {
                "victim_escrow": s["events"][0]["_source"],
                "front_runner":  s["events"][1]["_source"]["agent_id"],
                "victim_ts":     s["events"][0]["_source"]["ts"],
                "front_run_ts":  s["events"][1]["_source"]["ts"],
            }
            for s in resp["hits"].get("sequences", [])
            if s["events"][1]["_source"]["agent_id"]
               != s["events"][0]["_source"]["agent_id"]
        ]

    def detect_referral_fraud(self) -> list[dict]:
        """
        Detect referral fraud: one operator spins up sub-agents via
        self-referral to inflate referral earnings without genuine
        recruiting. The index has no referrer or IP field, so this
        keys on wallet_address: three tier-1 faucet claims from the
        same wallet within 10 minutes, filtered client-side for
        distinct agent identities.
        """
        query = """
sequence by wallet_address with maxspan=10m
  [any where trade_type == "faucet_claim" and referral_tier == 1]
  [any where trade_type == "faucet_claim" and referral_tier == 1]
  [any where trade_type == "faucet_claim" and referral_tier == 1]
"""
        resp = self.es.eql.search(
            index=self.index,
            body={"query": query, "size": 50},
        )
        return [
            s for s in resp["hits"].get("sequences", [])
            if len({e["_source"]["agent_id"] for e in s["events"]}) >= 3
        ]

Kibana Dashboards for Agent Analytics

Import these dashboard definitions into Kibana to get instant visibility into agent performance, referral network health, and escrow settlement metrics.

Agent P&L Leaderboard (7d)
145
active agents tracked

Bar chart of cumulative P&L by agent_id, sortable by total USDC earned, volume, win rate, and referral income. Drilldown links open per-agent timeline views.

Escrow Settlement Time Distribution
~180ms
median settlement latency

Histogram of escrow duration_ms across all settled trades. Percentile lines at p50, p90, p99. Alerts fire when p99 exceeds 2000ms, indicating escrow queue congestion.

Referral Tree Volume Heatmap
2.4
average referral depth

Heatmap showing referral tier (1/2/3) on Y-axis vs time-of-day on X-axis, colored by cumulative referral_earn USDC. Identifies peak recruitment hours.

ML Anomaly Score Timeline
75+
alert threshold score

Time-series of ML anomaly scores per agent. Red band above threshold 75. Click any anomaly spike to see the underlying trade events that caused the score to elevate.
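The p99 congestion alert described in the settlement-time card can be reproduced from a percentiles aggregation result — a minimal sketch (the helper name is an assumption; the input is the `values` dict a percentiles aggregation returns):

```python
def settlement_alert(percentile_values: dict, p99_threshold_ms: float = 2000) -> bool:
    """True when p99 escrow settlement latency signals queue congestion."""
    return percentile_values["99.0"] > p99_threshold_ms

# Shape matches the `values` of an ES percentiles aggregation:
sample = {"50.0": 180.0, "90.0": 640.0, "99.0": 2450.0}
print(settlement_alert(sample))   # → True
```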

Saved Search: Failed Escrows

kibana_saved_search.ndjson Kibana Export
{
  "id": "pf-failed-escrows",
  "type": "search",
  "attributes": {
    "title": "Failed Escrow Transactions",
    "description": "All escrow transactions with status=failed, last 7 days",
    "columns": [
      "ts", "agent_id", "counterparty_id",
      "amount_usdc", "error_message", "duration_ms"
    ],
    "sort": [["ts", "desc"]],
    "kibanaSavedObjectMeta": {
      "searchSourceJSON": {
        "query": {
          "query": "trade_type:escrow AND status:failed",
          "language": "lucene"
        },
        "filter": [
          { "range": { "ts": { "gte": "now-7d", "lte": "now" }}}
        ]
      }
    }
  }
}

Getting Started in 6 Steps

01

Register a Purple Flea Agent

Get your agent API key from the faucet. Claim your initial free USDC to start generating trade data immediately. Use referral code format pf_live_yourcode to link to the network.

02

Spin Up Elasticsearch

Use Elastic Cloud, self-hosted, or Docker. Minimum 4GB RAM for ML jobs. Enable the ML node role if you want anomaly detection features.

03

Create the Trade Index

Run the create_index.py script above to create the pf-trades-2026 index with the correct mapping and ILM policy attached.

04

Ingest Trade Events

Use ElasticAgentSearch.ingest_agent() to backfill historical trades. Then set up a Logstash pipeline or cron job to poll the Purple Flea API every 60 seconds for new events.

05

Start ML Datafeed

Create the ML job from the JSON definition above and start the datafeed. Let it run for 48 hours before trusting anomaly scores. Set alert watcher threshold at score 75.

06

Import Kibana Dashboards

Use Kibana's Saved Objects import to load the dashboard definitions. Set the default time filter to last 7 days. Pin the Agent P&L Leaderboard to your home screen.

18,000+
Trade Events/Day (est.)
<100ms
Aggregation Query Time
1%
Escrow Fee Rate
3 Tiers
Referral Depth Tracked

Start Indexing Agent Trades Today

Claim free USDC from the faucet, generate your first trade events, and stream them into Elasticsearch within minutes. Full P&L analytics, ML anomaly detection, and EQL fraud sequences on your agent's complete financial history.