Redis Integration Guide

Redis-Powered AI Agents That Trade

Use Redis as the memory layer for every financial decision your agents make. Cache live market data, coordinate trading signals over pub/sub, rate-limit Purple Flea API calls with token buckets, and maintain tamper-proof audit trails with Redis Streams.

  • Redis read latency: < 1ms
  • Purple Flea services: 6
  • Live trading agents: 137+
  • API key prefix: pf_live_

Why Redis Is the Natural Memory Layer for Trading Agents

AI agents interacting with financial APIs have unique memory requirements: sub-millisecond reads for hot market data, shared state across agent replicas, time-to-live expiry that mirrors market session windows, and persistent queues for audit compliance. Redis satisfies all of them.

Sub-millisecond Price Reads

Purple Flea's perpetual futures tick every 250ms. Caching with Redis means your agent never waits on a round-trip to the API for data that hasn't changed.

🔁 Shared State Across Replicas

Run 10 agent instances against the same strategy. Redis gives every replica a consistent view of open positions, wallet balances, and risk limits without race conditions.

🕐 TTL-Aware Data Lifecycle

Set expiry aligned to market sessions. Order book snapshots expire in 500ms; daily P&L summaries persist for 24h. Redis TTL handles this natively.

🔒 Atomic Operations for Risk Limits

Use INCR and DECRBY to maintain per-agent drawdown counters atomically. No locks needed, no lost updates across concurrent agent decisions.

📱 Pub/Sub Signal Broadcasting

A signal agent publishes a trade trigger to a channel. All subscriber execution agents receive it in under 1ms and act independently without polling.

📄 Streams for Compliance

Every Purple Flea API call appended to a Redis Stream creates an immutable, sequential record of agent behavior — queryable by time range without a database.

Redis as Agent Memory: Market Data, Positions, Balances

The RedisAgentCache class wraps every Purple Flea API call with a read-through cache. The agent never calls the API twice for the same data within the TTL window.

redis_agent_cache.py
Python
import redis
import json
import time
import httpx
from typing import Optional, Dict, Any

class RedisAgentCache:
    """
    Read-through cache for Purple Flea API calls.
    All hot data lives in Redis; cold paths hit the API.
    """

    def __init__(self, redis_url: str, api_key: str, agent_id: str):
        self.r = redis.from_url(redis_url, decode_responses=True)
        self.api_key = api_key            # e.g. pf_live_yourkey
        self.agent_id = agent_id
        self.base = "https://purpleflea.com/api"
        self.http = httpx.Client(
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=10
        )

    # ── Price Cache ──────────────────────────────────────────────────────────

    def get_price(self, market: str, ttl: int = 1) -> Dict[str, Any]:
        """
        Return cached price for `market` (e.g. 'BTC-PERP').
        Fetches from Purple Flea trading API if cache miss.
        TTL=1s by default — aligns with tick frequency.
        """
        cache_key = f"price:{market}"
        cached = self.r.get(cache_key)

        if cached:
            data = json.loads(cached)
            data["_cache"] = "hit"
            return data

        # Cache miss — fetch from Purple Flea
        resp = self.http.get(f"{self.base}/trading/markets/{market}/ticker")
        resp.raise_for_status()
        data = resp.json()

        self.r.setex(cache_key, ttl, json.dumps(data))
        data["_cache"] = "miss"
        return data

    # ── Balance Cache ─────────────────────────────────────────────────────────

    def cache_balance(self, wallet_address: str, ttl: int = 30) -> Dict[str, Any]:
        """
        Cache wallet balance. TTL=30s balances freshness against API rate limits.
        Uses hash for O(1) field reads without deserializing the full object.
        """
        cache_key = f"balance:{wallet_address}"
        cached_hash = self.r.hgetall(cache_key)

        if cached_hash:
            return {
                "address": wallet_address,
                "usdc": float(cached_hash["usdc"]),
                "btc": float(cached_hash["btc"]),
                "eth": float(cached_hash["eth"]),
                "cached_at": cached_hash["cached_at"],
                "_cache": "hit",
            }

        resp = self.http.get(f"{self.base}/wallet/balance/{wallet_address}")
        resp.raise_for_status()
        data = resp.json()

        pipe = self.r.pipeline()
        pipe.hset(cache_key, mapping={
            "usdc": str(data["usdc"]),
            "btc": str(data.get("btc", 0)),
            "eth": str(data.get("eth", 0)),
            "cached_at": str(time.time()),
        })
        pipe.expire(cache_key, ttl)
        pipe.execute()

        data["_cache"] = "miss"
        return data

    # ── Position State ────────────────────────────────────────────────────────

    def set_position(self, market: str, side: str, size: float,
                      entry_price: float, ttl: int = 3600):
        """Store open position state. Expires after 1h (session-length TTL)."""
        key = f"position:{self.agent_id}:{market}"
        self.r.hset(key, mapping={
            "side": side,
            "size": str(size),
            "entry_price": str(entry_price),
            "opened_at": str(time.time()),
        })
        self.r.expire(key, ttl)

    def get_position(self, market: str) -> Optional[Dict]:
        key = f"position:{self.agent_id}:{market}"
        data = self.r.hgetall(key)
        return data if data else None

    def clear_position(self, market: str):
        self.r.delete(f"position:{self.agent_id}:{market}")

Real-Time Trade Signals Between Agents

In a multi-agent system, a signal agent analyzes markets and publishes triggers. Execution agents subscribe and act instantly. Redis pub/sub delivers messages in under 1ms on localhost and under 5ms on the same data center network.

Signal Publisher (Analyst Agent)

signal_publisher.py
Python
import redis, json, time

r = redis.from_url("redis://localhost:6379")

def publish_signal(
    market: str,
    signal_type: str,    # "LONG" | "SHORT" | "CLOSE"
    confidence: float,   # 0.0 – 1.0
    source: str = "momentum"
):
    """
    Publish trade signal to all subscribed execution
    agents. Channel: pf:signals:{market}
    """
    channel = f"pf:signals:{market}"
    payload = json.dumps({
        "market": market,
        "type": signal_type,
        "confidence": confidence,
        "source": source,
        "ts": time.time(),
    })
    subscribers = r.publish(channel, payload)
    return subscribers   # number of agents reached

# Example: BTC momentum signal
# (btc_rsi and volume_spike are computed by your indicator pipeline)
if btc_rsi > 70 and volume_spike:
    n = publish_signal(
        market="BTC-PERP",
        signal_type="LONG",
        confidence=0.82,
        source="rsi+volume"
    )
    print(f"Signal sent to {n} agents")

Signal Subscriber (Execution Agent)

signal_subscriber.py
Python
import redis, json, httpx

r = redis.from_url("redis://localhost:6379")
pubsub = r.pubsub()
pubsub.subscribe(
    "pf:signals:BTC-PERP",
    "pf:signals:ETH-PERP",
    "pf:signals:SOL-PERP"
)

http = httpx.Client(headers={
    "Authorization": "Bearer pf_live_yourkey"
})

for message in pubsub.listen():
    if message["type"] != "message":
        continue
    sig = json.loads(message["data"])

    # Only act on high-confidence signals
    if sig["confidence"] < 0.75:
        continue

    # Execute order on Purple Flea
    resp = http.post(
        "https://purpleflea.com/api/trading/order",
        json={
            "market": sig["market"],
            "side": sig["type"],
            "size": 0.01,
            "type": "market",
        }
    )
    resp.raise_for_status()
    print(f"Executed {sig['type']} on {sig['market']}")

🔌 Channel Naming Convention

Use pf:signals:{market} for market signals, pf:alerts:{agent_id} for per-agent alerts, and pf:broadcast for system-wide messages.

👥 Fan-Out Coordination

One signal agent can coordinate a fleet of 100+ execution agents in milliseconds. Each subscriber acts independently — no shared lock contention.

🔥 Fire and Forget vs. Streams

Pub/Sub is ephemeral — subscribers offline miss messages. For guaranteed delivery and replay, use Redis Streams (covered below) alongside pub/sub.

Token Bucket Rate Limiting for Purple Flea API Calls

Purple Flea enforces per-key rate limits. Running multiple agents on the same key without coordination will result in 429 errors and missed opportunities. A Redis token bucket ensures every agent respects limits atomically, without any central coordinator process.

rate_limiter.py
Python (Lua script)
import redis
import time

r = redis.from_url("redis://localhost:6379")

# Lua script — runs atomically on Redis server (no race conditions)
TOKEN_BUCKET_SCRIPT = """
local key        = KEYS[1]
local capacity   = tonumber(ARGV[1])   -- max tokens
local refill_rate = tonumber(ARGV[2])  -- tokens per second
local now        = tonumber(ARGV[3])   -- current time (ms)
local requested  = tonumber(ARGV[4])   -- tokens needed

local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1]) or capacity
local last_refill = tonumber(bucket[2]) or now

-- Refill based on elapsed time
local elapsed = (now - last_refill) / 1000
local new_tokens = math.min(capacity, tokens + elapsed * refill_rate)

if new_tokens >= requested then
    new_tokens = new_tokens - requested
    redis.call('HSET', key, 'tokens', new_tokens, 'last_refill', now)
    redis.call('EXPIRE', key, 3600)
    return 1    -- allowed
else
    redis.call('HSET', key, 'tokens', new_tokens, 'last_refill', now)
    redis.call('EXPIRE', key, 3600)
    return 0    -- denied
end
"""

bucket_script = r.register_script(TOKEN_BUCKET_SCRIPT)

def acquire_token(api_key_hash: str, capacity: int = 60,
                   refill_rate: float = 1.0) -> bool:
    """
    Try to consume one API token.
    capacity=60: burst up to 60 calls
    refill_rate=1.0: refill 1 token/second (60 RPM steady state)
    Returns True if the call is allowed.
    """
    key = f"rate_limit:{api_key_hash}"
    now_ms = int(time.time() * 1000)
    result = bucket_script(
        keys=[key],
        args=[capacity, refill_rate, now_ms, 1]
    )
    return bool(result)

# Usage across all agents sharing one API key
def safe_api_call(endpoint: str, payload: dict, api_key: str):
    key_hash = api_key[-12:]          # last 12 chars as bucket key
    if not acquire_token(key_hash):
        raise Exception("Rate limit: wait for token refill")
    # ... proceed with API call

Why Lua for Atomicity

The check-and-decrement operation must be atomic. Without Lua, two agents could both read "5 tokens remaining" and both consume one, leaving the count at 4 instead of 3. Redis executes Lua scripts as a single atomic unit.

Per-Agent vs. Shared Buckets

Use rate_limit:{api_key_hash} for a shared bucket (all agents on the key share quota) or rate_limit:{api_key_hash}:{agent_id} for per-agent sub-limits.
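A small helper keeps that naming scheme consistent across agents. The function itself is illustrative — the key pattern is the one described above:

```python
from typing import Optional

def bucket_key(api_key_hash: str, agent_id: Optional[str] = None) -> str:
    """
    Shared bucket by default (all agents on the key share quota);
    a per-agent sub-bucket when agent_id is given.
    """
    base = f"rate_limit:{api_key_hash}"
    return f"{base}:{agent_id}" if agent_id else base
```

Pass the result as `KEYS[1]` to the token bucket script: shared and per-agent limits then use the same Lua code, just different keys.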

Redis Streams for Agent Transaction Audit Logs

Every Purple Flea API call your agent makes — bets placed, withdrawals triggered, escrow deposits — should be appended to a Redis Stream. Streams are append-only, ordered by server time, and can be replayed from any point. This gives you a queryable audit trail without a database.

audit_stream.py
Python
import redis, time, json

r = redis.from_url("redis://localhost:6379", decode_responses=True)

STREAM_KEY = "pf:audit"
MAX_STREAM_LEN = 100_000   # ~100k events retained before trimming oldest

def log_transaction(
    agent_id: str,
    action: str,        # "bet" | "withdraw" | "escrow_deposit" | "order"
    market: str,
    amount: float,
    result: dict,       # API response from Purple Flea
    metadata: dict = None
) -> str:
    """
    Append transaction to Redis Stream.
    Returns the stream entry ID (timestamp-sequence, e.g. '1709812345678-0').
    MAXLEN caps stream size automatically.
    """
    entry = {
        "agent_id": agent_id,
        "action": action,
        "market": market,
        "amount": str(amount),
        "status": result.get("status", "unknown"),
        "tx_id": result.get("tx_id", ""),
        "pnl": str(result.get("pnl", 0)),
        "wall_time": str(time.time()),
        "meta": json.dumps(metadata or {}),
    }
    # decode_responses=True means xadd returns the entry ID as a str
    return r.xadd(STREAM_KEY, entry, maxlen=MAX_STREAM_LEN, approximate=True)

def replay_transactions(
    agent_id: str = None,
    start: str = "0",       # stream ID, or "-" for the beginning
    count: int = 500
) -> list:
    """Read audit log entries, optionally filtering by agent."""
    entries = r.xrange(STREAM_KEY, min=start, count=count)
    results = []
    for entry_id, fields in entries:
        if agent_id and fields.get("agent_id") != agent_id:
            continue
        results.append({"id": entry_id, **fields})
    return results

def get_agent_pnl_from_stream(agent_id: str) -> float:
    """Calculate total P&L by replaying the agent's stream entries."""
    entries = replay_transactions(agent_id=agent_id, count=10_000)
    return sum(float(e["pnl"]) for e in entries)

# Example: log a Purple Flea casino bet
entry_id = log_transaction(
    agent_id="agent-007",
    action="bet",
    market="coin-flip",
    amount=0.10,
    result={"status": "win", "pnl": 0.09, "tx_id": "cf-9284"},
    metadata={"prediction": "heads", "model": "claude-opus-4"}
)
print(f"Logged: {entry_id}")

Stream Field Schema

Field      Type    Description                      Example
agent_id   string  Agent identifier for filtering   agent-007
action     enum    Purple Flea operation type       bet, order, escrow_deposit
market     string  Market or game identifier        BTC-PERP, coin-flip
amount     float   USDC amount involved             0.10
status     enum    Outcome from Purple Flea API     win, loss, filled
pnl        float   Profit/loss for this event       0.09
tx_id      string  Purple Flea transaction ID       cf-9284
wall_time  float   Unix timestamp of event          1709812345.678
meta       JSON    Arbitrary strategy metadata      {"model": "claude-opus-4"}

Managing Long-Running Agent Trading Sessions

A trading agent that runs for hours needs session state that survives process restarts, maintains a risk budget across its lifetime, and can be paused or resumed. Redis is the ideal session store for this pattern.

  • Session object stores entry capital, current drawdown limit, and strategy parameters
  • Heartbeat key with short TTL detects crashed agents automatically
  • Idempotent session creation: SETNX ensures exactly one session per agent ID
  • Risk state (max_drawdown, daily_loss) updated atomically per trade
  • Session replay: load last state from Redis on restart without re-querying Purple Flea
  • Multi-session dashboard: SCAN for keys matching session:*
  • Session expiry aligns with market close (configurable TTL)
session_manager.py
Python
import redis, json, time

r = redis.from_url("redis://localhost:6379", decode_responses=True)

class AgentSession:
    def __init__(self, agent_id: str,
                 start_capital: float,
                 max_drawdown: float = 0.05,
                 session_ttl: int = 86400):
        self.key = f"session:{agent_id}"
        self.hb_key = f"heartbeat:{agent_id}"
        self.session_ttl = session_ttl

        # SETNX: only create if not exists
        created = r.hsetnx(
            self.key, "start_capital",
            str(start_capital)
        )
        if created:
            r.hset(self.key, mapping={
                "current_capital": str(start_capital),
                "max_drawdown": str(max_drawdown),
                "total_pnl": "0",
                "trades": "0",
                "started_at": str(time.time()),
                "status": "active",
            })
        r.expire(self.key, session_ttl)

    def heartbeat(self, interval: int = 30):
        """Reset 30s TTL — signals agent is alive."""
        r.setex(self.hb_key, interval, "alive")

    def record_trade(self, pnl: float):
        with r.pipeline() as pipe:
            pipe.hincrbyfloat(self.key, "total_pnl", pnl)
            pipe.hincrbyfloat(self.key, "current_capital", pnl)
            pipe.hincrby(self.key, "trades", 1)
            pipe.execute()

    def is_over_drawdown(self) -> bool:
        data = r.hmget(self.key,
            "start_capital", "current_capital",
            "max_drawdown")
        start, current, limit = (
            float(x) for x in data)
        dd = (start - current) / start
        return dd >= limit

Set Up Redis + Purple Flea in Under 10 Minutes

Start with a free faucet claim to get funds, spin up Redis locally, and have your first cached agent call working in minutes.

1. Claim Free Funds from the Faucet

Register your agent at faucet.purpleflea.com to receive free USDC for testing. No deposit required.

2. Start Redis Locally

docker run -d -p 6379:6379 --name pf-redis redis:7-alpine — or use Redis Cloud free tier. Set REDIS_URL=redis://localhost:6379 in your environment.

3. Install Dependencies

pip install redis httpx — that's all you need. No ORM, no heavy framework.

4. Initialize RedisAgentCache

Instantiate RedisAgentCache(redis_url, api_key, agent_id) and call get_price("BTC-PERP"). Verify the first call returns _cache: "miss" and the second returns _cache: "hit".

5. Enable the Audit Stream

Wrap every Purple Flea API response with log_transaction(). Run redis-cli XLEN pf:audit to confirm entries are appending correctly.

6. Add Rate Limiting Before Going Live

Wrap all API calls with acquire_token(). Tune capacity and refill_rate to match your Purple Flea plan limits.

What Redis-Backed Agents Can Do on Purple Flea

📊 Market Making Bot

Cache both sides of the order book in a Redis sorted set. Update quotes every tick from pub/sub price feeds. Track inventory risk in a hash.

Trading API · Pub/Sub

🎰 Casino Strategy Agent

Cache recent game outcomes in a Redis list (LPUSH/LTRIM to maintain a rolling window). Apply martingale or Kelly criterion to next bet size.

Casino API · Cache

🔗 Escrow Coordinator

Pub/sub channel notifies all parties when an escrow deposit is confirmed. Shared state in Redis tracks multi-party agreement status atomically.

Escrow API · Pub/Sub

🔄 Cross-Agent Arbitrage

Signal agent detects price divergence and publishes to a channel. Multiple execution agents simultaneously take both sides of the spread.

Trading API · Fan-out

💵 Wallet Balance Monitor

Cache balances with 30s TTL. Publish alerts to pf:alerts:{agent_id} when balance drops below minimum threshold.

Wallet API · Alerts

📋 Strategy Backtester

Store historical OHLCV snapshots in Redis sorted sets keyed by timestamp. Replay against live strategy logic using ZRANGEBYSCORE queries.

Streams · Sorted Sets

Your Redis-Backed Agent Is One Faucet Claim Away

Get free USDC from the Purple Flea faucet, spin up Redis locally, and deploy your first caching-enabled trading agent in under an hour.

  • Purple Flea services: 6
  • Escrow fee: 1%
  • Referral on fees: 15%
  • Faucet funds: Free