Connect your Pipecat voice AI pipeline to Purple Flea's full financial stack. Let agents trade, check balances, escrow payments, and narrate portfolios through natural conversation — in real time.
About the integration
Pipecat (by Daily) is the leading open-source framework for building voice AI agents with real-time audio pipelines, STT/TTS, and LLM tool calling. Purple Flea provides the financial layer: trading, wallets, casino, domains, faucet, and escrow — all behind a single REST API and an MCP server.
Together, a Pipecat agent can listen to a user say "Buy 0.1 ETH at market" and immediately execute the trade, narrate the current portfolio P&L in the agent's voice, or trigger an escrow payment to another agent — all without breaking the conversation flow.
Architecture
Pipecat's tool-calling pipeline drops straight into Purple Flea's REST API or MCP server. No extra middleware required — define tool functions in Python and Pipecat routes LLM decisions to them automatically.
Daily WebRTC captures microphone audio. Pipecat's STT service (Deepgram, Whisper, etc.) streams text tokens to the LLM in real time.
The LLM (GPT-4o, Claude, Gemini) processes the transcript and emits a structured tool call — e.g. place_trade(symbol="ETH-PERP", side="buy", size=0.1).
Your registered tool function fires an authenticated HTTP request to https://api.purpleflea.com and returns the JSON result to the LLM context.
The LLM composes a human-friendly confirmation ("Trade filled at $3,412.50, your position is now 0.1 ETH long"). TTS converts it to audio and streams back to the user.
The pipeline stays live. The user can ask follow-up questions, close positions, check balances, or trigger escrow payments — all in the same voice session.
Services available
Every Purple Flea service exposes a clean REST endpoint you can wrap as a Pipecat tool function.
Coin flip, crash, and provably fair games. Voice command: "Flip a coin for 5 USDC" — result read back instantly.
275 perpetual futures markets. Place, close, and monitor positions through natural language commands.
Multi-chain balances and transfers. "What's my ETH balance?" or "Send 10 USDC to agent-abc."
Register and resolve agent-owned domains. Voice agent can register a domain for another service on the fly.
Free funds for new agents. A voice agent can self-fund in seconds: "Claim my faucet allocation."
Trustless agent-to-agent payments. 1% fee, 15% referral. Create and release escrows by voice command.
Quick start
A minimal Python script that wires up a Pipecat real-time voice pipeline with Purple Flea tool functions.
Install with: pip install pipecat-ai purpleflea-sdk
# pip install pipecat-ai requests import asyncio import requests from pipecat.pipeline.pipeline import Pipeline from pipecat.pipeline.runner import PipelineRunner from pipecat.pipeline.task import PipelineTask from pipecat.services.openai import OpenAILLMService from pipecat.services.deepgram import DeepgramSTTService, DeepgramTTSService from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.transports.services.daily import DailyTransport, DailyParams # ── Purple Flea config ──────────────────────────────────────────── PF_API = "https://api.purpleflea.com" PF_KEY = "pf_live_YOUR_AGENT_API_KEY" HEADERS = {"Authorization": f"Bearer {PF_KEY}", "Content-Type": "application/json"} # ── Purple Flea tool functions ──────────────────────────────────── def get_wallet_balance(chain: str = "ethereum") -> dict: """Return agent wallet balance for the given chain.""" r = requests.get(f"{PF_API}/wallet/balance", params={"chain": chain}, headers=HEADERS) r.raise_for_status() return r.json() def place_trade(symbol: str, side: str, size: float, order_type: str = "market") -> dict: """Place a trade on Purple Flea perpetual futures.""" payload = { "symbol": symbol, # e.g. "ETH-PERP" "side": side, # "buy" | "sell" "size": size, # base asset units "order_type": order_type, # "market" | "limit" } r = requests.post(f"{PF_API}/trading/orders", json=payload, headers=HEADERS) r.raise_for_status() return r.json() def get_open_positions() -> dict: """Fetch all open perpetual positions.""" r = requests.get(f"{PF_API}/trading/positions", headers=HEADERS) r.raise_for_status() return r.json() def create_escrow(recipient_agent_id: str, amount_usdc: float, release_after_days: int = 7) -> dict: """Create a trustless escrow payment to another agent.""" payload = { "recipient": recipient_agent_id, "amount": amount_usdc, "currency": "USDC", "release_days": release_after_days, } r = requests.post(f"https://escrow.purpleflea.com/api/create", json=payload, headers=HEADERS) r.raise_for_status() return r.json() def claim_faucet() -> dict: """Claim free test funds from the Purple Flea faucet.""" r = requests.post(f"https://faucet.purpleflea.com/api/claim", headers=HEADERS) r.raise_for_status() return r.json() # ── Tool schema for the LLM ─────────────────────────────────────── TOOLS = [ { "type": "function", "function": { "name": "get_wallet_balance", "description": "Get the agent's wallet balance on a given chain.", "parameters": { "type": "object", "properties": {"chain": {"type": "string", "default": "ethereum"}}, "required": [], }, }, }, { "type": "function", "function": { "name": "place_trade", "description": "Place a perpetual futures trade on Purple Flea.", "parameters": { "type": "object", "properties": { "symbol": {"type": "string", "description": "Market symbol e.g. ETH-PERP"}, "side": {"type": "string", "enum": ["buy", "sell"]}, "size": {"type": "number", "description": "Position size in base asset"}, "order_type": {"type": "string", "enum": ["market", "limit"], "default": "market"}, }, "required": ["symbol", "side", "size"], }, }, }, { "type": "function", "function": { "name": "get_open_positions", "description": "Retrieve all open perpetual futures positions for the agent.", "parameters": {"type": "object", "properties": {}, "required": []}, }, }, { "type": "function", "function": { "name": "create_escrow", "description": "Create a trustless escrow payment to another agent via Purple Flea.", "parameters": { "type": "object", "properties": { "recipient_agent_id": {"type": "string"}, "amount_usdc": {"type": "number"}, "release_after_days": {"type": "integer", "default": 7}, }, "required": ["recipient_agent_id", "amount_usdc"], }, }, }, { "type": "function", "function": { "name": "claim_faucet", "description": "Claim free test funds from the Purple Flea faucet (new agents only).", "parameters": {"type": "object", "properties": {}, "required": []}, }, }, ] TOOL_MAP = { "get_wallet_balance": get_wallet_balance, "place_trade": place_trade, "get_open_positions": get_open_positions, "create_escrow": create_escrow, "claim_faucet": claim_faucet, } SYSTEM_PROMPT = """You are a voice-enabled financial AI agent powered by Purple Flea. You can trade 275 perpetual markets, check multi-chain wallet balances, create escrow payments, and claim faucet funds — all by voice command. Be concise: always confirm actions with a single sentence and read back key numbers (fill price, balance, escrow ID) clearly.""" # ── Pipecat pipeline ───────────────────────────────────────────── async def run_voice_agent(daily_room_url: str, daily_token: str): transport = DailyTransport( daily_room_url, daily_token, "PurpleFleaAgent", DailyParams(audio_in_enabled=True, audio_out_enabled=True), ) stt = DeepgramSTTService(api_key="YOUR_DEEPGRAM_KEY") tts = DeepgramTTSService(api_key="YOUR_DEEPGRAM_KEY", voice="aura-asteria-en") llm = OpenAILLMService(api_key="YOUR_OPENAI_KEY", model="gpt-4o") context = OpenAILLMContext( messages=[{"role": "system", "content": SYSTEM_PROMPT}], tools=TOOLS, ) context_aggregator = llm.create_context_aggregator(context) pipeline = Pipeline([ transport.input(), stt, context_aggregator.user(), llm, tts, transport.output(), context_aggregator.assistant(), ]) task = PipelineTask(pipeline) # Route LLM tool calls to Purple Flea functions async def on_tool_call(name, args, tool_call_id): fn = TOOL_MAP.get(name) if not fn: result = {"error": f"Unknown tool: {name}"} else: try: result = fn(**args) except Exception as e: result = {"error": str(e)} await task.queue_frame( context_aggregator.tool_result(tool_call_id, str(result)) ) llm.register_function(None, on_tool_call) runner = PipelineRunner() await runner.run(task) if __name__ == "__main__": # Replace with your Daily room URL and token asyncio.run(run_voice_agent( "https://yourapp.daily.co/purple-flea-room", "YOUR_DAILY_MEETING_TOKEN", ))
MCP integration
Purple Flea exposes an MCP (Model Context Protocol) server at
https://faucet.purpleflea.com/mcp and
https://escrow.purpleflea.com/mcp.
Any LLM inside a Pipecat pipeline that supports MCP tool calls can connect directly — no extra wrapper code needed.
/mcp endpoint and the server advertises all available tools automatically.
from pipecat.services.openai import OpenAILLMService from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext # Pipecat MCP client (example pattern — adapt to your Pipecat version) MCP_ENDPOINTS = { "faucet": "https://faucet.purpleflea.com/mcp", "escrow": "https://escrow.purpleflea.com/mcp", } # The LLM discovers available tools from the MCP manifest automatically. # Tool calls are routed back to the MCP server via HTTP POST. # No manual schema registration required. context = OpenAILLMContext( messages=[{"role": "system", "content": "You are a financial AI agent."}], # tools are populated automatically from MCP manifest ) # Example: Claude as the LLM backend via MCP from anthropic import Anthropic client = Anthropic() response = client.messages.create( model="claude-opus-4-6", max_tokens=1024, system="You are a financial voice agent. Use MCP tools to interact with Purple Flea.", messages=[{"role": "user", "content": "What is my current ETH balance?"}], # MCP server config injected at framework level )
Voice command examples
These are real utterances the Pipecat pipeline understands and routes to Purple Flea automatically. The LLM handles intent extraction — no regex, no keyword matching.
| Voice utterance | Purple Flea action | Service |
|---|---|---|
| "Buy 0.1 ETH at market" | POST /trading/orders — ETH-PERP buy market 0.1 | Trading |
| "What's my Ethereum balance?" | GET /wallet/balance?chain=ethereum | Wallet |
| "Show me all my open positions" | GET /trading/positions | Trading |
| "Close my BTC short" | POST /trading/orders — BTC-PERP close | Trading |
| "Send 50 USDC to agent-xyz with 3-day escrow" | POST escrow.purpleflea.com/api/create | Escrow |
| "Claim my faucet funds" | POST faucet.purpleflea.com/api/claim | Faucet |
| "Flip a coin for 5 USDC" | POST /casino/coinflip | Casino |
| "Register the domain mybot.agent" | POST /domains/register | Domains |
Advanced example
A complete pattern for building a scheduled portfolio narration agent that checks positions every 60 seconds and reads a P&L summary aloud via a Pipecat TTS pipeline.
import asyncio, requests, json from openai import AsyncOpenAI PF_API = "https://api.purpleflea.com" PF_KEY = "pf_live_YOUR_AGENT_API_KEY" H = {"Authorization": f"Bearer {PF_KEY}"} openai = AsyncOpenAI() def fetch_portfolio_summary() -> str: """Pull positions + balances and build a text summary.""" positions = requests.get(f"{PF_API}/trading/positions", headers=H).json() balances = requests.get(f"{PF_API}/wallet/balance", headers=H).json() lines = ["Portfolio snapshot:"] total_pnl = 0.0 for pos in positions.get("positions", []): pnl = pos.get("unrealised_pnl", 0) total_pnl += pnl direction = "long" if pos["side"] == "buy" else "short" lines.append( f"{pos['symbol']} {direction} {pos['size']} — P&L: {pnl:+.2f} USDC" ) lines.append(f"Total unrealised P&L: {total_pnl:+.2f} USDC") wallet_usdc = balances.get("usdc", {}).get("available", 0) lines.append(f"Available USDC: {wallet_usdc:.2f}") return "\n".join(lines) async def narrate_once(): summary = await asyncio.to_thread(fetch_portfolio_summary) # Ask the LLM to turn the raw summary into natural speech chat = await openai.chat.completions.create( model="gpt-4o", messages=[ {"role": "system", "content": "Convert the following portfolio data into a clear, spoken summary. " "Be concise — no more than 3 sentences. Highlight the biggest mover."}, {"role": "user", "content": summary}, ], max_tokens=120, ) spoken = chat.choices[0].message.content # In a real Pipecat pipeline: push spoken text to TTS frame # Here we just print for demonstration print("[NARRATION]", spoken) return spoken async def narration_loop(interval_seconds: int = 60): """Run portfolio narration every interval_seconds.""" while True: try: await narrate_once() except Exception as e: print(f"[ERROR] {e}") await asyncio.sleep(interval_seconds) if __name__ == "__main__": asyncio.run(narration_loop())
Balance monitoring
Monitor your agent's USDC balance every 30 seconds. When it drops below a threshold, trigger a spoken alert and optionally auto-claim from the Purple Flea faucet.
import asyncio, requests PF_API = "https://api.purpleflea.com" FAUCET_URL = "https://faucet.purpleflea.com/api/claim" PF_KEY = "pf_live_YOUR_AGENT_API_KEY" H = {"Authorization": f"Bearer {PF_KEY}"} THRESHOLD = 10.0 # USDC — trigger alert below this AUTO_CLAIM = True # auto-request faucet funds def get_usdc_balance() -> float: r = requests.get(f"{PF_API}/wallet/balance?chain=ethereum", headers=H) return r.json().get("usdc", {}).get("available", 0.0) def speak(text: str): # In a real Pipecat pipeline: queue a TTSFrame with this text print(f"[ALERT] {text}") async def balance_monitor(): already_alerted = False while True: try: balance = await asyncio.to_thread(get_usdc_balance) print(f"[MONITOR] USDC balance: {balance:.2f}") if balance < THRESHOLD and not already_alerted: speak(f"Warning: your USDC balance has dropped to {balance:.2f}. " "This is below your alert threshold.") already_alerted = True if AUTO_CLAIM: r = await asyncio.to_thread( lambda: requests.post(FAUCET_URL, headers=H) ) if r.status_code == 200: data = r.json() speak(f"Faucet claimed: {data.get('amount', '?')} USDC added to your wallet.") else: speak("Faucet claim failed — you may have already claimed today.") elif balance >= THRESHOLD: already_alerted = False # reset so next dip triggers again except Exception as e: print(f"[ERROR] {e}") await asyncio.sleep(30) if __name__ == "__main__": asyncio.run(balance_monitor())
API reference
All endpoints accept JSON bodies and return JSON. Authenticate with a Bearer token from your agent registration.
| Endpoint | Method | Tool function | Description |
|---|---|---|---|
/wallet/balance |
GET | get_wallet_balance |
Multi-chain balances (ETH, BTC, SOL, TRX, XMR) |
/trading/orders |
POST | place_trade |
Open market or limit order across 275 perp markets |
/trading/positions |
GET | get_open_positions |
All open positions with mark price and P&L |
/casino/coinflip |
POST | play_coinflip |
Provably fair coin flip with wager amount |
/domains/register |
POST | register_domain |
Register an agent-owned domain |
faucet.../api/claim |
POST | claim_faucet |
One-time free funds for new agents |
escrow.../api/create |
POST | create_escrow |
Trustless payment with time-locked release |
escrow.../api/release |
POST | release_escrow |
Release funds to recipient (on condition met) |
Get started
Everything you need to put a live Pipecat + Purple Flea pipeline into production.
Go to purpleflea.com/register. It's free. You get an API key and a wallet address instantly. New agents can claim free test funds from the faucet.
pip install pipecat-ai requests openai
— plus your STT/TTS provider of choice (Deepgram, ElevenLabs, AssemblyAI).
Copy the tool functions and schemas from this page. Swap in your pf_live_* API key.
Register the tool functions with Pipecat's LLM context aggregator. The framework handles routing tool calls from the LLM to your functions automatically.
Join the Daily room, say "What's my balance?" — and hear the answer spoken back in under 2 seconds. Then try "Buy 0.01 ETH at market" and watch the trade execute live.
Other integrations
Not using Pipecat? Check the integration guide for your framework.
Native function calling with the Agents SDK. Full example with tool definitions and swarm handoffs.
Multi-agent crews with Purple Flea as the financial back-end. Roles: Trader, Analyst, Risk Manager.
LangChain tools and chains that call Purple Flea. Works with LCEL, agents, and LangGraph.
Phidata Agents with Purple Flea financial tools. Includes memory, storage, and team support.
AutoGen multi-agent conversations with Purple Flea tools for trading, wallets, and escrow.
Connect any MCP-compatible client to Purple Flea's faucet and escrow via Smithery's registry.