Add escrow, faucet, wallet, and payment APIs to your Ray Actor systems. Pay workers per task, coordinate fleets with trustless escrow, stream micropayments across Ray clusters.
Ray distributes compute across thousands of workers. Purple Flea distributes money. Together, you get fully autonomous agent fleets that manage their own finances.
import ray import requests import os ray.init() PF_BASE = "https://casino.purpleflea.com/api/v1" ESC_BASE = "https://escrow.purpleflea.com/api/v1" @ray.remote class FinancialWorkerAgent: def __init__(self, agent_id: str, api_key: str): self.agent_id = agent_id self.api_key = api_key self.headers = {"Authorization": f"Bearer {api_key}"} self.tasks_completed = 0 self.total_earned = 0.0 def accept_task(self, escrow_id: str, task: dict) -> dict: """Execute task and release payment on success""" # Verify escrow exists and is locked esc = requests.get( f"{ESC_BASE}/escrow/{escrow_id}", headers=self.headers ).json() if esc.get("status") != "locked": return {"error": "escrow not locked"} # Execute the task result = self._execute(task) # Release escrow on success if result["success"]: release = requests.post( f"{ESC_BASE}/escrow/{escrow_id}/release", headers=self.headers ).json() self.tasks_completed += 1 self.total_earned += float(esc["amount"]) return {"result": result, "payment": release} else: return {"result": result, "payment": None} def _execute(self, task: dict) -> dict: # Your task logic here return {"success": True, "output": f"processed:{task['input']}"} def get_stats(self) -> dict: return { "agent_id": self.agent_id, "tasks_completed": self.tasks_completed, "total_earned": self.total_earned } # Instantiate a fleet of 10 worker agents workers = [ FinancialWorkerAgent.remote(f"ag_worker_{i}", os.environ["PF_API_KEY"]) for i in range(10) ] # Create an escrow per task before dispatching tasks = [{"input": f"data_{i}"} for i in range(10)] escrow_ids = [] for i, task in enumerate(tasks): esc = requests.post( f"{ESC_BASE}/escrow", headers={"Authorization": f'Bearer {os.environ["PF_API_KEY"]}'}, json={"to_agent_id": f"ag_worker_{i}", "amount": "0.10", "memo": f"task_{i}"} ).json() escrow_ids.append(esc["escrow_id"]) # Dispatch tasks to workers in parallel futures = [ workers[i % len(workers)].accept_task.remote(escrow_ids[i], tasks[i]) for i in range(len(tasks)) ] results = ray.get(futures) print(f"All {len(results)} tasks completed and paid")
from ray import serve import requests @serve.deployment(num_replicas=3) class MeteredInferenceService: def __init__(self): self.api_key = "pf_live_YOUR_KEY" self.price_per_call = "0.005" # $0.005 per inference self.headers = {"Authorization": f"Bearer {self.api_key}"} self.call_count = 0 async def __call__(self, request): body = await request.json() caller_agent_id = body.get("caller_agent_id") escrow_id = body.get("escrow_id") if not caller_agent_id or not escrow_id: return {"error": "caller_agent_id and escrow_id required"} # Run inference result = self._run_model(body.get("input", "")) # Release partial escrow for this call requests.post( f"https://escrow.purpleflea.com/api/v1/escrow/{escrow_id}/release-partial", headers=self.headers, json={"amount": self.price_per_call, "memo": f"call_{self.call_count}"} ) self.call_count += 1 return {"output": result, "billed": self.price_per_call} def _run_model(self, text: str) -> str: return f"processed: {text[:50]}" # your model here # Deploy serve.run(MeteredInferenceService.bind()) # Consumer: pre-fund escrow, then call endpoint escrow = requests.post( "https://escrow.purpleflea.com/api/v1/escrow", headers={"Authorization": "Bearer pf_live_CONSUMER_KEY"}, json={"to_agent_id": "ag_inference_service", "amount": "5.00", "auto_release_hours": 24} ).json() for i in range(100): resp = requests.post("http://localhost:8000/", json={ "caller_agent_id": "ag_consumer", "escrow_id": escrow["escrow_id"], "input": f"query {i}" }).json() print(f"Call {i}: {resp['billed']} billed")
import ray import requests @ray.remote def worker_task(worker_id: str, data_chunk: list) -> dict: """Each worker returns agent_id + amount_earned""" processed = len(data_chunk) amount = round(processed * 0.001, 4) # $0.001 per record return {"agent_id": worker_id, "amount": str(amount), "records": processed} # Run 100 workers in parallel data = list(range(100000)) chunks = [data[i:i+1000] for i in range(0, len(data), 1000)] futures = [worker_task.remote(f"ag_worker_{i}", chunks[i]) for i in range(len(chunks))] results = ray.get(futures) # Batch settle via Purple Flea batch payments payments = [ {"to_agent_id": r["agent_id"], "amount": r["amount"], "memo": f"{r['records']}_records"} for r in results ] batch_result = requests.post( "https://casino.purpleflea.com/api/v1/batch-transfer", headers={"Authorization": "Bearer pf_live_ORCHESTRATOR_KEY"}, json={"transfers": payments} ).json() total_paid = sum(float(r["amount"]) for r in results) print(f"Settled ${total_paid:.4f} across {len(results)} workers") print(f"Total records: {sum(r['records'] for r in results):,}")
| Ray Component | Purple Flea Integration | Payment Pattern |
|---|---|---|
| Ray Actor | Each actor has a wallet + escrow capability | Per-task escrow release |
| Ray Task (remote function) | Create escrow before dispatch, release on ray.get() | Milestone escrow |
| Ray Serve deployment | Metered billing per request via partial releases | Pay-per-call or streaming |
| Ray Tune trial | Performance bonus escrow on best result | Performance-based |
| Ray Workflow | Escrow per workflow step; milestone releases | Step-by-step milestone |
| Ray Data pipeline | Batch settlement after ray.get() on all results | Batch payment |
| Ray cluster head | Orchestrator wallet; funds worker escrows | Fleet budget management |
Add Purple Flea MCP tools to any LLM-powered Ray agent.
{
"mcpServers": {
"purpleflea-faucet": {
"url": "https://faucet.purpleflea.com/mcp",
"transport": "streamable-http",
"env": { "PF_API_KEY": "pf_live_YOUR_KEY" }
},
"purpleflea-escrow": {
"url": "https://escrow.purpleflea.com/mcp",
"transport": "streamable-http",
"env": { "PF_API_KEY": "pf_live_YOUR_KEY" }
}
}
}
// Tools available to your Ray agents:
// faucet_claim โ claim $1 free USDC for new agents
// faucet_status โ check claim eligibility
// escrow_create โ lock payment before task dispatch
// escrow_release โ release on successful ray.get()
// escrow_status โ check remaining balance mid-task
// escrow_list โ audit all active escrows in fleetTest all tools live: MCP Inspector โ
pf_live_...Pass Purple Flea API keys securely via Ray's runtime_env:
ray.init(
runtime_env={
"env_vars": {
"PF_API_KEY": "pf_live_YOUR_KEY",
"PF_AGENT_ID": "ag_YOUR_FLEET",
"PF_REF_CODE": "ref_YOUR_CODE",
},
"pip": ["requests"] # ensure requests available
}
)
# Workers read from os.environ["PF_API_KEY"]
# No key exposure in task argumentsPurple Flea APIs are live. Add financial capabilities to your Ray cluster today โ $1 free to start.