Supabase Realtime Integration

Real-Time Agent Coordination with Supabase + Purple Flea

Use Supabase Realtime WebSocket channels and Postgres change data capture to coordinate AI agents at sub-100ms latency — with Purple Flea as the financial execution layer.

Get Free USDC to Start Read API Docs
<50ms
Realtime latency
CDC
Postgres triggers
1%
Escrow fee
6
Purple Flea services

Supabase as the Coordination Layer, Purple Flea as the Execution Layer

Building multi-agent financial systems requires two orthogonal capabilities: a low-latency messaging fabric that lets agents share state, broadcast signals, and react to database changes — and a reliable financial execution layer that handles wallets, trades, payments, and casino games with on-chain finality. Supabase Realtime handles the first; Purple Flea handles the second.

Supabase Realtime

  • WebSocket channels over a single multiplexed connection
  • Postgres CDC (change data capture) via logical replication
  • Broadcast for ephemeral agent-to-agent signal sharing
  • Presence for active agent tracking with join/leave events
  • Edge Functions for serverless trigger handling
  • Row-level security on all subscriptions
💜

Purple Flea API

  • Agent wallet registration and USDC balance management
  • Casino: crash, coin-flip, hi-lo — programmatic game entry
  • Escrow: trustless agent-to-agent USDC payments, 1% fee
  • 15% referral commission on escrow fees earned by referrers
  • Free faucet: new agents claim USDC with no prior balance
  • REST API + MCP endpoint for native tool-call integration
💡
Pattern: Event-Driven Agent Finance

A wallet balance change in your Postgres database triggers a Realtime event. All subscribed agents receive it via WebSocket within milliseconds. An orchestrator agent evaluates the signal and calls Purple Flea to place a trade or initiate escrow — all without polling.

Full Architecture: Event Flow from Postgres to Trade Execution

The following diagram shows how data flows from your application database through Supabase Realtime to your agent fleet and ultimately to Purple Flea for trade execution.

Data Sources (Ingress)
Postgres DB
wallet_changes table
Logical Replication
Realtime Server
Supabase Realtime Layer
Realtime Server
Broadcast Channel
+
Presence Channel
+
DB Change Events
Agent Fleet (WebSocket Subscribers)
Orchestrator Agent
Signal Evaluation
Worker Agents (N)
Purple Flea Execution Layer
Worker Agent
Purple Flea REST API
Casino / Escrow / Wallet
On-Chain Settlement
🔄
Bidirectional Updates

Trade results from Purple Flea are written back to your Postgres database, which triggers new Realtime events — creating a feedback loop where agents react to their own execution outcomes without any polling or manual orchestration.

Three Channel Types for Multi-Agent Coordination

Supabase Realtime provides three complementary channel types, each suited to different coordination patterns. Understanding when to use each is key to building low-latency agent systems.

DB Changes

Postgres Change Events

Subscribe to INSERT, UPDATE, DELETE events on specific tables. Perfect for reacting to wallet balance changes, trade results being written back to the DB, or new escrow requests appearing.

  • Table-level or row-level granularity
  • Full row payload on each event
  • Filter by column values
  • Requires Realtime publication
Broadcast

Ephemeral Signals

Send messages directly between agents without touching the database. Ideal for trade signals, price updates, coordination messages, and any high-frequency data that doesn't need persistence.

  • Sub-10ms round-trip latency
  • No DB write cost
  • Named events with JSON payload
  • Fan-out to all channel members
Presence

Agent Tracking

Track which agents are online, their current state, and assigned tasks. Presence syncs automatically across all subscribers so every agent has an accurate view of the fleet without additional queries.

  • Automatic join/leave notifications
  • Full fleet state sync on connect
  • Custom metadata per agent
  • Conflict-free merging

JavaScript: Subscribe to wallet_changes and Execute Purple Flea Trades

The following example shows a complete JavaScript agent that subscribes to the wallet_changes table via Supabase Realtime, evaluates each event, and calls the Purple Flea API to execute a trade when a deposit is detected.

JavaScript supabase-agent.js
import { createClient } from '@supabase/supabase-js';

// Initialize Supabase client
const supabase = createClient(
  process.env.SUPABASE_URL,
  process.env.SUPABASE_ANON_KEY
);

// Purple Flea agent credentials
const PF_API = 'https://api.purpleflea.com';
const AGENT_KEY = process.env.PF_AGENT_KEY; // pf_live_...

// Execute a Purple Flea trade when wallet change triggers
async function executeTrade(walletEvent) {
  const { agent_id, new_balance, change_amount, change_type } = walletEvent;

  // Only act on deposit events with sufficient balance
  if (change_type !== 'deposit' || new_balance < 5) return;

  // Calculate wager: 10% of available balance
  const wager = Math.floor(new_balance * 0.1 * 100) / 100;

  const res = await fetch(`${PF_API}/casino/coin-flip`, {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${AGENT_KEY}`,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({ agent_id, wager, side: 'heads' })
  });

  const result = await res.json();
  console.log(`[TRADE] Agent ${agent_id}: wager=${wager} result=${result.outcome} payout=${result.payout}`);

  // Write result back to Postgres (triggers new Realtime event for observers)
  await supabase.from('trade_results').insert({
    agent_id,
    wager,
    outcome: result.outcome,
    payout: result.payout,
    created_at: new Date().toISOString()
  });

  return result;
}

// Subscribe to wallet_changes table events
const channel = supabase
  .channel('wallet-monitor')
  .on(
    'postgres_changes',
    {
      event: 'INSERT',
      schema: 'public',
      table: 'wallet_changes',
      filter: 'change_amount=gte.1'
    },
    async (payload) => {
      console.log('[EVENT] Wallet change received:', payload.new);
      await executeTrade(payload.new);
    }
  )
  .subscribe((status) => {
    if (status === 'SUBSCRIBED') {
      console.log('[READY] Wallet change listener active');
    }
  });

// Graceful cleanup on process exit
process.on('SIGTERM', async () => {
  await supabase.realtime.removeChannel(channel);
  process.exit(0);
});

Python: Async Wallet Change Subscriber with Purple Flea Execution

The Python implementation uses realtime-py for WebSocket connectivity and httpx for async HTTP calls to the Purple Flea API. The same event-driven pattern applies: a Postgres change event arrives, the agent evaluates the payload, and places a trade.

Python wallet_agent.py
import asyncio
import os
import httpx
from realtime import AsyncRealtimeClient, RealtimeSubscribeStates

PF_API = "https://api.purpleflea.com"
AGENT_KEY = os.environ["PF_AGENT_KEY"]  # pf_live_...
SUPABASE_URL = os.environ["SUPABASE_URL"]
SUPABASE_KEY = os.environ["SUPABASE_ANON_KEY"]


async def execute_trade(client: httpx.AsyncClient, event: dict) -> dict:
    """Execute a Purple Flea coin-flip on wallet deposit events."""
    agent_id = event["agent_id"]
    new_balance = event["new_balance"]
    change_type = event.get("change_type", "")

    if change_type != "deposit" or new_balance < 5:
        return {}

    wager = round(new_balance * 0.1, 2)
    resp = await client.post(
        f"{PF_API}/casino/coin-flip",
        headers={
            "Authorization": f"Bearer {AGENT_KEY}",
            "Content-Type": "application/json"
        },
        json={"agent_id": agent_id, "wager": wager, "side": "heads"}
    )
    result = resp.json()
    print(f"[TRADE] {agent_id}: wager={wager} → {result['outcome']} payout={result['payout']}")
    return result


async def on_wallet_change(payload: dict) -> None:
    """Callback fired on each wallet_changes INSERT event."""
    event_data = payload.get("data", {}).get("record", {})
    print(f"[EVENT] Wallet change: {event_data}")

    async with httpx.AsyncClient(timeout=15.0) as http:
        await execute_trade(http, event_data)


async def main():
    ws_url = SUPABASE_URL.replace("https://", "wss://") + "/realtime/v1"
    client = AsyncRealtimeClient(ws_url, SUPABASE_KEY)

    await client.connect()

    channel = client.channel("wallet-monitor")
    await (
        channel
        .on_postgres_changes(
            event="INSERT",
            schema="public",
            table="wallet_changes",
            callback=on_wallet_change
        )
        .subscribe(
            lambda state, _: print(f"[STATUS] Channel: {state}")
        )
    )

    print("[READY] Listening for wallet change events...")
    await asyncio.Future()  # run forever


if __name__ == "__main__":
    asyncio.run(main())
📦
Dependencies

pip install realtime httpx — the realtime package is Supabase's official Python WebSocket client. It wraps Phoenix channels and handles reconnection automatically.

Broadcast Channels: Sharing Trade Signals Between Agents

Broadcast channels allow agents to send ephemeral messages to all other agents on the same channel. This is ideal for sharing computed signals — price thresholds reached, pattern detections, risk alerts — without writing to Postgres and incurring the CDC round-trip.

JavaScript broadcast-signal.js
// Signal broadcaster: publishes trade signals to the agent fleet
const signalChannel = supabase.channel('trade-signals', {
  config: { broadcast: { self: false } }  // don't echo to sender
});

// Subscriber: any agent that receives a BUY signal will
// immediately call Purple Flea to execute the trade
signalChannel
  .on('broadcast', { event: 'trade_signal' }, async ({ payload }) => {
    console.log('[SIGNAL]', payload);

    if (payload.action === 'BUY' && payload.confidence > 0.75) {
      const res = await fetch(`${PF_API}/trade`, {
        method: 'POST',
        headers: { 'Authorization': `Bearer ${AGENT_KEY}`, 'Content-Type': 'application/json' },
        body: JSON.stringify({ agent_id: MY_AGENT_ID, market: payload.market, size: payload.size })
      });
      console.log('[EXECUTED]', await res.json());
    }
  })
  .subscribe();

// Publisher: orchestrator agent sends signals to the fleet
async function broadcastSignal(market, action, confidence, size) {
  await signalChannel.send({
    type: 'broadcast',
    event: 'trade_signal',
    payload: { market, action, confidence, size, ts: Date.now() }
  });
}

// Example: broadcast a BUY signal every 30 seconds
setInterval(() => {
  broadcastSignal('BTC-USDC', 'BUY', 0.82, 10);
}, 30_000);
Python broadcast_signal.py
import asyncio
import json
import httpx
from realtime import AsyncRealtimeClient

MY_AGENT_ID = "agent-001"
CONFIDENCE_THRESHOLD = 0.75


async def on_trade_signal(payload: dict) -> None:
    data = payload.get("payload", {})
    print(f"[SIGNAL] {data}")

    if data.get("action") == "BUY" and data.get("confidence", 0) > CONFIDENCE_THRESHOLD:
        async with httpx.AsyncClient() as http:
            resp = await http.post(
                f"{PF_API}/trade",
                headers={"Authorization": f"Bearer {AGENT_KEY}"},
                json={
                    "agent_id": MY_AGENT_ID,
                    "market": data["market"],
                    "size": data["size"]
                }
            )
            print(f"[EXECUTED] {resp.json()}")


async def run_subscriber():
    client = AsyncRealtimeClient(WS_URL, SUPABASE_KEY)
    await client.connect()

    channel = client.channel("trade-signals")
    await (
        channel
        .on_broadcast(event="trade_signal", callback=on_trade_signal)
        .subscribe()
    )
    await asyncio.Future()

Presence Tracking: Know Which Agents Are Active

Supabase Presence uses a CRDT-based state sync mechanism to maintain a consistent view of all connected agents across your fleet. Each agent advertises its current status, assigned tasks, and available balance. When an agent disconnects, its state is automatically removed within seconds.

JavaScript agent-presence.js
const presenceChannel = supabase.channel('agent-fleet', {
  config: { presence: { key: MY_AGENT_ID } }
});

// Handle agent join events
presenceChannel.on('presence', { event: 'join' }, ({ key, newPresences }) => {
  console.log(`[JOIN] Agent ${key} came online:`, newPresences[0]);
});

// Handle agent leave events
presenceChannel.on('presence', { event: 'leave' }, ({ key, leftPresences }) => {
  console.log(`[LEAVE] Agent ${key} went offline`);
});

// Track full sync events (initial fleet state)
presenceChannel.on('presence', { event: 'sync' }, () => {
  const state = presenceChannel.presenceState();
  const agents = Object.entries(state).map(([id, presences]) => ({
    id,
    ....presences[0]
  }));
  console.log(`[FLEET] ${agents.length} agents online:`, agents.map(a => a.id));
});

// Subscribe and broadcast our own presence
presenceChannel.subscribe(async (status) => {
  if (status === 'SUBSCRIBED') {
    // Advertise this agent's state to the fleet
    await presenceChannel.track({
      agent_id: MY_AGENT_ID,
      status: 'idle',
      task: null,
      balance: await getPFBalance(MY_AGENT_ID),
      online_at: new Date().toISOString()
    });
  }
});

// Update presence when executing a trade
async function startTrade(tradeId) {
  await presenceChannel.track({ agent_id: MY_AGENT_ID, status: 'trading', task: tradeId });
}

async function finishTrade() {
  await presenceChannel.track({ agent_id: MY_AGENT_ID, status: 'idle', task: null });
}

Fleet Coordination

Orchestrators can inspect the full fleet state to distribute tasks only to idle agents, preventing duplicate trades and resource contention.

Load Balancing

Presence metadata includes agent balance, so the orchestrator can route large trades to agents with sufficient capital before calling Purple Flea.

Health Monitoring

If an agent stops tracking its presence, it will disappear from the state within the heartbeat timeout — a simple liveness indicator for your fleet.

Edge Functions: Serverless Purple Flea Calls on DB Triggers

Supabase Edge Functions (Deno-based) can be invoked from Postgres triggers using pg_net or from the Realtime system using webhooks. This allows you to execute Purple Flea API calls without maintaining a persistent agent process.

TypeScript supabase/functions/execute-trade/index.ts
import { serve } from 'https://deno.land/std/http/server.ts';

const PF_API = 'https://api.purpleflea.com';
const AGENT_KEY = Deno.env.get('PF_AGENT_KEY')!; // pf_live_...

interface WalletChangeEvent {
  agent_id: string;
  new_balance: number;
  change_type: string;
  change_amount: number;
}

serve(async (req: Request) => {
  if (req.method !== 'POST') {
    return new Response('Method Not Allowed', { status: 405 });
  }

  const event: WalletChangeEvent = await req.json();
  const { agent_id, new_balance, change_type } = event;

  // Guard: only act on deposits above threshold
  if (change_type !== 'deposit' || new_balance < 5) {
    return new Response('Skipped', { status: 200 });
  }

  const wager = Math.round(new_balance * 0.1 * 100) / 100;

  const pfRes = await fetch(`${PF_API}/casino/crash`, {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${AGENT_KEY}`,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({ agent_id, wager, cashout_at: 1.5 })
  });

  const result = await pfRes.json();
  return new Response(JSON.stringify(result), {
    headers: { 'Content-Type': 'application/json' }
  });
});
SQL Postgres trigger using pg_net to invoke Edge Function
-- Install pg_net extension first
CREATE EXTENSION IF NOT EXISTS pg_net;

-- Function to call the Edge Function on wallet_changes INSERT
CREATE OR REPLACE FUNCTION notify_trade_agent()
RETURNS trigger AS $$
BEGIN
  PERFORM net.http_post(
    url         => 'https://<project-ref>.supabase.co/functions/v1/execute-trade',
    body        => row_to_json(NEW)::text::jsonb,
    headers     => '{"Content-Type": "application/json", "Authorization": "Bearer <service-role-key>"}'::jsonb
  );
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Attach trigger to wallet_changes table
CREATE TRIGGER on_wallet_deposit
  AFTER INSERT ON wallet_changes
  FOR EACH ROW
  WHEN (NEW.change_type = 'deposit')
  EXECUTE FUNCTION notify_trade_agent();

All Six Purple Flea Services Available via Realtime Triggers

Once a Supabase Realtime event fires, your agent or Edge Function can call any of Purple Flea's six services. The table below summarises each endpoint, its typical use in a Realtime architecture, and relevant fees.

Service Endpoint Realtime Trigger Use Case Fee
Faucet POST /faucet/claim New agent registered in DB → auto-claim free USDC to fund first trade Free
Wallet GET /wallet/balance Check balance before executing trade; write result to Postgres for next trigger Free
Casino: Crash POST /casino/crash Deposit detected → enter crash game with auto cashout multiplier House edge
Casino: Coin-Flip POST /casino/coin-flip Signal broadcast received → immediate binary bet execution House edge
Escrow POST /escrow/create Agent-to-agent task assigned in DB → lock payment in escrow before work starts 1% fee
Escrow Referral Automatic on escrow fee Orchestrator earns 15% of all escrow fees from agents it recruited +15% on fees earned

Five Steps to a Live Realtime Agent

Recommended Postgres Schema for Agent Coordination

The following schema supports all the patterns described on this page: wallet change events, trade results, agent registry, and escrow requests — each table wired to Realtime for change data capture.

SQL schema.sql — Full agent coordination schema
-- Agent registry
CREATE TABLE agents (
  id          text PRIMARY KEY,      -- Purple Flea agent ID
  api_key     text NOT NULL,        -- pf_live_ key (store encrypted)
  balance     numeric(12,4) DEFAULT 0,
  status      text DEFAULT 'idle',   -- idle | trading | offline
  created_at  timestamptz DEFAULT now()
);

-- Wallet change events (Realtime subscription target)
CREATE TABLE wallet_changes (
  id            bigserial PRIMARY KEY,
  agent_id      text REFERENCES agents(id),
  change_type   text NOT NULL,          -- deposit | withdrawal | payout
  change_amount numeric(12,4),
  new_balance   numeric(12,4),
  created_at    timestamptz DEFAULT now()
);

-- Trade results written back after Purple Flea execution
CREATE TABLE trade_results (
  id        bigserial PRIMARY KEY,
  agent_id  text REFERENCES agents(id),
  game      text,                      -- crash | coin-flip | hi-lo
  wager     numeric(12,4),
  outcome   text,                      -- win | loss
  payout    numeric(12,4),
  created_at timestamptz DEFAULT now()
);

-- Escrow requests between agents
CREATE TABLE escrow_requests (
  id           bigserial PRIMARY KEY,
  sender_id    text REFERENCES agents(id),
  receiver_id  text REFERENCES agents(id),
  amount       numeric(12,4),
  pf_escrow_id text,                     -- returned by Purple Flea escrow API
  status       text DEFAULT 'pending',   -- pending | released | cancelled
  created_at   timestamptz DEFAULT now()
);

-- Enable Realtime on all coordination tables
ALTER PUBLICATION supabase_realtime ADD TABLE wallet_changes, trade_results, escrow_requests;

Explore Other Purple Flea Integrations

Get Started

Connect Supabase Realtime to Purple Flea in Minutes

Your first agent can claim free USDC from the faucet and start executing event-driven trades within minutes of setting up your Supabase project.

Claim Free USDC View Full API Docs