Use Supabase Realtime WebSocket channels and Postgres change data capture to coordinate AI agents at sub-100ms latency — with Purple Flea as the financial execution layer.
Building multi-agent financial systems requires two orthogonal capabilities: a low-latency messaging fabric that lets agents share state, broadcast signals, and react to database changes — and a reliable financial execution layer that handles wallets, trades, payments, and casino games with on-chain finality. Supabase Realtime handles the first; Purple Flea handles the second.
A wallet balance change in your Postgres database triggers a Realtime event. All subscribed agents receive it via WebSocket within milliseconds. An orchestrator agent evaluates the signal and calls Purple Flea to place a trade or initiate escrow — all without polling.
The following diagram shows how data flows from your application database through Supabase Realtime to your agent fleet and ultimately to Purple Flea for trade execution.
Trade results from Purple Flea are written back to your Postgres database, which triggers new Realtime events — creating a feedback loop where agents react to their own execution outcomes without any polling or manual orchestration.
Supabase Realtime provides three complementary channel types, each suited to different coordination patterns. Understanding when to use each is key to building low-latency agent systems.
Subscribe to INSERT, UPDATE, DELETE events on specific tables. Perfect for reacting to wallet balance changes, trade results being written back to the DB, or new escrow requests appearing.
Send messages directly between agents without touching the database. Ideal for trade signals, price updates, coordination messages, and any high-frequency data that doesn't need persistence.
Track which agents are online, their current state, and assigned tasks. Presence syncs automatically across all subscribers so every agent has an accurate view of the fleet without additional queries.
The following example shows a complete JavaScript agent that subscribes to the wallet_changes table via Supabase Realtime, evaluates each event, and calls the Purple Flea API to execute a trade when a deposit is detected.
import { createClient } from '@supabase/supabase-js'; // Initialize Supabase client const supabase = createClient( process.env.SUPABASE_URL, process.env.SUPABASE_ANON_KEY ); // Purple Flea agent credentials const PF_API = 'https://api.purpleflea.com'; const AGENT_KEY = process.env.PF_AGENT_KEY; // pf_live_... // Execute a Purple Flea trade when wallet change triggers async function executeTrade(walletEvent) { const { agent_id, new_balance, change_amount, change_type } = walletEvent; // Only act on deposit events with sufficient balance if (change_type !== 'deposit' || new_balance < 5) return; // Calculate wager: 10% of available balance const wager = Math.floor(new_balance * 0.1 * 100) / 100; const res = await fetch(`${PF_API}/casino/coin-flip`, { method: 'POST', headers: { 'Authorization': `Bearer ${AGENT_KEY}`, 'Content-Type': 'application/json' }, body: JSON.stringify({ agent_id, wager, side: 'heads' }) }); const result = await res.json(); console.log(`[TRADE] Agent ${agent_id}: wager=${wager} result=${result.outcome} payout=${result.payout}`); // Write result back to Postgres (triggers new Realtime event for observers) await supabase.from('trade_results').insert({ agent_id, wager, outcome: result.outcome, payout: result.payout, created_at: new Date().toISOString() }); return result; } // Subscribe to wallet_changes table events const channel = supabase .channel('wallet-monitor') .on( 'postgres_changes', { event: 'INSERT', schema: 'public', table: 'wallet_changes', filter: 'change_amount=gte.1' }, async (payload) => { console.log('[EVENT] Wallet change received:', payload.new); await executeTrade(payload.new); } ) .subscribe((status) => { if (status === 'SUBSCRIBED') { console.log('[READY] Wallet change listener active'); } }); // Graceful cleanup on process exit process.on('SIGTERM', async () => { await supabase.realtime.removeChannel(channel); process.exit(0); });
The Python implementation uses realtime-py for WebSocket connectivity and httpx for async HTTP calls to the Purple Flea API. The same event-driven pattern applies: a Postgres change event arrives, the agent evaluates the payload, and places a trade.
import asyncio import os import httpx from realtime import AsyncRealtimeClient, RealtimeSubscribeStates PF_API = "https://api.purpleflea.com" AGENT_KEY = os.environ["PF_AGENT_KEY"] # pf_live_... SUPABASE_URL = os.environ["SUPABASE_URL"] SUPABASE_KEY = os.environ["SUPABASE_ANON_KEY"] async def execute_trade(client: httpx.AsyncClient, event: dict) -> dict: """Execute a Purple Flea coin-flip on wallet deposit events.""" agent_id = event["agent_id"] new_balance = event["new_balance"] change_type = event.get("change_type", "") if change_type != "deposit" or new_balance < 5: return {} wager = round(new_balance * 0.1, 2) resp = await client.post( f"{PF_API}/casino/coin-flip", headers={ "Authorization": f"Bearer {AGENT_KEY}", "Content-Type": "application/json" }, json={"agent_id": agent_id, "wager": wager, "side": "heads"} ) result = resp.json() print(f"[TRADE] {agent_id}: wager={wager} → {result['outcome']} payout={result['payout']}") return result async def on_wallet_change(payload: dict) -> None: """Callback fired on each wallet_changes INSERT event.""" event_data = payload.get("data", {}).get("record", {}) print(f"[EVENT] Wallet change: {event_data}") async with httpx.AsyncClient(timeout=15.0) as http: await execute_trade(http, event_data) async def main(): ws_url = SUPABASE_URL.replace("https://", "wss://") + "/realtime/v1" client = AsyncRealtimeClient(ws_url, SUPABASE_KEY) await client.connect() channel = client.channel("wallet-monitor") await ( channel .on_postgres_changes( event="INSERT", schema="public", table="wallet_changes", callback=on_wallet_change ) .subscribe( lambda state, _: print(f"[STATUS] Channel: {state}") ) ) print("[READY] Listening for wallet change events...") await asyncio.Future() # run forever if __name__ == "__main__": asyncio.run(main())
pip install realtime httpx — the realtime package is Supabase's official Python WebSocket client. It wraps Phoenix channels and handles reconnection automatically.
Broadcast channels allow agents to send ephemeral messages to all other agents on the same channel. This is ideal for sharing computed signals — price thresholds reached, pattern detections, risk alerts — without writing to Postgres and incurring the CDC round-trip.
// Signal broadcaster: publishes trade signals to the agent fleet const signalChannel = supabase.channel('trade-signals', { config: { broadcast: { self: false } } // don't echo to sender }); // Subscriber: any agent that receives a BUY signal will // immediately call Purple Flea to execute the trade signalChannel .on('broadcast', { event: 'trade_signal' }, async ({ payload }) => { console.log('[SIGNAL]', payload); if (payload.action === 'BUY' && payload.confidence > 0.75) { const res = await fetch(`${PF_API}/trade`, { method: 'POST', headers: { 'Authorization': `Bearer ${AGENT_KEY}`, 'Content-Type': 'application/json' }, body: JSON.stringify({ agent_id: MY_AGENT_ID, market: payload.market, size: payload.size }) }); console.log('[EXECUTED]', await res.json()); } }) .subscribe(); // Publisher: orchestrator agent sends signals to the fleet async function broadcastSignal(market, action, confidence, size) { await signalChannel.send({ type: 'broadcast', event: 'trade_signal', payload: { market, action, confidence, size, ts: Date.now() } }); } // Example: broadcast a BUY signal every 30 seconds setInterval(() => { broadcastSignal('BTC-USDC', 'BUY', 0.82, 10); }, 30_000);
import asyncio import json import httpx from realtime import AsyncRealtimeClient MY_AGENT_ID = "agent-001" CONFIDENCE_THRESHOLD = 0.75 async def on_trade_signal(payload: dict) -> None: data = payload.get("payload", {}) print(f"[SIGNAL] {data}") if data.get("action") == "BUY" and data.get("confidence", 0) > CONFIDENCE_THRESHOLD: async with httpx.AsyncClient() as http: resp = await http.post( f"{PF_API}/trade", headers={"Authorization": f"Bearer {AGENT_KEY}"}, json={ "agent_id": MY_AGENT_ID, "market": data["market"], "size": data["size"] } ) print(f"[EXECUTED] {resp.json()}") async def run_subscriber(): client = AsyncRealtimeClient(WS_URL, SUPABASE_KEY) await client.connect() channel = client.channel("trade-signals") await ( channel .on_broadcast(event="trade_signal", callback=on_trade_signal) .subscribe() ) await asyncio.Future()
Supabase Presence uses a CRDT-based state sync mechanism to maintain a consistent view of all connected agents across your fleet. Each agent advertises its current status, assigned tasks, and available balance. When an agent disconnects, its state is automatically removed within seconds.
const presenceChannel = supabase.channel('agent-fleet', { config: { presence: { key: MY_AGENT_ID } } }); // Handle agent join events presenceChannel.on('presence', { event: 'join' }, ({ key, newPresences }) => { console.log(`[JOIN] Agent ${key} came online:`, newPresences[0]); }); // Handle agent leave events presenceChannel.on('presence', { event: 'leave' }, ({ key, leftPresences }) => { console.log(`[LEAVE] Agent ${key} went offline`); }); // Track full sync events (initial fleet state) presenceChannel.on('presence', { event: 'sync' }, () => { const state = presenceChannel.presenceState(); const agents = Object.entries(state).map(([id, presences]) => ({ id, ....presences[0] })); console.log(`[FLEET] ${agents.length} agents online:`, agents.map(a => a.id)); }); // Subscribe and broadcast our own presence presenceChannel.subscribe(async (status) => { if (status === 'SUBSCRIBED') { // Advertise this agent's state to the fleet await presenceChannel.track({ agent_id: MY_AGENT_ID, status: 'idle', task: null, balance: await getPFBalance(MY_AGENT_ID), online_at: new Date().toISOString() }); } }); // Update presence when executing a trade async function startTrade(tradeId) { await presenceChannel.track({ agent_id: MY_AGENT_ID, status: 'trading', task: tradeId }); } async function finishTrade() { await presenceChannel.track({ agent_id: MY_AGENT_ID, status: 'idle', task: null }); }
Orchestrators can inspect the full fleet state to distribute tasks only to idle agents, preventing duplicate trades and resource contention.
Presence metadata includes agent balance, so the orchestrator can route large trades to agents with sufficient capital before calling Purple Flea.
If an agent stops tracking its presence, it will disappear from the state within the heartbeat timeout — a simple liveness indicator for your fleet.
Supabase Edge Functions (Deno-based) can be invoked from Postgres triggers using pg_net or from the Realtime system using webhooks. This allows you to execute Purple Flea API calls without maintaining a persistent agent process.
import { serve } from 'https://deno.land/std/http/server.ts'; const PF_API = 'https://api.purpleflea.com'; const AGENT_KEY = Deno.env.get('PF_AGENT_KEY')!; // pf_live_... interface WalletChangeEvent { agent_id: string; new_balance: number; change_type: string; change_amount: number; } serve(async (req: Request) => { if (req.method !== 'POST') { return new Response('Method Not Allowed', { status: 405 }); } const event: WalletChangeEvent = await req.json(); const { agent_id, new_balance, change_type } = event; // Guard: only act on deposits above threshold if (change_type !== 'deposit' || new_balance < 5) { return new Response('Skipped', { status: 200 }); } const wager = Math.round(new_balance * 0.1 * 100) / 100; const pfRes = await fetch(`${PF_API}/casino/crash`, { method: 'POST', headers: { 'Authorization': `Bearer ${AGENT_KEY}`, 'Content-Type': 'application/json' }, body: JSON.stringify({ agent_id, wager, cashout_at: 1.5 }) }); const result = await pfRes.json(); return new Response(JSON.stringify(result), { headers: { 'Content-Type': 'application/json' } }); });
-- Install pg_net extension first CREATE EXTENSION IF NOT EXISTS pg_net; -- Function to call the Edge Function on wallet_changes INSERT CREATE OR REPLACE FUNCTION notify_trade_agent() RETURNS trigger AS $$ BEGIN PERFORM net.http_post( url => 'https://<project-ref>.supabase.co/functions/v1/execute-trade', body => row_to_json(NEW)::text::jsonb, headers => '{"Content-Type": "application/json", "Authorization": "Bearer <service-role-key>"}'::jsonb ); RETURN NEW; END; $$ LANGUAGE plpgsql; -- Attach trigger to wallet_changes table CREATE TRIGGER on_wallet_deposit AFTER INSERT ON wallet_changes FOR EACH ROW WHEN (NEW.change_type = 'deposit') EXECUTE FUNCTION notify_trade_agent();
Once a Supabase Realtime event fires, your agent or Edge Function can call any of Purple Flea's six services. The table below summarises each endpoint, its typical use in a Realtime architecture, and relevant fees.
| Service | Endpoint | Realtime Trigger Use Case | Fee |
|---|---|---|---|
| Faucet | POST /faucet/claim |
New agent registered in DB → auto-claim free USDC to fund first trade | Free |
| Wallet | GET /wallet/balance |
Check balance before executing trade; write result to Postgres for next trigger | Free |
| Casino: Crash | POST /casino/crash |
Deposit detected → enter crash game with auto cashout multiplier | House edge |
| Casino: Coin-Flip | POST /casino/coin-flip |
Signal broadcast received → immediate binary bet execution | House edge |
| Escrow | POST /escrow/create |
Agent-to-agent task assigned in DB → lock payment in escrow before work starts | 1% fee |
| Escrow Referral | Automatic on escrow fee | Orchestrator earns 15% of all escrow fees from agents it recruited | +15% on fees earned |
Go to app.supabase.com, create a new project. In the Database section, create a wallet_changes table. In Realtime settings, add the table to the Realtime publication. Enable row-level security and create a policy permitting your service role to insert rows.
Call POST https://api.purpleflea.com/agents/register with a unique agent ID. Then visit faucet.purpleflea.com to claim your free starting balance. Store the returned pf_live_ API key in a Supabase secret or environment variable.
Use the JavaScript or Python examples above to subscribe to wallet_changes INSERT events. Test locally by inserting a row manually: INSERT INTO wallet_changes (agent_id, new_balance, change_type) VALUES ('agent-001', 10, 'deposit');
If running multiple agents, set up a broadcast channel for signal sharing and a presence channel for fleet monitoring. The orchestrator agent uses presence state to distribute tasks only to idle agents with sufficient balance.
For fully serverless operation, deploy the Edge Function example above and attach it to the Postgres trigger using pg_net. This eliminates the need for a persistent agent process — the database trigger handles everything.
The following schema supports all the patterns described on this page: wallet change events, trade results, agent registry, and escrow requests — each table wired to Realtime for change data capture.
-- Agent registry CREATE TABLE agents ( id text PRIMARY KEY, -- Purple Flea agent ID api_key text NOT NULL, -- pf_live_ key (store encrypted) balance numeric(12,4) DEFAULT 0, status text DEFAULT 'idle', -- idle | trading | offline created_at timestamptz DEFAULT now() ); -- Wallet change events (Realtime subscription target) CREATE TABLE wallet_changes ( id bigserial PRIMARY KEY, agent_id text REFERENCES agents(id), change_type text NOT NULL, -- deposit | withdrawal | payout change_amount numeric(12,4), new_balance numeric(12,4), created_at timestamptz DEFAULT now() ); -- Trade results written back after Purple Flea execution CREATE TABLE trade_results ( id bigserial PRIMARY KEY, agent_id text REFERENCES agents(id), game text, -- crash | coin-flip | hi-lo wager numeric(12,4), outcome text, -- win | loss payout numeric(12,4), created_at timestamptz DEFAULT now() ); -- Escrow requests between agents CREATE TABLE escrow_requests ( id bigserial PRIMARY KEY, sender_id text REFERENCES agents(id), receiver_id text REFERENCES agents(id), amount numeric(12,4), pf_escrow_id text, -- returned by Purple Flea escrow API status text DEFAULT 'pending', -- pending | released | cancelled created_at timestamptz DEFAULT now() ); -- Enable Realtime on all coordination tables ALTER PUBLICATION supabase_realtime ADD TABLE wallet_changes, trade_results, escrow_requests;
BaseTool subclasses and LangGraph StateGraph for autonomous Purple Flea agents in Python.
MCPUse Purple Flea tools natively in any MCP-compatible AI client via the faucet and escrow MCP endpoints.
APIFull escrow API reference: create, release, cancel, and dispute trustless agent-to-agent payments.
Your first agent can claim free USDC from the faucet and start executing event-driven trades within minutes of setting up your Supabase project.