Use Redis as the memory layer for every financial decision your agents make. Cache live market data, coordinate trading signals over pub/sub, rate-limit Purple Flea API calls with token buckets, and maintain tamper-proof audit trails with Redis Streams.
AI agents interacting with financial APIs have unique memory requirements: sub-millisecond reads for hot market data, shared state across agent replicas, time-to-live expiry that mirrors market session windows, and persistent queues for audit compliance. Redis satisfies all of them.
Purple Flea's perpetual futures tick every 250ms. Caching with Redis means your agent never waits on a round-trip to the API for data that hasn't changed.
Run 10 agent instances against the same strategy. Redis gives every replica a consistent view of open positions, wallet balances, and risk limits without race conditions.
Set expiry aligned to market sessions. Order book snapshots expire in 500ms; daily P&L summaries persist for 24h. Redis TTL handles this natively.
Use INCR and DECRBY to maintain per-agent drawdown counters atomically. No locks needed, no lost updates across concurrent agent decisions.
A signal agent publishes a trade trigger to a channel. All subscriber execution agents receive it in under 1ms and act independently without polling.
Every Purple Flea API call appended to a Redis Stream creates an immutable, sequential record of agent behavior — queryable by time range without a database.
The RedisAgentCache
class wraps every Purple Flea API call with a read-through cache. The agent never calls
the API twice for the same data within the TTL window.
import redis import json import time import httpx from typing import Optional, Dict, Any class RedisAgentCache: """ Read-through cache for Purple Flea API calls. All hot data lives in Redis; cold paths hit the API. """ def __init__(self, redis_url: str, api_key: str, agent_id: str): self.r = redis.from_url(redis_url, decode_responses=True) self.api_key = api_key # e.g. pf_live_yourkey self.agent_id = agent_id self.base = "https://purpleflea.com/api" self.http = httpx.Client( headers={"Authorization": f"Bearer {api_key}"}, timeout=10 ) # ── Price Cache ────────────────────────────────────────────────────────── def get_price(self, market: str, ttl: int = 1) -> Dict[str, Any]: """ Return cached price for `market` (e.g. 'BTC-PERP'). Fetches from Purple Flea trading API if cache miss. TTL=1s by default — aligns with tick frequency. """ cache_key = f"price:{market}" cached = self.r.get(cache_key) if cached: data = json.loads(cached) data["_cache"] = "hit" return data # Cache miss — fetch from Purple Flea resp = self.http.get(f"{self.base}/trading/markets/{market}/ticker") resp.raise_for_status() data = resp.json() self.r.setex(cache_key, ttl, json.dumps(data)) data["_cache"] = "miss" return data # ── Balance Cache ───────────────────────────────────────────────────────── def cache_balance(self, wallet_address: str, ttl: int = 30) -> Dict[str, Any]: """ Cache wallet balance. TTL=30s balances freshness against API rate limits. Uses hash for O(1) field reads without deserializing the full object. """ cache_key = f"balance:{wallet_address}" cached_hash = self.r.hgetall(cache_key) if cached_hash: return { "address": wallet_address, "usdc": float(cached_hash["usdc"]), "btc": float(cached_hash["btc"]), "eth": float(cached_hash["eth"]), "cached_at": cached_hash["cached_at"], "_cache": "hit", } resp = self.http.get(f"{self.base}/wallet/balance/{wallet_address}") resp.raise_for_status() data = resp.json() pipe = self.r.pipeline() pipe.hset(cache_key, mapping={ "usdc": str(data["usdc"]), "btc": str(data.get("btc", 0)), "eth": str(data.get("eth", 0)), "cached_at": str(time.time()), }) pipe.expire(cache_key, ttl) pipe.execute() data["_cache"] = "miss" return data # ── Position State ──────────────────────────────────────────────────────── def set_position(self, market: str, side: str, size: float, entry_price: float, ttl: int = 3600): """Store open position state. Expires after 1h (session-length TTL).""" key = f"position:{self.agent_id}:{market}" self.r.hset(key, mapping={ "side": side, "size": str(size), "entry_price": str(entry_price), "opened_at": str(time.time()), }) self.r.expire(key, ttl) def get_position(self, market: str) -> Optional[Dict]: key = f"position:{self.agent_id}:{market}" data = self.r.hgetall(key) return data if data else None def clear_position(self, market: str): self.r.delete(f"position:{self.agent_id}:{market}")
In a multi-agent system, a signal agent analyzes markets and publishes triggers. Execution agents subscribe and act instantly. Redis pub/sub delivers messages in under 1ms on localhost and under 5ms on the same data center network.
import redis, json, time r = redis.from_url("redis://localhost:6379") def publish_signal( market: str, signal_type: str, # "LONG" | "SHORT" | "CLOSE" confidence: float, # 0.0 – 1.0 source: str = "momentum" ): """ Publish trade signal to all subscribed execution agents. Channel: pf:signals:{market} """ channel = f"pf:signals:{market}" payload = json.dumps({ "market": market, "type": signal_type, "confidence": confidence, "source": source, "ts": time.time(), }) subscribers = r.publish(channel, payload) return subscribers # number of agents reached # Example: BTC momentum signal if btc_rsi > 70 and volume_spike: n = publish_signal( market="BTC-PERP", signal_type="LONG", confidence=0.82, source="rsi+volume" ) print(f"Signal sent to {n} agents")
import redis, json, httpx r = redis.from_url("redis://localhost:6379") pubsub = r.pubsub() pubsub.subscribe( "pf:signals:BTC-PERP", "pf:signals:ETH-PERP", "pf:signals:SOL-PERP" ) http = httpx.Client(headers={ "Authorization": "Bearer pf_live_yourkey" }) for message in pubsub.listen(): if message["type"] != "message": continue sig = json.loads(message["data"]) # Only act on high-confidence signals if sig["confidence"] < 0.75: continue # Execute order on Purple Flea resp = http.post( "https://purpleflea.com/api/trading/order", json={ "market": sig["market"], "side": sig["type"], "size": 0.01, "type": "market", } ) print(f"Executed {sig['type']} on {sig['market']}")
Use pf:signals:{market} for market signals, pf:alerts:{agent_id} for per-agent alerts, and pf:broadcast for system-wide messages.
One signal agent can coordinate a fleet of 100+ execution agents in milliseconds. Each subscriber acts independently — no shared lock contention.
Pub/Sub is ephemeral — subscribers offline miss messages. For guaranteed delivery and replay, use Redis Streams (covered below) alongside pub/sub.
Purple Flea enforces per-key rate limits. Running multiple agents on the same key without coordination will result in 429 errors and missed opportunities. A Redis token bucket ensures every agent respects limits atomically, without any central coordinator process.
import redis import time r = redis.from_url("redis://localhost:6379") # Lua script — runs atomically on Redis server (no race conditions) TOKEN_BUCKET_SCRIPT = """ local key = KEYS[1] local capacity = tonumber(ARGV[1]) -- max tokens local refill_rate = tonumber(ARGV[2]) -- tokens per second local now = tonumber(ARGV[3]) -- current time (ms) local requested = tonumber(ARGV[4]) -- tokens needed local bucket = redis.call('HMGET', key, 'tokens', 'last_refill') local tokens = tonumber(bucket[1]) or capacity local last_refill = tonumber(bucket[2]) or now -- Refill based on elapsed time local elapsed = (now - last_refill) / 1000 local new_tokens = math.min(capacity, tokens + elapsed * refill_rate) if new_tokens >= requested then new_tokens = new_tokens - requested redis.call('HMSET', key, 'tokens', new_tokens, 'last_refill', now) redis.call('EXPIRE', key, 3600) return 1 -- allowed else redis.call('HMSET', key, 'tokens', new_tokens, 'last_refill', now) redis.call('EXPIRE', key, 3600) return 0 -- denied end """ bucket_script = r.register_script(TOKEN_BUCKET_SCRIPT) def acquire_token(api_key_hash: str, capacity: int = 60, refill_rate: float = 1.0) -> bool: """ Try to consume one API token. capacity=60: burst up to 60 calls refill_rate=1.0: refill 1 token/second (60 RPM steady state) Returns True if the call is allowed. """ key = f"rate_limit:{api_key_hash}" now_ms = int(time.time() * 1000) result = bucket_script( keys=[key], args=[capacity, refill_rate, now_ms, 1] ) return bool(result) # Usage across all agents sharing one API key def safe_api_call(endpoint: str, payload: dict, api_key: str): key_hash = api_key[-12:] # last 12 chars as bucket key if not acquire_token(key_hash): raise Exception("Rate limit: wait for token refill") # ... proceed with API call
The check-and-decrement operation must be atomic. Without Lua, two agents could both read "5 tokens remaining" and both consume one, leaving the count at 4 instead of 3. Redis executes Lua scripts as a single atomic unit.
Use rate_limit:{api_key_hash} for a shared bucket (all agents on the key share quota) or rate_limit:{api_key_hash}:{agent_id} for per-agent sub-limits.
Every Purple Flea API call your agent makes — bets placed, withdrawals triggered, escrow deposits — should be appended to a Redis Stream. Streams are append-only, ordered by server time, and can be replayed from any point. This gives you a queryable audit trail without a database.
import redis, time, json r = redis.from_url("redis://localhost:6379") STREAM_KEY = "pf:audit" MAX_STREAM_LEN = 100_000 # ~1M events before trimming oldest def log_transaction( agent_id: str, action: str, # "bet" | "withdraw" | "escrow_deposit" | "order" market: str, amount: float, result: dict, # API response from Purple Flea metadata: dict = None ) -> str: """ Append transaction to Redis Stream. Returns the stream entry ID (timestamp-sequence, e.g. '1709812345678-0'). Use MAXLEN to cap stream size automatically. """ entry = { "agent_id": agent_id, "action": action, "market": market, "amount": str(amount), "status": result.get("status", "unknown"), "tx_id": result.get("tx_id", ""), "pnl": str(result.get("pnl", 0)), "wall_time": str(time.time()), "meta": json.dumps(metadata or {}), } entry_id = r.xadd(STREAM_KEY, entry, maxlen=MAX_STREAM_LEN, approximate=True) return entry_id.decode() def replay_transactions( agent_id: str = None, start: str = "0", # stream ID or "-" for beginning count: int = 500 ) -> list: """Read audit log entries, optionally filtering by agent.""" entries = r.xrange(STREAM_KEY, min=start, count=count) results = [] for entry_id, fields in entries: if agent_id and fields.get("agent_id") != agent_id: continue results.append({"id": entry_id, **fields}) return results def get_agent_pnl_from_stream(agent_id: str) -> float: """Calculate total P&L by replaying the agent's stream entries.""" entries = replay_transactions(agent_id=agent_id, count=10_000) return sum(float(e["pnl"]) for e in entries) # Example: log a Purple Flea casino bet entry_id = log_transaction( agent_id="agent-007", action="bet", market="coin-flip", amount=0.10, result={"status": "win", "pnl": 0.09, "tx_id": "cf-9284"}, metadata={"prediction": "heads", "model": "claude-opus-4"} ) print(f"Logged: {entry_id}")
| Field | Type | Description | Example |
|---|---|---|---|
agent_id |
string | Agent identifier for filtering | agent-007 |
action |
enum | Purple Flea operation type | bet, order, escrow_deposit |
market |
string | Market or game identifier | BTC-PERP, coin-flip |
amount |
float | USDC amount involved | 0.10 |
status |
enum | Outcome from Purple Flea API | win, loss, filled |
pnl |
float | Profit/loss for this event | 0.09 |
tx_id |
string | Purple Flea transaction ID | cf-9284 |
wall_time |
float | Unix timestamp of event | 1709812345.678 |
meta |
JSON | Arbitrary strategy metadata | {"model": "claude-opus-4"} |
A trading agent that runs for hours needs session state that survives process restarts, maintains a risk budget across its lifetime, and can be paused or resumed. Redis is the ideal session store for this pattern.
session:*import redis, json, time r = redis.from_url("redis://localhost:6379") class AgentSession: def __init__(self, agent_id: str, start_capital: float, max_drawdown: float = 0.05, session_ttl: int = 86400): self.key = f"session:{agent_id}" self.hb_key = f"heartbeat:{agent_id}" self.session_ttl = session_ttl # SETNX: only create if not exists created = r.hsetnx( self.key, "start_capital", str(start_capital) ) if created: r.hset(self.key, mapping={ "current_capital": str(start_capital), "max_drawdown": str(max_drawdown), "total_pnl": "0", "trades": "0", "started_at": str(time.time()), "status": "active", }) r.expire(self.key, session_ttl) def heartbeat(self, interval: int = 30): """Reset 30s TTL — signals agent is alive.""" r.setex(self.hb_key, interval, "alive") def record_trade(self, pnl: float): with r.pipeline() as pipe: pipe.hincrbyfloat(self.key, "total_pnl", pnl) pipe.hincrbyfloat(self.key, "current_capital", pnl) pipe.hincrby(self.key, "trades", 1) pipe.execute() def is_over_drawdown(self) -> bool: data = r.hmget(self.key, "start_capital", "current_capital", "max_drawdown") start, current, limit = ( float(x) for x in data) dd = (start - current) / start return dd >= limit
Start with a free faucet claim to get funds, spin up Redis locally, and have your first cached agent call working in minutes.
Register your agent at faucet.purpleflea.com to receive free USDC for testing. No deposit required.
docker run -d -p 6379:6379 --name pf-redis redis:7-alpine — or use Redis Cloud free tier.
Set REDIS_URL=redis://localhost:6379 in your environment.
pip install redis httpx — that's all you need. No ORM, no heavy framework.
Instantiate RedisAgentCache(redis_url, api_key, agent_id) and call
get_price("BTC-PERP"). Verify the first call returns _cache: "miss"
and the second returns _cache: "hit".
Wrap every Purple Flea API response with log_transaction().
Run redis-cli XLEN pf:audit to confirm entries are appending correctly.
Wrap all API calls with acquire_token(). Tune capacity and
refill_rate to match your Purple Flea plan limits.
Cache both sides of the order book in a Redis sorted set. Update quotes every tick from pub/sub price feeds. Track inventory risk in a hash.
Cache recent game outcomes in a Redis list (LPUSH/LTRIM to maintain a rolling window). Apply martingale or Kelly criterion to next bet size.
Pub/sub channel notifies all parties when an escrow deposit is confirmed. Shared state in Redis tracks multi-party agreement status atomically.
Signal agent detects price divergence and publishes to a channel. Multiple execution agents simultaneously take both sides of the spread.
Cache balances with 30s TTL. Publish alerts to pf:alerts:{agent_id}
when balance drops below minimum threshold.
Store historical OHLCV snapshots in Redis sorted sets keyed by timestamp. Replay against live strategy logic using ZRANGEBYSCORE queries.