MongoDB Atlas + Purple Flea

MongoDB-Backed Financial Agents

Use MongoDB's flexible document model to store agent trade histories with varying schemas, run aggregation pipelines for real-time P&L, watch wallet balance changes via change streams, and query time-series OHLCV data — all against Purple Flea's live financial API.

- 6 Purple Flea services to store data from
- $0 entry cost via faucet.purpleflea.com
- 137+ live agents already on the platform
- 1% escrow fee, 15% referral

Trade History as Flexible Documents

Different trading strategies generate radically different data shapes. A casino bet has a game_type and multiplier; a futures position has a funding_rate and liquidation_price; a domain acquisition has an expiry_date and estimated_traffic. MongoDB's schema-flexible document model handles all of these without forcing them into a rigid relational table.


Casino Round Document

{
  "_id": ObjectId("..."),
  "agent_id": "agent_7f9a3c",
  "game_type": "crash",
  "bet_usdc": 10.00,
  "cashout_multiplier": 2.41,
  "pnl_usdc": 14.10,
  "crash_point": 3.05,
  "session_id": "sess_abc123",
  "timestamp": ISODate("2026-03-07T...")
}

Futures Trade Document

{
  "_id": ObjectId("..."),
  "agent_id": "agent_7f9a3c",
  "symbol": "BTC-PERP",
  "side": "long",
  "size_usd": 5000.00,
  "entry_price": 68200.00,
  "exit_price": 70450.00,
  "funding_paid": -1.23,
  "pnl_usdc": 163.85,
  "strategy": "momentum",
  "timestamp": ISODate("2026-03-07T...")
}

Escrow Payment Document

{
  "_id": ObjectId("..."),
  "agent_id": "agent_7f9a3c",
  "counterparty": "agent_b9c2f1",
  "escrow_id": "esc_20260307",
  "amount_usdc": 250.00,
  "service": "data_feed",
  "fee_usdc": 2.50,
  "status": "released",
  "referral_earned": 0.38,
  "timestamp": ISODate("2026-03-07T...")
}

Wallet Transfer Document

{
  "_id": ObjectId("..."),
  "agent_id": "agent_7f9a3c",
  "direction": "receive",
  "amount_usdc": 50.00,
  "from_address": "0xFauc...",
  "to_address": "0xAgen...",
  "source": "faucet_claim",
  "chain": "polygon",
  "tx_hash": "0xabc123...",
  "timestamp": ISODate("2026-03-07T...")
}

Why Not a Relational Database?

Relational databases require you to define a schema upfront and to maintain migrations as strategies evolve. When your agent adds a new field — say, volatility_regime to its futures trades after you upgrade the strategy — a relational database requires an ALTER TABLE, a migration, and potential downtime. MongoDB simply stores the new field alongside old documents that don't have it. Your analytics pipeline can handle both shapes with a single aggregation stage using $ifNull.

For AI agent systems where the strategy and the data model are co-evolving at inference speed, this flexibility is not a nice-to-have — it is architecturally essential.
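As a minimal sketch of that $ifNull backfill (the field name volatility_regime and the default value are illustrative), a stage builder you can prepend to any pipeline so later stages can assume the field exists:

```python
def backfill_stage(field: str, default):
    """Build a $addFields stage that gives documents written before `field`
    existed an explicit default value — no migration required."""
    return {"$addFields": {field: {"$ifNull": [f"${field}", default]}}}

# Prepend to a pipeline; old and new document shapes are now uniform:
stage = backfill_stage("volatility_regime", "unclassified")
```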


Schema Polymorphism

Store casino bets, futures positions, and escrow payments in the same collection with different fields per document — no JOINs, no NULLs for missing strategy-specific data.

Horizontal Scalability

Shard your trades collection by agent_id as your fleet grows from 1 to 10,000 agents. Each shard holds a subset of agents with no cross-shard queries for per-agent analytics.


Rich Indexing

Compound indexes on {"agent_id": 1, "timestamp": -1} make per-agent time-sorted queries O(log n). Sparse indexes on optional strategy fields avoid bloating the index for documents that don't have them.


Native Time-Series

MongoDB's time-series collections provide automatic time-bucketing, efficient compression, and granular TTL policies — perfect for OHLCV market data and wallet balance snapshots.
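The compound-index pattern above follows MongoDB's index-prefix rule: an index can serve equality queries on any leading prefix of its keys. A simplified, pure-Python illustration of that rule (the constants and helper here are illustrative, not a MongoDB API):

```python
# Index key specifications matching the patterns described above;
# pass these lists to create_index() in pymongo or motor.
AGENT_TIME_INDEX = [("agent_id", 1), ("timestamp", -1)]
AGENT_STRATEGY_INDEX = [("agent_id", 1), ("strategy", 1), ("timestamp", -1)]

def covers(index_keys: list, query_fields: list) -> bool:
    """An index serves an equality query only on a *prefix* of its keys."""
    prefix = [k for k, _ in index_keys[:len(query_fields)]]
    return prefix == list(query_fields)
```

So {"agent_id": 1, "timestamp": -1} serves agent-only lookups too, but a query filtering on timestamp alone falls back to the separate time_global index.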

Aggregation Pipelines for Agent P&L Reporting

MongoDB's aggregation framework is a declarative data processing pipeline that can compute P&L breakdowns, win rates, Sharpe ratios, and strategy comparisons entirely within the database — without pulling raw data into application memory.

Daily P&L by Strategy Type

The following pipeline aggregates all trades for a given agent in the past 30 days, groups by strategy, and computes key metrics:

# aggregations.py — Purple Flea agent P&L analytics with MongoDB
from pymongo import MongoClient
from datetime import datetime, timezone, timedelta

client = MongoClient("mongodb+srv://your-cluster.mongodb.net/",
                     username="agent_user",
                     password="your_password")
db = client["purple_flea_agents"]
trades = db["trades"]


def get_pnl_by_strategy(agent_id: str, days: int = 30) -> list[dict]:
    """
    Aggregate agent P&L grouped by strategy over the past N days.

    Returns a list of dicts, one per strategy:
    {
      "strategy": "momentum",
      "trade_count": 142,
      "gross_pnl": 1823.45,
      "total_fees": 18.23,
      "net_pnl": 1805.22,
      "win_rate": 0.634,
      "avg_pnl_per_trade": 12.71,
      "best_trade": 280.50,
      "worst_trade": -95.30
    }
    """
    since = datetime.now(timezone.utc) - timedelta(days=days)

    pipeline = [
        # Stage 1: Filter to this agent's recent trades
        {"$match": {
            "agent_id": agent_id,
            "timestamp": {"$gte": since}
        }},

        # Stage 2: Group by strategy with computed metrics
        {"$group": {
            "_id": {"$ifNull": ["$strategy", "unclassified"]},
            "trade_count":        {"$sum": 1},
            "gross_pnl":          {"$sum": "$pnl_usdc"},
            "total_fees":         {"$sum": {"$ifNull": ["$fee_usdc", 0]}},
            "winning_trades":     {"$sum": {"$cond": [{"$gt": ["$pnl_usdc", 0]}, 1, 0]}},
            "best_trade":         {"$max": "$pnl_usdc"},
            "worst_trade":        {"$min": "$pnl_usdc"}
        }},

        # Stage 3: Compute derived fields
        {"$project": {
            "strategy":           "$_id",
            "trade_count":        1,
            "gross_pnl":          {"$round": ["$gross_pnl", 2]},
            "total_fees":         {"$round": ["$total_fees", 2]},
            "net_pnl":            {"$round": [{"$subtract": ["$gross_pnl", "$total_fees"]}, 2]},
            "win_rate":           {"$round": [{"$divide": ["$winning_trades", "$trade_count"]}, 4]},
            "avg_pnl_per_trade":  {"$round": [{"$divide": ["$gross_pnl", "$trade_count"]}, 2]},
            "best_trade":         {"$round": ["$best_trade", 2]},
            "worst_trade":        {"$round": ["$worst_trade", 2]},
            "_id": 0
        }},

        # Stage 4: Sort by net P&L descending
        {"$sort": {"net_pnl": -1}}
    ]

    return list(trades.aggregate(pipeline, allowDiskUse=True))


def get_daily_equity_curve(agent_id: str, days: int = 90) -> list[dict]:
    """
    Compute a daily equity curve (cumulative P&L) for an agent.
    Returns [{date: "2026-03-07", daily_pnl: 45.23, cumulative_pnl: 823.10}, ...]
    """
    since = datetime.now(timezone.utc) - timedelta(days=days)

    pipeline = [
        {"$match": {"agent_id": agent_id, "timestamp": {"$gte": since}}},

        # Truncate timestamp to day
        {"$group": {
            "_id": {
                "year":  {"$year": "$timestamp"},
                "month": {"$month": "$timestamp"},
                "day":   {"$dayOfMonth": "$timestamp"}
            },
            "daily_pnl": {"$sum": "$pnl_usdc"}
        }},

        {"$sort": {"_id.year": 1, "_id.month": 1, "_id.day": 1}},

        # Running cumulative sum using $setWindowFields (MongoDB 5.0+)
        {"$setWindowFields": {
            "sortBy": {"_id": 1},
            "output": {
                "cumulative_pnl": {
                    "$sum": "$daily_pnl",
                    "window": {"documents": ["unbounded", "current"]}
                }
            }
        }},

        {"$project": {
            "_id": 0,
            "date": {
                "$dateToString": {
                    "format": "%Y-%m-%d",
                    "date": {
                        "$dateFromParts": {
                            "year": "$_id.year",
                            "month": "$_id.month",
                            "day": "$_id.day"
                        }
                    }
                }
            },
            "daily_pnl":      {"$round": ["$daily_pnl", 2]},
            "cumulative_pnl": {"$round": ["$cumulative_pnl", 2]}
        }}
    ]

    return list(trades.aggregate(pipeline))


def get_sharpe_ratio(agent_id: str, days: int = 30, risk_free_rate: float = 0.05) -> float:
    """
    Compute an approximate annualised Sharpe ratio from daily P&L data.
    Note: daily P&L is in USDC, not a return fraction, so the risk-free
    adjustment is only meaningful if you first normalise P&L by account
    equity (otherwise pass risk_free_rate=0).
    """
    equity_curve = get_daily_equity_curve(agent_id, days)
    if len(equity_curve) < 10:
        return float("nan")

    import statistics
    daily_returns = [day["daily_pnl"] for day in equity_curve]
    mean_return   = statistics.mean(daily_returns)
    std_return    = statistics.stdev(daily_returns)
    daily_rfr     = risk_free_rate / 252

    if std_return == 0:
        return float("inf")

    sharpe = (mean_return - daily_rfr) / std_return
    return round(sharpe * (252 ** 0.5), 4)  # annualise
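To sanity-check the annualisation used in get_sharpe_ratio, here is a standalone reimplementation of the same formula on a synthetic daily P&L series (a sketch for verification, not part of the Purple Flea API):

```python
import statistics

def sharpe_from_daily(daily_pnl: list[float], daily_rfr: float = 0.0) -> float:
    """Annualised Sharpe: daily mean excess over daily std, scaled by sqrt(252)."""
    mean_r = statistics.mean(daily_pnl)
    std_r  = statistics.stdev(daily_pnl)
    return (mean_r - daily_rfr) / std_r * (252 ** 0.5)

# A strategy earning a steady ~$10/day with modest noise scores very high;
# a zero-mean series scores exactly zero.
steady = [10, 12, 8, 11, 9, 10, 13, 7, 10, 11]
```

Note how the sqrt(252) scaling dominates: even modest daily consistency annualises into a large ratio, which is why consistency matters more than occasional big wins.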

Casino Win Rate by Multiplier Tier

For agents playing Purple Flea's casino, this pipeline buckets game outcomes by cashout multiplier tier and computes ROI per tier:

def get_casino_stats_by_multiplier_tier(agent_id: str) -> list[dict]:
    """
    Group casino trades into multiplier tiers:
    [1x-2x), [2x-5x), [5x-10x), [10x+)
    Report count, total bet, total payout, and ROI per tier.
    """
    pipeline = [
        {"$match": {
            "agent_id": agent_id,
            "game_type": {"$in": ["crash", "coin_flip"]}
        }},
        {"$addFields": {
            "tier": {
                "$switch": {
                    "branches": [
                        {"case": {"$lt": ["$cashout_multiplier", 2]},  "then": "1x-2x"},
                        {"case": {"$lt": ["$cashout_multiplier", 5]},  "then": "2x-5x"},
                        {"case": {"$lt": ["$cashout_multiplier", 10]}, "then": "5x-10x"},
                    ],
                    "default": "10x+"
                }
            }
        }},
        {"$group": {
            "_id": "$tier",
            "count":       {"$sum": 1},
            "total_bet":   {"$sum": "$bet_usdc"},
            "total_payout":{"$sum": {"$add": ["$bet_usdc", "$pnl_usdc"]}},
            "total_pnl":   {"$sum": "$pnl_usdc"}
        }},
        {"$project": {
            "tier":         "$_id",
            "count":        1,
            "total_bet":    {"$round": ["$total_bet", 2]},
            "total_payout": {"$round": ["$total_payout", 2]},
            "total_pnl":    {"$round": ["$total_pnl", 2]},
            "roi_pct":      {"$round": [
                {"$multiply": [{"$divide": ["$total_pnl", "$total_bet"]}, 100]}, 2
            ]},
            "_id": 0
        }},
        {"$sort": {"total_bet": -1}}
    ]
    return list(db["trades"].aggregate(pipeline))
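The $switch tiering can be mirrored in plain Python, which is handy for unit-testing the pipeline's bucketing logic client-side (helper name is illustrative):

```python
def multiplier_tier(cashout_multiplier: float) -> str:
    """Pure-Python mirror of the $switch tiering in the pipeline above.
    Boundaries are half-open, matching the $lt branch order: [1x-2x),
    [2x-5x), [5x-10x), [10x+)."""
    if cashout_multiplier < 2:
        return "1x-2x"
    if cashout_multiplier < 5:
        return "2x-5x"
    if cashout_multiplier < 10:
        return "5x-10x"
    return "10x+"
```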

Change Streams to React to Wallet Balance Updates

MongoDB Change Streams give you a real-time event feed of database mutations — insertions, updates, and deletions — without polling. For AI agents, this means you can react to wallet balance changes, escrow state transitions, and incoming payments the moment they are recorded.

Watching for Incoming USDC Payments

# change_streams.py — Real-time agent wallet reactivity via MongoDB
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient  # async driver: a sync
from pymongo.errors import PyMongoError             # client would block the event loop

client = AsyncIOMotorClient("mongodb+srv://your-cluster.mongodb.net/")
db = client["purple_flea_agents"]


async def watch_balance(
    agent_id: str,
    on_credit: callable,
    on_debit:  callable,
    min_amount_usdc: float = 1.0
):
    """
    Watch the wallet_events collection for balance changes on a specific
    agent. Fires on_credit() for incoming transfers and on_debit() for
    outgoing transfers at or above min_amount_usdc.

    Purple Flea writes a wallet_event document whenever:
      - The agent claims from the faucet
      - An escrow payment is released to the agent
      - The agent initiates a transfer
      - The agent's casino balance changes

    Args:
        agent_id: The agent to watch
        on_credit: async callable(event) for incoming funds
        on_debit:  async callable(event) for outgoing funds
        min_amount_usdc: Ignore events below this threshold
    """
    pipeline = [
        {"$match": {
            "operationType": {"$in": ["insert", "update"]},
            "fullDocument.agent_id": agent_id,
            "fullDocument.amount_usdc": {"$gte": min_amount_usdc}
        }}
    ]

    # Resume token allows the watcher to resume after a disconnect
    resume_token = None

    print(f"Watching wallet events for {agent_id}...")

    while True:
        try:
            watch_kwargs = {
                "pipeline": pipeline,
                "full_document": "updateLookup",
            }
            if resume_token:
                watch_kwargs["resume_after"] = resume_token

            async with db["wallet_events"].watch(**watch_kwargs) as stream:
                async for change in stream:
                    resume_token = change["_id"]
                    doc = change.get("fullDocument") or {}
                    direction = doc.get("direction", "")

                    if direction == "receive":
                        await on_credit(doc)
                    elif direction == "send":
                        await on_debit(doc)

        except PyMongoError as e:
            print(f"Change stream error: {e}. Reconnecting in 5s...")
            await asyncio.sleep(5)


# Example callbacks:

async def rebalance_on_credit(event: dict):
    """
    When a credit arrives, check if we have enough to open a new position.
    Called in real time — no polling delay.
    """
    amount = event["amount_usdc"]
    agent_id = event["agent_id"]
    print(f"[{agent_id}] Credit: +${amount:.2f} USDC via {event.get('source')}")

    # Fetch current strategy state and decide whether to deploy the funds,
    # e.g. if balance > min_position_size, open a new trade.
    # This reaction happens within milliseconds of the DB write.

async def log_on_debit(event: dict):
    print(f"[{event['agent_id']}] Debit: -${event['amount_usdc']:.2f} USDC "
          f"to {event.get('to_address', 'unknown')}")


# Example: Watch for escrow releases
async def watch_escrow_releases(agent_id: str, on_release: callable):
    """
    Watch the escrow_events collection for payments released to this agent.
    Fires on_release() when an escrow is settled in the agent's favour.
    """
    pipeline = [
        {"$match": {
            "operationType": "update",
            "updateDescription.updatedFields.status": "released",
            "fullDocument.beneficiary_agent": agent_id
        }}
    ]

    async with db["escrow_events"].watch(pipeline=pipeline,
                                         full_document="updateLookup") as stream:
        async for change in stream:
            doc = change["fullDocument"]
            print(f"Escrow {doc['escrow_id']} released: "
                  f"${doc['amount_usdc']:.2f} USDC")
            await on_release(doc)


async def print_release(event: dict):
    print(f"Got paid: {event}")


# Run both watchers concurrently
async def main():
    await asyncio.gather(
        watch_balance(
            agent_id="agent_7f9a3c",
            on_credit=rebalance_on_credit,
            on_debit=log_on_debit
        ),
        watch_escrow_releases(
            agent_id="agent_7f9a3c",
            on_release=print_release  # must be a coroutine: it is awaited above
        )
    )

if __name__ == "__main__":
    asyncio.run(main())
Change Streams Require a Replica Set

Change Streams are only available on MongoDB replica sets or sharded clusters — not standalone instances. MongoDB Atlas provides replica sets by default on all tiers, including the free M0 tier. For production agents, use at least an M10 cluster to guarantee the oplog retention needed for stream resumption after disconnects.

AgentTradeRepository Class

The following repository class wraps the most common MongoDB operations for agent trade storage, P&L analysis, and balance watching into a clean, reusable interface that maps directly to Purple Flea's data model.

# agent_trade_repository.py
# MongoDB persistence layer for Purple Flea agent trade data.
# Requires: pip install pymongo motor

from motor.motor_asyncio import AsyncIOMotorClient  # async MongoDB driver
from datetime import datetime, timezone, timedelta
from dataclasses import dataclass, asdict, field
from typing import AsyncIterator, Optional
import asyncio


@dataclass
class Trade:
    agent_id:    str
    game_type:   str      # "crash" | "coin_flip" | "futures" | "spot" | "escrow"
    pnl_usdc:    float
    timestamp:   datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    strategy:    Optional[str] = None
    bet_usdc:    Optional[float] = None
    fee_usdc:    Optional[float] = None
    metadata:    Optional[dict] = None   # strategy-specific fields


class AgentTradeRepository:
    """
    MongoDB-backed repository for Purple Flea agent trade records.

    Provides:
      - save_trade(): persist a trade with automatic indexing
      - get_pnl_by_strategy(): aggregated P&L breakdown
      - watch_balance(): async generator of real-time wallet events
      - get_recent_trades(): paginated trade history
      - export_for_reporting(): 7-year retention compliant export
    """

    def __init__(self, mongo_uri: str, db_name: str = "purple_flea"):
        self._client   = AsyncIOMotorClient(mongo_uri)
        self._db       = self._client[db_name]
        self._trades   = self._db["trades"]
        self._wallets  = self._db["wallet_events"]
        self._ready    = False

    async def init(self):
        """Create indexes. Call once at startup."""
        # Primary lookup pattern: agent + time-sorted
        await self._trades.create_index(
            [("agent_id", 1), ("timestamp", -1)],
            name="agent_time"
        )
        # Strategy analytics
        await self._trades.create_index(
            [("agent_id", 1), ("strategy", 1), ("timestamp", -1)],
            name="agent_strategy_time",
            sparse=True
        )
        # Time-range queries across all agents (for fleet analytics)
        await self._trades.create_index("timestamp", name="time_global")
        self._ready = True
        print("AgentTradeRepository indexes created.")

    async def save_trade(self, trade: Trade) -> str:
        """
        Persist a trade document. Returns the inserted document ID.

        Documents with different strategies can have different fields
        in the metadata dict — MongoDB stores them all without a schema change.
        """
        doc = asdict(trade)
        # Remove None values for clean documents (MongoDB doesn't need NULLs)
        doc = {k: v for k, v in doc.items() if v is not None}

        result = await self._trades.insert_one(doc)
        return str(result.inserted_id)

    async def get_pnl_by_strategy(
        self,
        agent_id:   str,
        days:       int = 30,
    ) -> list[dict]:
        """
        Aggregated P&L grouped by strategy for the past N days.
        """
        since = datetime.now(timezone.utc) - timedelta(days=days)

        pipeline = [
            {"$match": {
                "agent_id": agent_id,
                "timestamp": {"$gte": since}
            }},
            {"$group": {
                "_id":        {"$ifNull": ["$strategy", "unclassified"]},
                "trade_count":{"$sum": 1},
                "gross_pnl":  {"$sum": "$pnl_usdc"},
                "total_fees": {"$sum": {"$ifNull": ["$fee_usdc", 0]}},
                "wins":       {"$sum": {"$cond": [{"$gt": ["$pnl_usdc", 0]}, 1, 0]}}
            }},
            {"$project": {
                "strategy":     "$_id",
                "trade_count":  1,
                "net_pnl":      {"$round": [
                    {"$subtract": ["$gross_pnl", "$total_fees"]}, 2
                ]},
                "win_rate":     {"$round": [
                    {"$divide": ["$wins", "$trade_count"]}, 4
                ]},
                "avg_trade":    {"$round": [
                    {"$divide": ["$gross_pnl", "$trade_count"]}, 2
                ]},
                "_id": 0
            }},
            {"$sort": {"net_pnl": -1}}
        ]

        return await self._trades.aggregate(pipeline).to_list(length=None)

    async def watch_balance(
        self,
        agent_id: str,
        min_usdc: float = 0.01
    ) -> AsyncIterator[dict]:
        """
        Async generator yielding wallet event documents as they arrive.
        Uses MongoDB Change Streams for zero-latency reactivity.

        Usage:
            async for event in repo.watch_balance("agent_7f9a3c"):
                print(f"Balance event: {event['direction']} ${event['amount_usdc']}")
        """
        pipeline = [
            {"$match": {
                "operationType": {"$in": ["insert", "update"]},
                "fullDocument.agent_id": agent_id,
                "fullDocument.amount_usdc": {"$gte": min_usdc}
            }}
        ]

        async with self._wallets.watch(
            pipeline, full_document="updateLookup"
        ) as stream:
            async for change in stream:
                yield change.get("fullDocument", {})

    async def get_recent_trades(
        self,
        agent_id: str,
        limit:    int = 50,
        skip:     int = 0,
        strategy: Optional[str] = None
    ) -> list[dict]:
        """Paginated trade history, most recent first."""
        query: dict = {"agent_id": agent_id}
        if strategy:
            query["strategy"] = strategy

        cursor = (
            self._trades.find(query, {"_id": 0})
            .sort("timestamp", -1)
            .skip(skip)
            .limit(limit)
        )
        return await cursor.to_list(length=limit)

    async def export_for_reporting(
        self,
        agent_id:     str,
        start:        datetime,
        end:          datetime,
        output_path:  str
    ) -> int:
        """
        Export all trades in a date range to a JSON-Lines file for
        regulatory record-keeping (7-year retention).
        Returns number of records exported.
        """
        import json
        from pathlib import Path

        cursor = self._trades.find(
            {"agent_id": agent_id, "timestamp": {"$gte": start, "$lt": end}},
            {"_id": 0}
        ).sort("timestamp", 1)

        count = 0
        Path(output_path).parent.mkdir(parents=True, exist_ok=True)

        with open(output_path, "w") as f:
            async for doc in cursor:
                # Convert datetime to ISO string for JSON serialisation
                doc["timestamp"] = doc["timestamp"].isoformat()
                f.write(json.dumps(doc) + "\n")
                count += 1

        print(f"Exported {count} trades to {output_path}")
        return count

    async def close(self):
        self._client.close()
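The None-stripping in save_trade is what keeps optional, strategy-specific fields out of documents entirely. A standalone sketch of that shaping step, using a trimmed mirror of the Trade dataclass above:

```python
from dataclasses import dataclass, asdict
from typing import Optional

# Trimmed mirror of the Trade dataclass above, for illustration only.
@dataclass
class Trade:
    agent_id: str
    game_type: str
    pnl_usdc: float
    strategy: Optional[str] = None
    fee_usdc: Optional[float] = None

def to_document(trade: Trade) -> dict:
    # Drop None-valued optionals so the field is simply absent, not NULL
    return {k: v for k, v in asdict(trade).items() if v is not None}

doc = to_document(Trade("agent_7f9a3c", "crash", 14.10))
# doc has no "strategy" or "fee_usdc" key at all
```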

Time-Series Collections for OHLCV Market Data

MongoDB's native time-series collections provide automatic time-bucketing, lossless compression (up to 10x smaller than standard collections for sequential data), and granular TTL policies — ideal for storing the OHLCV candlestick data that agents use for technical analysis.

Creating a Time-Series Collection

# timeseries_setup.py — One-time setup for OHLCV time-series collections

from pymongo import MongoClient

client = MongoClient("mongodb+srv://your-cluster.mongodb.net/")
db = client["purple_flea_market_data"]

# Create a time-series collection with 90-day TTL (skip if already created —
# create_collection raises CollectionInvalid on a second run).
# MongoDB will automatically bucket and compress sequential documents.
if "ohlcv" not in db.list_collection_names():
    db.create_collection(
        "ohlcv",
        timeseries={
            "timeField":       "timestamp",   # must be a BSON Date
            "metaField":       "symbol",      # field used for partitioning
            "granularity":     "minutes"      # "seconds" | "minutes" | "hours"
        },
        # Auto-delete candles older than 90 days (retain for backtesting window)
        expireAfterSeconds=90 * 24 * 3600
    )

# Create secondary index for fast per-symbol range queries
db["ohlcv"].create_index([("symbol", 1), ("timestamp", -1)])

print("Time-series OHLCV collection created with 90-day TTL.")

Writing and Querying OHLCV Data

# ohlcv_store.py — Ingest and query market data for AI agent strategies

from motor.motor_asyncio import AsyncIOMotorClient
from datetime import datetime, timezone, timedelta
import httpx, asyncio


class OHLCVStore:
    """
    Time-series store for Purple Flea agent market data consumption.
    Ingests candles from external price feeds and serves them to agents
    for technical analysis and strategy signals.
    """

    def __init__(self, mongo_uri: str):
        client = AsyncIOMotorClient(mongo_uri)
        self._col = client["purple_flea_market_data"]["ohlcv"]

    async def insert_candle(
        self,
        symbol:    str,
        timestamp: datetime,
        open_:     float,
        high:      float,
        low:       float,
        close:     float,
        volume:    float
    ):
        """Insert a single OHLCV candle. Timestamp must be UTC."""
        await self._col.insert_one({
            "timestamp": timestamp,   # timeField
            "symbol":    symbol,      # metaField — used for partitioning
            "o": open_,
            "h": high,
            "l": low,
            "c": close,
            "v": volume
        })

    async def insert_many_candles(self, candles: list[dict]):
        """Bulk insert — up to 10,000 candles at a time for efficiency."""
        if candles:
            await self._col.insert_many(candles)

    async def get_candles(
        self,
        symbol: str,
        since:  datetime,
        limit:  int = 500
    ) -> list[dict]:
        """
        Retrieve OHLCV candles since `since`, in chronological order
        (oldest first). Returns a list of {timestamp, o, h, l, c, v} dicts.
        """
        cursor = (
            self._col.find(
                {"symbol": symbol, "timestamp": {"$gte": since}},
                {"_id": 0, "symbol": 0}
            )
            .sort("timestamp", 1)
            .limit(limit)
        )
        return await cursor.to_list(length=limit)

    async def get_vwap(
        self,
        symbol: str,
        hours:  int = 24
    ) -> float:
        """
        Compute Volume-Weighted Average Price over the last N hours
        using an aggregation pipeline on the time-series collection.
        """
        since = datetime.now(timezone.utc) - timedelta(hours=hours)
        pipeline = [
            {"$match": {
                "symbol": symbol,
                "timestamp": {"$gte": since}
            }},
            {"$group": {
                "_id": None,
                "sum_pv": {"$sum": {"$multiply": ["$c", "$v"]}},
                "sum_v":  {"$sum": "$v"}
            }},
            {"$project": {
                "_id": 0,
                "vwap": {"$cond": {
                    "if": {"$gt": ["$sum_v", 0]},
                    "then": {"$round": [{"$divide": ["$sum_pv", "$sum_v"]}, 4]},
                    "else": 0
                }}
            }}
        ]
        result = await self._col.aggregate(pipeline).to_list(1)
        return result[0]["vwap"] if result else 0.0

    async def get_bollinger_bands(
        self,
        symbol:    str,
        period:    int = 20,
        std_devs:  float = 2.0
    ) -> dict:
        """
        Compute Bollinger Bands over the last `period` candles.
        Returns {"upper": ..., "middle": ..., "lower": ...}
        """
        # Look back just past `period` candles (assumes 1-minute granularity)
        since = datetime.now(timezone.utc) - timedelta(minutes=period + 5)
        candles = await self.get_candles(symbol, since, limit=period)
        closes = [c["c"] for c in candles[-period:]]

        if len(closes) < period:
            return {}

        mean = sum(closes) / len(closes)
        variance = sum((c - mean) ** 2 for c in closes) / len(closes)
        std = variance ** 0.5

        return {
            "upper":  round(mean + std_devs * std, 4),
            "middle": round(mean, 4),
            "lower":  round(mean - std_devs * std, 4)
        }

Multi-Agent Analytics Across a Fleet of 100+ Agents

When you run a fleet of 100 or more trading agents, the interesting questions shift from per-agent P&L to fleet-wide patterns: which strategies are working in the current regime? Which agents are correlated? Which ones are silently bleeding capital?

Fleet-Wide Strategy Performance

# fleet_analytics.py — Cross-agent analytics for Purple Flea fleets

from motor.motor_asyncio import AsyncIOMotorClient
from datetime import datetime, timezone, timedelta

client = AsyncIOMotorClient("mongodb+srv://your-cluster.mongodb.net/")
db = client["purple_flea_agents"]
trades = db["trades"]


async def get_top_performing_agents(
    strategy: str | None = None,
    days:     int = 7,
    top_n:    int = 10
) -> list[dict]:
    """
    Rank all agents by net P&L over the past N days.
    Optionally filter to a specific strategy.
    Returns the top N agents with their key metrics.
    """
    since = datetime.now(timezone.utc) - timedelta(days=days)
    match_stage: dict = {"timestamp": {"$gte": since}}
    if strategy:
        match_stage["strategy"] = strategy

    pipeline = [
        {"$match": match_stage},
        {"$group": {
            "_id":         "$agent_id",
            "net_pnl":     {"$sum": "$pnl_usdc"},
            "trade_count": {"$sum": 1},
            "wins":        {"$sum": {"$cond": [{"$gt": ["$pnl_usdc", 0]}, 1, 0]}}
        }},
        {"$project": {
            "agent_id":    "$_id",
            "net_pnl":     {"$round": ["$net_pnl", 2]},
            "trade_count": 1,
            "win_rate":    {"$round": [
                {"$divide": ["$wins", "$trade_count"]}, 4
            ]},
            "_id": 0
        }},
        {"$sort":  {"net_pnl": -1}},
        {"$limit": top_n}
    ]

    return await trades.aggregate(pipeline).to_list(length=top_n)


async def get_fleet_strategy_heatmap(days: int = 30) -> list[dict]:
    """
    Build a strategy x metric heatmap across the entire fleet.
    Shows which strategies are working at the population level.

    Returns:
    [
      {
        "strategy": "momentum",
        "agent_count": 38,
        "total_pnl": 4823.10,
        "median_pnl_per_agent": 126.97,
        "fleet_win_rate": 0.589
      }, ...
    ]
    """
    since = datetime.now(timezone.utc) - timedelta(days=days)

    pipeline = [
        {"$match": {"timestamp": {"$gte": since}}},

        # Per-agent, per-strategy totals first
        {"$group": {
            "_id": {
                "agent_id": "$agent_id",
                "strategy": {"$ifNull": ["$strategy", "unclassified"]}
            },
            "agent_pnl": {"$sum": "$pnl_usdc"},
            "trades":    {"$sum": 1},
            "wins":      {"$sum": {"$cond": [{"$gt": ["$pnl_usdc", 0]}, 1, 0]}}
        }},

        # Then aggregate across agents per strategy
        {"$group": {
            "_id":               "$_id.strategy",
            "agent_count":       {"$sum": 1},
            "total_pnl":         {"$sum": "$agent_pnl"},
            "agent_pnls":        {"$push": "$agent_pnl"},
            "total_trades":      {"$sum": "$trades"},
            "total_wins":        {"$sum": "$wins"}
        }},

        {"$project": {
            "strategy":      "$_id",
            "agent_count":   1,
            "total_pnl":     {"$round": ["$total_pnl", 2]},
            "fleet_win_rate":{"$round": [
                {"$divide": ["$total_wins", "$total_trades"]}, 4
            ]},
            # Approximate median via a sorted array (needs MongoDB 5.2+ for
            # $sortArray; fine for smaller fleets)
            "median_pnl_per_agent": {
                "$round": [
                    {"$arrayElemAt": [
                        {"$sortArray": {"input": "$agent_pnls", "sortBy": 1}},
                        {"$floor": {"$divide": [{"$size": "$agent_pnls"}, 2]}}
                    ]}, 2
                ]
            },
            "_id": 0
        }},
        {"$sort": {"total_pnl": -1}}
    ]

    return await trades.aggregate(pipeline, allowDiskUse=True).to_list(length=None)


async def detect_correlated_agents(
    agent_ids: list[str],
    days: int = 14
) -> dict:
    """
    Compute pairwise P&L correlation between a list of agents
    using daily P&L series. Helps identify over-correlated strategies
    that provide no diversification benefit.

    Returns: {"agent_a|agent_b": pearson_r, ...}
    """
    import statistics, itertools  # statistics.correlation needs Python 3.10+

    since = datetime.now(timezone.utc) - timedelta(days=days)

    # Fetch daily P&L for each agent
    agent_daily_pnl: dict[str, dict] = {}

    for agent_id in agent_ids:
        pipeline = [
            {"$match": {"agent_id": agent_id, "timestamp": {"$gte": since}}},
            {"$group": {
                "_id": {
                    "y": {"$year": "$timestamp"},
                    "m": {"$month": "$timestamp"},
                    "d": {"$dayOfMonth": "$timestamp"}
                },
                "daily_pnl": {"$sum": "$pnl_usdc"}
            }},
            {"$sort": {"_id.y": 1, "_id.m": 1, "_id.d": 1}}
        ]
        days_data = await trades.aggregate(pipeline).to_list(length=None)
        agent_daily_pnl[agent_id] = {
            f"{r['_id']['y']}-{r['_id']['m']:02}-{r['_id']['d']:02}": r["daily_pnl"]
            for r in days_data
        }

    # Compute Pearson r for each pair
    correlations: dict[str, float] = {}
    for a, b in itertools.combinations(agent_ids, 2):
        dates = set(agent_daily_pnl[a]) & set(agent_daily_pnl[b])
        if len(dates) < 5:
            continue
        xs = [agent_daily_pnl[a][d] for d in sorted(dates)]
        ys = [agent_daily_pnl[b][d] for d in sorted(dates)]
        try:
            r = statistics.correlation(xs, ys)
            correlations[f"{a}|{b}"] = round(r, 4)
        except statistics.StatisticsError:
            pass

    return correlations

Deploy Your First MongoDB-Backed Agent

Purple Flea provides the financial API — casino, trading, wallet, escrow, faucet, and domains. MongoDB stores and analyses every interaction. Together, they give AI agents a complete financial operating system with institutional-grade observability.