Trade History as Flexible Documents
Different trading strategies generate radically different data shapes. A casino bet has a game_type and multiplier; a futures position has a funding_rate and liquidation_price; a domain acquisition has an expiry_date and estimated_traffic. MongoDB's schema-flexible document model handles all of these without forcing them into a rigid relational table.
Casino Round Document
{
"_id": ObjectId("..."),
"agent_id": "agent_7f9a3c",
"game_type": "crash",
"bet_usdc": 10.00,
"cashout_multiplier": 2.41,
"pnl_usdc": 14.10,
"crash_point": 3.05,
"session_id": "sess_abc123",
"timestamp": ISODate("2026-03-07T...")
}
Futures Trade Document
{
"_id": ObjectId("..."),
"agent_id": "agent_7f9a3c",
"symbol": "BTC-PERP",
"side": "long",
"size_usd": 5000.00,
"entry_price": 68200.00,
"exit_price": 70450.00,
"funding_paid": -1.23,
"pnl_usdc": 163.85,
"strategy": "momentum",
"timestamp": ISODate("2026-03-07T...")
}
Escrow Payment Document
{
"_id": ObjectId("..."),
"agent_id": "agent_7f9a3c",
"counterparty": "agent_b9c2f1",
"escrow_id": "esc_20260307",
"amount_usdc": 250.00,
"service": "data_feed",
"fee_usdc": 2.50,
"status": "released",
"referral_earned": 0.38,
"timestamp": ISODate("2026-03-07T...")
}
Wallet Transfer Document
{
"_id": ObjectId("..."),
"agent_id": "agent_7f9a3c",
"direction": "receive",
"amount_usdc": 50.00,
"from_address": "0xFauc...",
"to_address": "0xAgen...",
"source": "faucet_claim",
"chain": "polygon",
"tx_hash": "0xabc123...",
"timestamp": ISODate("2026-03-07T...")
}
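The casino round document above encodes an invariant worth checking at write time: pnl_usdc should equal bet_usdc × (cashout_multiplier − 1). A minimal validation helper (hypothetical, not part of Purple Flea's API) might look like this, verified against the sample round:

```python
def casino_pnl_consistent(doc: dict, tolerance: float = 0.01) -> bool:
    """Check that pnl_usdc == bet_usdc * (cashout_multiplier - 1) within tolerance."""
    expected = doc["bet_usdc"] * (doc["cashout_multiplier"] - 1)
    return abs(doc["pnl_usdc"] - expected) <= tolerance

# The sample casino round: $10 bet cashed out at 2.41x → $14.10 profit
round_doc = {"bet_usdc": 10.00, "cashout_multiplier": 2.41, "pnl_usdc": 14.10}
print(casino_pnl_consistent(round_doc))  # → True
```

Running a check like this before insert_one() catches upstream accounting bugs before they pollute your analytics.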
Why Not a Relational Database?
Relational databases require you to define a schema upfront and to maintain migrations as strategies evolve. When your agent adds a new field — say, volatility_regime to its futures trades after you upgrade the strategy — a relational database requires an ALTER TABLE, a migration, and potential downtime. MongoDB simply stores the new field alongside old documents that don't have it. Your analytics pipeline can handle both shapes with a single aggregation stage using $ifNull.
For AI agent systems where the strategy and the data model are co-evolving at inference speed, this flexibility is not a nice-to-have — it is architecturally essential.
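The $ifNull pattern mentioned above has a direct plain-Python analogue, which makes the claim easy to see: documents with and without the new field aggregate cleanly, no migration required. The figures here are illustrative:

```python
from collections import defaultdict

# Two generations of documents in the same collection: the newer one
# carries a volatility_regime field, the older ones do not — and one
# document has no strategy field at all.
docs = [
    {"pnl_usdc": 163.85, "strategy": "momentum"},
    {"pnl_usdc": -42.10, "strategy": "momentum", "volatility_regime": "high"},
    {"pnl_usdc": 12.00},
]

# Mirror of {"$ifNull": ["$strategy", "unclassified"]} in plain Python
pnl_by_strategy: dict[str, float] = defaultdict(float)
for d in docs:
    pnl_by_strategy[d.get("strategy", "unclassified")] += d["pnl_usdc"]

print({k: round(v, 2) for k, v in pnl_by_strategy.items()})
# → {'momentum': 121.75, 'unclassified': 12.0}
```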
Schema Polymorphism
Store casino bets, futures positions, and escrow payments in the same collection with different fields per document — no JOINs, no NULLs for missing strategy-specific data.
Horizontal Scalability
Shard your trades collection by agent_id as your fleet grows from 1 to 10,000 agents. Each shard holds a subset of agents with no cross-shard queries for per-agent analytics.
Rich Indexing
Compound indexes on {"agent_id": 1, "timestamp": -1} make per-agent time-sorted queries O(log n). Sparse indexes on optional strategy fields avoid bloating the index for documents that don't have them.
Native Time-Series
MongoDB's time-series collections provide automatic time-bucketing, efficient compression, and granular TTL policies — perfect for OHLCV market data and wallet balance snapshots.
Aggregation Pipelines for Agent P&L Reporting
MongoDB's aggregation framework is a declarative data processing pipeline that can compute P&L breakdowns, win rates, Sharpe ratios, and strategy comparisons entirely within the database — without pulling raw data into application memory.
Daily P&L by Strategy Type
The following pipeline aggregates all trades for a given agent in the past 30 days, groups by strategy, and computes key metrics:
# aggregations.py — Purple Flea agent P&L analytics with MongoDB
from pymongo import MongoClient
from datetime import datetime, timezone, timedelta
client = MongoClient("mongodb+srv://your-cluster.mongodb.net/",
username="agent_user",
password="your_password")
db = client["purple_flea_agents"]
trades = db["trades"]
def get_pnl_by_strategy(agent_id: str, days: int = 30) -> list[dict]:
"""
Aggregate agent P&L grouped by strategy over the past N days.
Returns a list of dicts, one per strategy:
{
"strategy": "momentum",
"trade_count": 142,
"gross_pnl": 1823.45,
"total_fees": 18.23,
"net_pnl": 1805.22,
"win_rate": 0.634,
"avg_pnl_per_trade": 12.71,
"best_trade": 280.50,
"worst_trade": -95.30
}
"""
since = datetime.now(timezone.utc) - timedelta(days=days)
pipeline = [
# Stage 1: Filter to this agent's recent trades
{"$match": {
"agent_id": agent_id,
"timestamp": {"$gte": since}
}},
# Stage 2: Group by strategy with computed metrics
{"$group": {
"_id": {"$ifNull": ["$strategy", "unclassified"]},
"trade_count": {"$sum": 1},
"gross_pnl": {"$sum": "$pnl_usdc"},
"total_fees": {"$sum": {"$ifNull": ["$fee_usdc", 0]}},
"winning_trades": {"$sum": {"$cond": [{"$gt": ["$pnl_usdc", 0]}, 1, 0]}},
"best_trade": {"$max": "$pnl_usdc"},
"worst_trade": {"$min": "$pnl_usdc"},
"pnl_variance_sum": {"$sum": {"$pow": ["$pnl_usdc", 2]}}
}},
# Stage 3: Compute derived fields
{"$project": {
"strategy": "$_id",
"trade_count": 1,
"gross_pnl": {"$round": ["$gross_pnl", 2]},
"total_fees": {"$round": ["$total_fees", 2]},
"net_pnl": {"$round": [{"$subtract": ["$gross_pnl", "$total_fees"]}, 2]},
"win_rate": {"$round": [{"$divide": ["$winning_trades", "$trade_count"]}, 4]},
"avg_pnl_per_trade": {"$round": [{"$divide": ["$gross_pnl", "$trade_count"]}, 2]},
"best_trade": {"$round": ["$best_trade", 2]},
"worst_trade": {"$round": ["$worst_trade", 2]},
"_id": 0
}},
# Stage 4: Sort by net P&L descending
{"$sort": {"net_pnl": -1}}
]
return list(trades.aggregate(pipeline, allowDiskUse=True))
def get_daily_equity_curve(agent_id: str, days: int = 90) -> list[dict]:
"""
Compute a daily equity curve (cumulative P&L) for an agent.
Returns [{date: "2026-03-07", daily_pnl: 45.23, cumulative_pnl: 823.10}, ...]
"""
since = datetime.now(timezone.utc) - timedelta(days=days)
pipeline = [
{"$match": {"agent_id": agent_id, "timestamp": {"$gte": since}}},
# Truncate timestamp to day
{"$group": {
"_id": {
"year": {"$year": "$timestamp"},
"month": {"$month": "$timestamp"},
"day": {"$dayOfMonth": "$timestamp"}
},
"daily_pnl": {"$sum": "$pnl_usdc"}
}},
{"$sort": {"_id.year": 1, "_id.month": 1, "_id.day": 1}},
# Running cumulative sum using $setWindowFields (MongoDB 5.0+)
{"$setWindowFields": {
"sortBy": {"_id": 1},
"output": {
"cumulative_pnl": {
"$sum": "$daily_pnl",
"window": {"documents": ["unbounded", "current"]}
}
}
}},
{"$project": {
"_id": 0,
"date": {
"$dateToString": {
"format": "%Y-%m-%d",
"date": {
"$dateFromParts": {
"year": "$_id.year",
"month": "$_id.month",
"day": "$_id.day"
}
}
}
},
"daily_pnl": {"$round": ["$daily_pnl", 2]},
"cumulative_pnl": {"$round": ["$cumulative_pnl", 2]}
}}
]
return list(trades.aggregate(pipeline))
def get_sharpe_ratio(agent_id: str, days: int = 30, risk_free_rate: float = 0.05) -> float:
"""
Compute approximate annualised Sharpe ratio from daily P&L data.
    Treats raw daily P&L in USDC as the return series; for a true
    Sharpe, normalise each day's P&L by the agent's equity first.
"""
equity_curve = get_daily_equity_curve(agent_id, days)
if len(equity_curve) < 10:
return float("nan")
import statistics
daily_returns = [day["daily_pnl"] for day in equity_curve]
mean_return = statistics.mean(daily_returns)
std_return = statistics.stdev(daily_returns)
daily_rfr = risk_free_rate / 252
if std_return == 0:
return float("inf")
sharpe = (mean_return - daily_rfr) / std_return
return round(sharpe * (252 ** 0.5), 4) # annualise
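The Sharpe arithmetic is independent of MongoDB, so it can be sanity-checked on a synthetic daily P&L series before wiring it to the equity-curve pipeline. The figures below are illustrative, not from a real agent:

```python
import statistics

# Twelve days of synthetic P&L (USDC) — enough to clear the 10-day minimum
daily_pnl = [12.0, -4.0, 7.5, 3.0, -1.0, 9.0, 2.0, -3.5, 6.0, 4.0, 1.5, -2.0]
risk_free_rate = 0.05
daily_rfr = risk_free_rate / 252  # same convention as get_sharpe_ratio()

mean_return = statistics.mean(daily_pnl)
std_return = statistics.stdev(daily_pnl)
sharpe_annualised = round((mean_return - daily_rfr) / std_return * (252 ** 0.5), 4)
print(mean_return, sharpe_annualised)
```

A positive mean with moderate dispersion yields a positive annualised Sharpe, matching what the aggregation-backed version would report for the same series.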
Casino Win Rate by Multiplier Tier
For agents playing Purple Flea's casino, this pipeline buckets crash game outcomes by cashout multiplier tier and computes ROI per tier:
def get_casino_stats_by_multiplier_tier(agent_id: str) -> list[dict]:
"""
Group casino trades into multiplier tiers:
[1x-2x), [2x-5x), [5x-10x), [10x+)
Report count, total bet, total payout, and ROI per tier.
"""
pipeline = [
{"$match": {
"agent_id": agent_id,
"game_type": {"$in": ["crash", "coin_flip"]}
}},
{"$addFields": {
"tier": {
"$switch": {
"branches": [
{"case": {"$lt": ["$cashout_multiplier", 2]}, "then": "1x-2x"},
{"case": {"$lt": ["$cashout_multiplier", 5]}, "then": "2x-5x"},
{"case": {"$lt": ["$cashout_multiplier", 10]}, "then": "5x-10x"},
],
"default": "10x+"
}
}
}},
{"$group": {
"_id": "$tier",
"count": {"$sum": 1},
"total_bet": {"$sum": "$bet_usdc"},
"total_payout":{"$sum": {"$add": ["$bet_usdc", "$pnl_usdc"]}},
"total_pnl": {"$sum": "$pnl_usdc"}
}},
{"$project": {
"tier": "$_id",
"count": 1,
"total_bet": {"$round": ["$total_bet", 2]},
"total_pnl": {"$round": ["$total_pnl", 2]},
"roi_pct": {"$round": [
{"$multiply": [{"$divide": ["$total_pnl", "$total_bet"]}, 100]}, 2
]},
"_id": 0
}},
{"$sort": {"total_bet": -1}}
]
return list(db["trades"].aggregate(pipeline))
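The $switch bucketing maps one-to-one onto a plain Python function, which is handy for unit-testing tier boundaries before trusting the pipeline. multiplier_tier is a hypothetical helper, not part of the pipeline itself:

```python
def multiplier_tier(cashout_multiplier: float) -> str:
    """Mirror of the $switch stage: [1x-2x), [2x-5x), [5x-10x), [10x+)."""
    if cashout_multiplier < 2:
        return "1x-2x"
    if cashout_multiplier < 5:
        return "2x-5x"
    if cashout_multiplier < 10:
        return "5x-10x"
    return "10x+"

print([multiplier_tier(m) for m in (1.5, 2.41, 9.99, 25.0)])
# → ['1x-2x', '2x-5x', '5x-10x', '10x+']
```

Note the half-open intervals: a 2.0x cashout lands in the 2x-5x tier, exactly as the $lt branches in the pipeline dictate.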
Change Streams to React to Wallet Balance Updates
MongoDB Change Streams give you a real-time event feed of database mutations — insertions, updates, and deletions — without polling. For AI agents, this means you can react to wallet balance changes, escrow state transitions, and incoming payments the moment they are recorded.
Watching for Incoming USDC Payments
# change_streams.py — Real-time agent wallet reactivity via MongoDB
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient  # async driver: watching must not block the event loop
from pymongo.errors import PyMongoError
client = AsyncIOMotorClient("mongodb+srv://your-cluster.mongodb.net/")
db = client["purple_flea_agents"]
async def watch_balance(
    agent_id: str,
    on_credit: callable,
    on_debit: callable,
    min_amount_usdc: float = 1.0
):
    """
    Watch the wallet_events collection for balance changes on a specific
    agent. Fires on_credit() for incoming transfers and on_debit() for
    outgoing transfers above min_amount_usdc.
    Purple Flea writes a wallet_event document whenever:
    - The agent claims from the faucet
    - An escrow payment is released to the agent
    - The agent initiates a transfer
    - The agent's casino balance changes
    Args:
        agent_id: The agent to watch
        on_credit: async callable(event) for incoming funds
        on_debit: async callable(event) for outgoing funds
        min_amount_usdc: Ignore events below this threshold
    """
    pipeline = [
        {"$match": {
            "operationType": {"$in": ["insert", "update"]},
            "fullDocument.agent_id": agent_id,
            "fullDocument.amount_usdc": {"$gte": min_amount_usdc}
        }}
    ]
    # Resume token allows the watcher to resume after a disconnect
    resume_token = None
    print(f"Watching wallet events for {agent_id}...")
    while True:
        try:
            watch_kwargs = {
                "pipeline": pipeline,
                "full_document": "updateLookup",
            }
            if resume_token:
                watch_kwargs["resume_after"] = resume_token
            async with db["wallet_events"].watch(**watch_kwargs) as stream:
                async for change in stream:
                    resume_token = change["_id"]
                    doc = change.get("fullDocument", {})
                    direction = doc.get("direction", "")
                    if direction == "receive":
                        await on_credit(doc)
                    elif direction == "send":
                        await on_debit(doc)
        except PyMongoError as e:
            print(f"Change stream error: {e}. Reconnecting in 5s...")
            await asyncio.sleep(5)
# Example callbacks:
async def rebalance_on_credit(event: dict):
    """
    When a credit arrives, check if we have enough to open a new position.
    Called in real-time — no polling delay.
    """
    amount = event["amount_usdc"]
    agent_id = event["agent_id"]
    print(f"[{agent_id}] Credit: +${amount:.2f} USDC via {event.get('source')}")
    # Fetch current strategy state and decide whether to deploy the funds
    # e.g., if balance > min_position_size, open a new trade
    # This reaction happens within milliseconds of the DB write
async def log_on_debit(event: dict):
    print(f"[{event['agent_id']}] Debit: -${event['amount_usdc']:.2f} USDC "
          f"to {event.get('to_address', 'unknown')}")
# Example: Watch for escrow releases
async def watch_escrow_releases(agent_id: str, on_release: callable):
    """
    Watch the escrow_events collection for payments released to this agent.
    Fires on_release() when an escrow is settled in the agent's favour.
    """
    pipeline = [
        {"$match": {
            "operationType": "update",
            "updateDescription.updatedFields.status": "released",
            "fullDocument.beneficiary_agent": agent_id
        }}
    ]
    async with db["escrow_events"].watch(pipeline=pipeline,
                                         full_document="updateLookup") as stream:
        async for change in stream:
            doc = change["fullDocument"]
            print(f"Escrow {doc['escrow_id']} released: "
                  f"${doc['amount_usdc']:.2f} USDC")
            await on_release(doc)
# Run both watchers concurrently
async def on_escrow_release(event: dict):
    print(f"Got paid: {event}")
async def main():
    await asyncio.gather(
        watch_balance(
            agent_id="agent_7f9a3c",
            on_credit=rebalance_on_credit,
            on_debit=log_on_debit
        ),
        watch_escrow_releases(
            agent_id="agent_7f9a3c",
            on_release=on_escrow_release  # awaited by the watcher, so it must be async
        )
    )
if __name__ == "__main__":
    asyncio.run(main())
Change Streams are only available on MongoDB replica sets or sharded clusters — not standalone instances. MongoDB Atlas provides replica sets by default on all tiers, including the free M0 tier. For production agents, use at least an M10 cluster to guarantee the oplog retention needed for stream resumption after disconnects.
AgentTradeRepository Class
The following repository class wraps the most common MongoDB operations for agent trade storage, P&L analysis, and balance watching into a clean, reusable interface that maps directly to Purple Flea's data model.
# agent_trade_repository.py
# MongoDB persistence layer for Purple Flea agent trade data.
# Requires: pip install pymongo motor
from motor.motor_asyncio import AsyncIOMotorClient # async MongoDB driver
from datetime import datetime, timezone, timedelta
from dataclasses import dataclass, asdict, field
from typing import AsyncIterator, Optional
import asyncio
@dataclass
class Trade:
agent_id: str
game_type: str # "crash" | "coin_flip" | "futures" | "spot" | "escrow"
pnl_usdc: float
timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
strategy: Optional[str] = None
bet_usdc: Optional[float] = None
fee_usdc: Optional[float] = None
metadata: Optional[dict] = None # strategy-specific fields
class AgentTradeRepository:
"""
MongoDB-backed repository for Purple Flea agent trade records.
Provides:
- save_trade(): persist a trade with automatic indexing
- get_pnl_by_strategy(): aggregated P&L breakdown
- watch_balance(): async generator of real-time wallet events
- get_recent_trades(): paginated trade history
- export_for_reporting(): 7-year retention compliant export
"""
def __init__(self, mongo_uri: str, db_name: str = "purple_flea"):
self._client = AsyncIOMotorClient(mongo_uri)
self._db = self._client[db_name]
self._trades = self._db["trades"]
self._wallets = self._db["wallet_events"]
self._ready = False
async def init(self):
"""Create indexes. Call once at startup."""
# Primary lookup pattern: agent + time-sorted
await self._trades.create_index(
[("agent_id", 1), ("timestamp", -1)],
name="agent_time"
)
# Strategy analytics
await self._trades.create_index(
[("agent_id", 1), ("strategy", 1), ("timestamp", -1)],
name="agent_strategy_time",
sparse=True
)
# Time-range queries across all agents (for fleet analytics)
await self._trades.create_index("timestamp", name="time_global")
self._ready = True
print("AgentTradeRepository indexes created.")
async def save_trade(self, trade: Trade) -> str:
"""
Persist a trade document. Returns the inserted document ID.
Documents with different strategies can have different fields
in the metadata dict — MongoDB stores them all without a schema change.
"""
doc = asdict(trade)
# Remove None values for clean documents (MongoDB doesn't need NULLs)
doc = {k: v for k, v in doc.items() if v is not None}
result = await self._trades.insert_one(doc)
return str(result.inserted_id)
async def get_pnl_by_strategy(
self,
agent_id: str,
days: int = 30,
) -> list[dict]:
"""
Aggregated P&L grouped by strategy for the past N days.
"""
since = datetime.now(timezone.utc) - timedelta(days=days)
pipeline = [
{"$match": {
"agent_id": agent_id,
"timestamp": {"$gte": since}
}},
{"$group": {
"_id": {"$ifNull": ["$strategy", "unclassified"]},
"trade_count":{"$sum": 1},
"gross_pnl": {"$sum": "$pnl_usdc"},
"total_fees": {"$sum": {"$ifNull": ["$fee_usdc", 0]}},
"wins": {"$sum": {"$cond": [{"$gt": ["$pnl_usdc", 0]}, 1, 0]}}
}},
{"$project": {
"strategy": "$_id",
"trade_count": 1,
"net_pnl": {"$round": [
{"$subtract": ["$gross_pnl", "$total_fees"]}, 2
]},
"win_rate": {"$round": [
{"$divide": ["$wins", "$trade_count"]}, 4
]},
"avg_trade": {"$round": [
{"$divide": ["$gross_pnl", "$trade_count"]}, 2
]},
"_id": 0
}},
{"$sort": {"net_pnl": -1}}
]
return await self._trades.aggregate(pipeline).to_list(length=None)
async def watch_balance(
self,
agent_id: str,
min_usdc: float = 0.01
) -> AsyncIterator[dict]:
"""
Async generator yielding wallet event documents as they arrive.
        Uses MongoDB Change Streams for push-based reactivity with no polling delay.
Usage:
async for event in repo.watch_balance("agent_7f9a3c"):
print(f"Balance event: {event['direction']} ${event['amount_usdc']}")
"""
pipeline = [
{"$match": {
"operationType": {"$in": ["insert", "update"]},
"fullDocument.agent_id": agent_id,
"fullDocument.amount_usdc": {"$gte": min_usdc}
}}
]
async with self._wallets.watch(
pipeline, full_document="updateLookup"
) as stream:
async for change in stream:
yield change.get("fullDocument", {})
async def get_recent_trades(
self,
agent_id: str,
limit: int = 50,
skip: int = 0,
strategy: Optional[str] = None
) -> list[dict]:
"""Paginated trade history, most recent first."""
query: dict = {"agent_id": agent_id}
if strategy:
query["strategy"] = strategy
cursor = (
self._trades.find(query, {"_id": 0})
.sort("timestamp", -1)
.skip(skip)
.limit(limit)
)
return await cursor.to_list(length=limit)
async def export_for_reporting(
self,
agent_id: str,
start: datetime,
end: datetime,
output_path: str
) -> int:
"""
Export all trades in a date range to a JSON-Lines file for
regulatory record-keeping (7-year retention).
Returns number of records exported.
"""
import json
from pathlib import Path
cursor = self._trades.find(
{"agent_id": agent_id, "timestamp": {"$gte": start, "$lt": end}},
{"_id": 0}
).sort("timestamp", 1)
count = 0
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w") as f:
async for doc in cursor:
# Convert datetime to ISO string for JSON serialisation
doc["timestamp"] = doc["timestamp"].isoformat()
f.write(json.dumps(doc) + "\n")
count += 1
print(f"Exported {count} trades to {output_path}")
return count
async def close(self):
self._client.close()
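The None-stripping behaviour of save_trade() can be exercised without a database: a Trade carrying only the core fields produces a lean document with no NULL padding, matching the schema-flexible storage described earlier. The dataclass is redefined here so the snippet stands alone:

```python
from dataclasses import dataclass, asdict, field
from datetime import datetime, timezone
from typing import Optional

@dataclass
class Trade:
    agent_id: str
    game_type: str
    pnl_usdc: float
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    strategy: Optional[str] = None
    bet_usdc: Optional[float] = None
    fee_usdc: Optional[float] = None
    metadata: Optional[dict] = None

trade = Trade(agent_id="agent_7f9a3c", game_type="crash", pnl_usdc=14.10)
# Same transformation save_trade() applies before insert_one()
doc = {k: v for k, v in asdict(trade).items() if v is not None}
print(sorted(doc))  # → ['agent_id', 'game_type', 'pnl_usdc', 'timestamp']
```

The optional strategy-specific fields simply never appear in the stored document, so the sparse index on strategy stays small.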
Time-Series Collections for OHLCV Market Data
MongoDB's native time-series collections provide automatic time-bucketing, lossless compression (up to 10x smaller than standard collections for sequential data), and granular TTL policies — ideal for storing the OHLCV candlestick data that agents use for technical analysis.
Creating a Time-Series Collection
# timeseries_setup.py — One-time setup for OHLCV time-series collections
from pymongo import MongoClient
client = MongoClient("mongodb+srv://your-cluster.mongodb.net/")
db = client["purple_flea_market_data"]
# Create a time-series collection with 90-day TTL
# MongoDB will automatically bucket and compress sequential documents
db.create_collection(
"ohlcv",
timeseries={
"timeField": "timestamp", # must be a BSON Date
"metaField": "symbol", # field used for partitioning
"granularity": "minutes" # "seconds" | "minutes" | "hours"
},
# Auto-delete candles older than 90 days (retain for backtesting window)
expireAfterSeconds=90 * 24 * 3600
)
# Create secondary index for fast per-symbol range queries
db["ohlcv"].create_index([("symbol", 1), ("timestamp", -1)])
print("Time-series OHLCV collection created with 90-day TTL.")
Writing and Querying OHLCV Data
# ohlcv_store.py — Ingest and query market data for AI agent strategies
from motor.motor_asyncio import AsyncIOMotorClient
from datetime import datetime, timezone, timedelta
class OHLCVStore:
"""
Time-series store for Purple Flea agent market data consumption.
Ingests candles from external price feeds and serves them to agents
for technical analysis and strategy signals.
"""
def __init__(self, mongo_uri: str):
client = AsyncIOMotorClient(mongo_uri)
self._col = client["purple_flea_market_data"]["ohlcv"]
async def insert_candle(
self,
symbol: str,
timestamp: datetime,
open_: float,
high: float,
low: float,
close: float,
volume: float
):
"""Insert a single OHLCV candle. Timestamp must be UTC."""
await self._col.insert_one({
"timestamp": timestamp, # timeField
"symbol": symbol, # metaField — used for partitioning
"o": open_,
"h": high,
"l": low,
"c": close,
"v": volume
})
async def insert_many_candles(self, candles: list[dict]):
"""Bulk insert — up to 10,000 candles at a time for efficiency."""
if candles:
await self._col.insert_many(candles)
async def get_candles(
self,
symbol: str,
since: datetime,
limit: int = 500
) -> list[dict]:
"""
        Retrieve OHLCV candles since a timestamp, oldest first
        (chronological order suits indicator calculations).
Returns a list of {timestamp, o, h, l, c, v} dicts.
"""
cursor = (
self._col.find(
{"symbol": symbol, "timestamp": {"$gte": since}},
{"_id": 0, "symbol": 0}
)
.sort("timestamp", 1)
.limit(limit)
)
return await cursor.to_list(length=limit)
async def get_vwap(
self,
symbol: str,
hours: int = 24
) -> float:
"""
Compute Volume-Weighted Average Price over the last N hours
using an aggregation pipeline on the time-series collection.
"""
since = datetime.now(timezone.utc) - timedelta(hours=hours)
pipeline = [
{"$match": {
"symbol": symbol,
"timestamp": {"$gte": since}
}},
{"$group": {
"_id": None,
"sum_pv": {"$sum": {"$multiply": ["$c", "$v"]}},
"sum_v": {"$sum": "$v"}
}},
{"$project": {
"_id": 0,
"vwap": {"$cond": {
"if": {"$gt": ["$sum_v", 0]},
"then": {"$round": [{"$divide": ["$sum_pv", "$sum_v"]}, 4]},
"else": 0
}}
}}
]
result = await self._col.aggregate(pipeline).to_list(1)
return result[0]["vwap"] if result else 0.0
async def get_bollinger_bands(
self,
symbol: str,
period: int = 20,
std_devs: float = 2.0
) -> dict:
"""
Compute Bollinger Bands over the last `period` candles.
Returns {"upper": ..., "middle": ..., "lower": ...}
"""
        since = datetime.now(timezone.utc) - timedelta(minutes=period + 5)  # assumes 1-minute candles
candles = await self.get_candles(symbol, since, limit=period)
closes = [c["c"] for c in candles[-period:]]
if len(closes) < period:
return {}
mean = sum(closes) / len(closes)
variance = sum((c - mean) ** 2 for c in closes) / len(closes)
std = variance ** 0.5
return {
"upper": round(mean + std_devs * std, 4),
"middle": round(mean, 4),
"lower": round(mean - std_devs * std, 4)
}
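The VWAP pipeline above reduces to sum(close × volume) / sum(volume), so a pure-Python mirror is useful for checking the aggregation against a handful of candles. The candle values here are illustrative:

```python
candles = [
    {"c": 68200.0, "v": 1.2},
    {"c": 68350.0, "v": 0.8},
    {"c": 68100.0, "v": 2.0},
]

# Same arithmetic as the $group + $project stages in get_vwap()
sum_pv = sum(c["c"] * c["v"] for c in candles)
sum_v = sum(c["v"] for c in candles)
vwap = round(sum_pv / sum_v, 4) if sum_v > 0 else 0
print(vwap)  # → 68180.0
```

The $cond guard in the pipeline corresponds to the `if sum_v > 0` branch here: both return 0 rather than dividing by zero when no candles traded any volume.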
Multi-Agent Analytics Across a Fleet of 100+ Agents
When you run a fleet of 100 or more trading agents, the interesting questions shift from per-agent P&L to fleet-wide patterns: which strategies are working in the current regime? Which agents are correlated? Which ones are silently bleeding capital?
Fleet-Wide Strategy Performance
# fleet_analytics.py — Cross-agent analytics for Purple Flea fleets
from motor.motor_asyncio import AsyncIOMotorClient
from datetime import datetime, timezone, timedelta
client = AsyncIOMotorClient("mongodb+srv://your-cluster.mongodb.net/")
db = client["purple_flea_agents"]
trades = db["trades"]
async def get_top_performing_agents(
    strategy: str | None = None,
days: int = 7,
top_n: int = 10
) -> list[dict]:
"""
Rank all agents by net P&L over the past N days.
Optionally filter to a specific strategy.
Returns the top N agents with their key metrics.
"""
since = datetime.now(timezone.utc) - timedelta(days=days)
match_stage: dict = {"timestamp": {"$gte": since}}
if strategy:
match_stage["strategy"] = strategy
pipeline = [
{"$match": match_stage},
{"$group": {
"_id": "$agent_id",
"net_pnl": {"$sum": "$pnl_usdc"},
"trade_count": {"$sum": 1},
"wins": {"$sum": {"$cond": [{"$gt": ["$pnl_usdc", 0]}, 1, 0]}}
}},
{"$project": {
"agent_id": "$_id",
"net_pnl": {"$round": ["$net_pnl", 2]},
"trade_count": 1,
"win_rate": {"$round": [
{"$divide": ["$wins", "$trade_count"]}, 4
]},
"_id": 0
}},
{"$sort": {"net_pnl": -1}},
{"$limit": top_n}
]
return await trades.aggregate(pipeline).to_list(length=top_n)
async def get_fleet_strategy_heatmap(days: int = 30) -> list[dict]:
"""
Build a strategy x metric heatmap across the entire fleet.
Shows which strategies are working at the population level.
Returns:
[
{
"strategy": "momentum",
"agent_count": 38,
"total_pnl": 4823.10,
"median_pnl_per_agent": 126.97,
"fleet_win_rate": 0.589
}, ...
]
"""
since = datetime.now(timezone.utc) - timedelta(days=days)
pipeline = [
{"$match": {"timestamp": {"$gte": since}}},
# Per-agent, per-strategy totals first
{"$group": {
"_id": {
"agent_id": "$agent_id",
"strategy": {"$ifNull": ["$strategy", "unclassified"]}
},
"agent_pnl": {"$sum": "$pnl_usdc"},
"trades": {"$sum": 1},
"wins": {"$sum": {"$cond": [{"$gt": ["$pnl_usdc", 0]}, 1, 0]}}
}},
# Then aggregate across agents per strategy
{"$group": {
"_id": "$_id.strategy",
"agent_count": {"$sum": 1},
"total_pnl": {"$sum": "$agent_pnl"},
"agent_pnls": {"$push": "$agent_pnl"},
"total_trades": {"$sum": "$trades"},
"total_wins": {"$sum": "$wins"}
}},
{"$project": {
"strategy": "$_id",
"agent_count": 1,
"total_pnl": {"$round": ["$total_pnl", 2]},
"fleet_win_rate":{"$round": [
{"$divide": ["$total_wins", "$total_trades"]}, 4
]},
            # Approximate median via $sortArray (MongoDB 5.2+); fine for smaller fleets
"median_pnl_per_agent": {
"$round": [
{"$arrayElemAt": [
{"$sortArray": {"input": "$agent_pnls", "sortBy": 1}},
{"$floor": {"$divide": [{"$size": "$agent_pnls"}, 2]}}
]}, 2
]
},
"_id": 0
}},
{"$sort": {"total_pnl": -1}}
]
return await trades.aggregate(pipeline, allowDiskUse=True).to_list(length=None)
async def detect_correlated_agents(
agent_ids: list[str],
days: int = 14
) -> dict:
"""
Compute pairwise P&L correlation between a list of agents
using daily P&L series. Helps identify over-correlated strategies
that provide no diversification benefit.
Returns: {"agent_a|agent_b": pearson_r, ...}
"""
import statistics, itertools
since = datetime.now(timezone.utc) - timedelta(days=days)
# Fetch daily P&L for each agent
agent_daily_pnl: dict[str, dict] = {}
for agent_id in agent_ids:
pipeline = [
{"$match": {"agent_id": agent_id, "timestamp": {"$gte": since}}},
{"$group": {
"_id": {
"y": {"$year": "$timestamp"},
"m": {"$month": "$timestamp"},
"d": {"$dayOfMonth": "$timestamp"}
},
"daily_pnl": {"$sum": "$pnl_usdc"}
}},
{"$sort": {"_id.y": 1, "_id.m": 1, "_id.d": 1}}
]
days_data = await trades.aggregate(pipeline).to_list(length=None)
agent_daily_pnl[agent_id] = {
f"{r['_id']['y']}-{r['_id']['m']:02}-{r['_id']['d']:02}": r["daily_pnl"]
for r in days_data
}
# Compute Pearson r for each pair
correlations: dict[str, float] = {}
for a, b in itertools.combinations(agent_ids, 2):
dates = set(agent_daily_pnl[a]) & set(agent_daily_pnl[b])
if len(dates) < 5:
continue
xs = [agent_daily_pnl[a][d] for d in sorted(dates)]
ys = [agent_daily_pnl[b][d] for d in sorted(dates)]
try:
r = statistics.correlation(xs, ys)
correlations[f"{a}|{b}"] = round(r, 4)
except statistics.StatisticsError:
pass
return correlations
Atlas Search for Historically Similar Market Patterns
MongoDB Atlas Search enables full-text and vector search on your trade and market data documents. For AI agents, the most powerful application is finding historically similar market patterns — essentially a semantic search over your OHLCV and trade history corpus.
Vector Search for Similar Trade Setups
Atlas Vector Search stores embedding vectors alongside documents, enabling k-nearest-neighbour (kNN) similarity search. The workflow for agent pattern matching:
Embed each trade setup
When storing a trade, compute a vector embedding of the market conditions at entry: RSI, MACD signal, volume ratio, volatility regime, hour-of-day. Store the embedding in a setup_vector field.
Create an Atlas Vector Search index
Define a vector index on the setup_vector field using cosine similarity. Atlas will build the HNSW index automatically.
Query for similar historical setups
Before entering a new trade, embed the current market conditions and run a $vectorSearch query to find the 20 most similar historical setups and their outcomes.
Compute expected value from historical outcomes
Average the P&L of the top-k similar setups. If the historical EV is negative at a similarity threshold, skip the trade. If positive, size the position proportionally to the similarity score.
# atlas_search.py — Vector similarity search for agent trade setup matching
from motor.motor_asyncio import AsyncIOMotorClient
client = AsyncIOMotorClient("mongodb+srv://your-cluster.mongodb.net/")
db = client["purple_flea_agents"]
trades = db["trades"]
def embed_market_state(
rsi_14: float,
macd_signal: float,
volume_ratio: float, # current / 20-day average
atr_pct: float, # ATR as % of price
trend_strength: float, # ADX
hour_of_day: int
) -> list[float]:
"""
Create a normalised feature vector representing the current market state.
In production, use an ML model (e.g., a trained MLP encoder) here.
"""
return [
rsi_14 / 100.0, # normalise to [0, 1]
(macd_signal + 5) / 10.0, # normalise assuming ±5 range
min(volume_ratio / 5.0, 1.0), # cap at 5x average
min(atr_pct / 5.0, 1.0), # cap at 5% ATR
trend_strength / 100.0,
hour_of_day / 23.0 # time-of-day feature
]
async def find_similar_setups(
current_state: list[float],
agent_id: str,
top_k: int = 20,
min_pnl_sample: int = 10
) -> dict:
"""
Find the most similar historical trade setups to the current market state
using Atlas Vector Search. Returns expected value and win rate from history.
Requires:
- An Atlas Vector Search index named "setup_vector_index" on the
"setup_vector" field with numDimensions=6, similarity="cosine"
- MongoDB Atlas cluster (not available on local deployments)
"""
pipeline = [
{"$vectorSearch": {
"index": "setup_vector_index",
"path": "setup_vector",
"queryVector": current_state,
"numCandidates": top_k * 10,
"limit": top_k,
"filter": {
"agent_id": agent_id
}
}},
{"$project": {
"_id": 0,
"pnl_usdc": 1,
"strategy": 1,
"timestamp": 1,
"score": {"$meta": "vectorSearchScore"}
}}
]
similar = await trades.aggregate(pipeline).to_list(length=top_k)
if len(similar) < min_pnl_sample:
return {"ev": None, "win_rate": None, "sample_size": len(similar)}
pnls = [s["pnl_usdc"] for s in similar]
ev = round(sum(pnls) / len(pnls), 4)
win_rate = round(sum(1 for p in pnls if p > 0) / len(pnls), 4)
avg_score = round(sum(s["score"] for s in similar) / len(similar), 4)
return {
"ev": ev,
"win_rate": win_rate,
"sample_size":len(similar),
"avg_similarity": avg_score,
"should_trade": ev > 0 and win_rate > 0.5 and avg_score > 0.85
}
async def save_trade_with_embedding(
trade_data: dict,
market_state_features: list[float]
):
"""
Persist a trade with its market-state embedding for future similarity search.
"""
trade_data["setup_vector"] = market_state_features
result = await trades.insert_one(trade_data)
return str(result.inserted_id)
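The decision rule at the end of find_similar_setups() is plain arithmetic, so it can be unit-tested offline against a synthetic set of similar historical setups before any vector index exists. setup_decision is a hypothetical mirror of that logic, and the P&L and similarity figures are made up:

```python
def setup_decision(pnls: list[float], scores: list[float],
                   min_sample: int = 10) -> dict:
    """Mirror of find_similar_setups()'s EV / win-rate / should_trade logic."""
    if len(pnls) < min_sample:
        return {"ev": None, "win_rate": None, "sample_size": len(pnls)}
    ev = round(sum(pnls) / len(pnls), 4)
    win_rate = round(sum(1 for p in pnls if p > 0) / len(pnls), 4)
    avg_score = round(sum(scores) / len(scores), 4)
    return {"ev": ev, "win_rate": win_rate, "sample_size": len(pnls),
            "avg_similarity": avg_score,
            "should_trade": ev > 0 and win_rate > 0.5 and avg_score > 0.85}

pnls = [12.0, -4.0, 9.5, 3.0, -1.0, 7.0, 2.0, -3.5, 6.0, 4.0]
scores = [0.92, 0.91, 0.90, 0.89, 0.88, 0.90, 0.93, 0.87, 0.91, 0.90]
print(setup_decision(pnls, scores)["should_trade"])  # → True
```

Keeping the thresholds (positive EV, win rate above 0.5, average similarity above 0.85) in one small function also makes them easy to tune per strategy without touching the Atlas pipeline.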
Atlas Vector Search is available on MongoDB Atlas M10 and above. Create a vector index in the Atlas UI under Database → Search Indexes, or use the Atlas CLI: atlas clusters search indexes create --clusterName MyCluster --file index.json. The index definition JSON specifies type: "vectorSearch", the field path, number of dimensions, and similarity metric.
Deploy Your First MongoDB-Backed Agent
Purple Flea provides the financial API — casino, trading, wallet, escrow, faucet, and domains. MongoDB stores and analyzes every interaction. Together, they give AI agents a complete financial operating system with institutional-grade observability.