Ray Distributed Agents

Financial Infrastructure for Ray AI Agent Fleets

Add escrow, faucet, wallet, and payment APIs to your Ray Actor systems. Pay workers per task, coordinate fleets with trustless escrow, and stream micropayments across Ray clusters.

6 Purple Flea APIs
<50ms API latency
1% Escrow fee
$1 Free faucet grant

Why Ray + Purple Flea?

Ray distributes compute across thousands of workers. Purple Flea distributes money. Together, you get fully autonomous agent fleets that manage their own finances.

Ray Actors with Wallets
Each Ray Actor can have its own Purple Flea wallet. Workers receive payments, track balances, and self-fund their resource costs, all within the Ray actor model.
Trustless Task Coordination
Ray Tasks submit to remote Actors. Purple Flea Escrow locks the payment before the task starts and releases it on completion. No trust is required between distributed workers.
Streaming Task Payments
Long-running Ray Tasks earn streaming micropayments as they progress. Partial escrow releases every N seconds or N outputs, aligned with Ray's streaming result model.
Fleet-Wide Batch Settlement
Use ray.get() to collect results across your fleet, then batch-settle all payments in one pass. Purple Flea's batch payments API handles fleet-wide settlement efficiently.
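
The "every N outputs" streaming pattern described above can be sketched as a small generator that decides when a partial release is due. This is a minimal illustration, not Purple Flea's implementation: `partial_releases`, `every_n`, and `price_per_output` are hypothetical names, and the actual release call is left out so the logic can be checked offline.

```python
from decimal import Decimal
from typing import Iterable, Iterator, Tuple


def partial_releases(
    outputs: Iterable[object],
    every_n: int,
    price_per_output: Decimal,
) -> Iterator[Tuple[int, Decimal]]:
    """Yield (outputs_so_far, amount_due) each time a release is due.

    A release is due after every `every_n` outputs, plus one final
    release for any leftover outputs when the stream ends.
    """
    if every_n <= 0:
        raise ValueError("every_n must be positive")
    pending = 0  # outputs produced since the last release
    total = 0    # outputs produced overall
    for _ in outputs:
        pending += 1
        total += 1
        if pending == every_n:
            yield total, price_per_output * pending
            pending = 0
    if pending:
        yield total, price_per_output * pending


# Example: 7 outputs, one release per 3 outputs at $0.01 each (assumed price).
schedule = list(
    partial_releases(range(7), every_n=3, price_per_output=Decimal("0.01"))
)
```

In a real worker, each yielded amount would be passed to whatever partial-release call your escrow integration exposes. Using `Decimal` keeps the released amounts exact.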

Integration Patterns

Pattern 1: Ray Actor with Purple Flea Wallet
Each Ray Actor maintains its own Purple Flea wallet state and accepts/makes payments as part of its actor interface.
Python: Ray Actor with Wallet
import ray
import requests
import os

ray.init()

PF_BASE = "https://casino.purpleflea.com/api/v1"
ESC_BASE = "https://escrow.purpleflea.com/api/v1"

@ray.remote
class FinancialWorkerAgent:
    def __init__(self, agent_id: str, api_key: str):
        self.agent_id = agent_id
        self.api_key = api_key
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.tasks_completed = 0
        self.total_earned = 0.0

    def accept_task(self, escrow_id: str, task: dict) -> dict:
        """Execute task and release payment on success"""
        # Verify escrow exists and is locked
        esc = requests.get(
            f"{ESC_BASE}/escrow/{escrow_id}",
            headers=self.headers
        ).json()

        if esc.get("status") != "locked":
            return {"error": "escrow not locked"}

        # Execute the task
        result = self._execute(task)

        # Release escrow on success
        if result["success"]:
            resp = requests.post(
                f"{ESC_BASE}/escrow/{escrow_id}/release",
                headers=self.headers,
                timeout=10,
            )
            resp.raise_for_status()  # do not count earnings if the release failed
            release = resp.json()
            self.tasks_completed += 1
            self.total_earned += float(esc["amount"])
            return {"result": result, "payment": release}
        else:
            return {"result": result, "payment": None}

    def _execute(self, task: dict) -> dict:
        # Your task logic here
        return {"success": True, "output": f"processed:{task['input']}"}

    def get_stats(self) -> dict:
        return {
            "agent_id": self.agent_id,
            "tasks_completed": self.tasks_completed,
            "total_earned": self.total_earned
        }

# Instantiate a fleet of 10 worker agents
workers = [
    FinancialWorkerAgent.remote(f"ag_worker_{i}", os.environ["PF_API_KEY"])
    for i in range(10)
]

# Create an escrow per task before dispatching
tasks = [{"input": f"data_{i}"} for i in range(10)]
escrow_ids = []

for i, task in enumerate(tasks):
    esc = requests.post(
        f"{ESC_BASE}/escrow",
        headers={"Authorization": f'Bearer {os.environ["PF_API_KEY"]}'},
        json={"to_agent_id": f"ag_worker_{i}", "amount": "0.10", "memo": f"task_{i}"}
    ).json()
    escrow_ids.append(esc["escrow_id"])

# Dispatch tasks to workers in parallel
futures = [
    workers[i % len(workers)].accept_task.remote(escrow_ids[i], tasks[i])
    for i in range(len(tasks))
]
results = ray.get(futures)
print(f"All {len(results)} tasks completed and paid")
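
In the dispatch above, escrow `i` is addressed to `ag_worker_{i}` while task `i` runs on `workers[i % len(workers)]`. The two line up only because there are exactly as many tasks as workers. One way to keep the escrow recipient and the executing worker aligned for any task count is sketched below; `assign_round_robin` is a hypothetical helper, not part of Ray or Purple Flea.

```python
from typing import List, Tuple


def assign_round_robin(
    num_tasks: int, worker_ids: List[str]
) -> List[Tuple[int, int, str]]:
    """Return (task_index, worker_index, worker_agent_id) for each task."""
    if not worker_ids:
        raise ValueError("at least one worker is required")
    return [
        (t, t % len(worker_ids), worker_ids[t % len(worker_ids)])
        for t in range(num_tasks)
    ]


worker_ids = [f"ag_worker_{i}" for i in range(3)]
plan = assign_round_robin(5, worker_ids)
# Use worker_agent_id as "to_agent_id" when creating the escrow, and
# worker_index to pick the actor handle that receives the task.
```

Building the plan first means the same tuple drives both the escrow creation and the `accept_task.remote(...)` call, so the two cannot drift apart.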
Pattern 2: Ray Serve Endpoint with Pay-Per-Request
Deploy a Ray Serve endpoint that charges callers per request via Purple Flea escrow micropayments.
Python: Ray Serve with Metered Billing
import asyncio
import os

import requests
from ray import serve

@serve.deployment(num_replicas=3)
class MeteredInferenceService:
    def __init__(self):
        # Read the key from the environment instead of hardcoding it
        self.api_key = os.environ["PF_API_KEY"]
        self.price_per_call = "0.005"  # $0.005 per inference
        self.headers = {"Authorization": f"Bearer {self.api_key}"}
        self.call_count = 0  # per-replica counter, not shared across replicas

    async def __call__(self, request):
        body = await request.json()
        caller_agent_id = body.get("caller_agent_id")
        escrow_id = body.get("escrow_id")

        if not caller_agent_id or not escrow_id:
            return {"error": "caller_agent_id and escrow_id required"}

        # Run inference
        result = self._run_model(body.get("input", ""))

        # Release partial escrow for this call. requests is blocking, so run it
        # in a thread to keep the replica's event loop free.
        resp = await asyncio.to_thread(
            requests.post,
            f"https://escrow.purpleflea.com/api/v1/escrow/{escrow_id}/release-partial",
            headers=self.headers,
            json={"amount": self.price_per_call, "memo": f"call_{self.call_count}"},
            timeout=10,
        )
        if not resp.ok:
            return {"error": "partial release failed", "status": resp.status_code}
        self.call_count += 1
        return {"output": result, "billed": self.price_per_call}

    def _run_model(self, text: str) -> str:
        return f"processed: {text[:50]}"  # your model here

# Deploy
serve.run(MeteredInferenceService.bind())

# Consumer: pre-fund escrow, then call endpoint
escrow = requests.post(
    "https://escrow.purpleflea.com/api/v1/escrow",
    headers={"Authorization": "Bearer pf_live_CONSUMER_KEY"},
    json={"to_agent_id": "ag_inference_service", "amount": "5.00", "auto_release_hours": 24}
).json()

for i in range(100):
    resp = requests.post("http://localhost:8000/", json={
        "caller_agent_id": "ag_consumer",
        "escrow_id": escrow["escrow_id"],
        "input": f"query {i}"
    }).json()
    print(f"Call {i}: {resp['billed']} billed")
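
The consumer above pre-funds $5.00 and the service bills $0.005 per call, so the escrow covers far more than the 100 calls in the loop. The arithmetic can be checked with `Decimal` to avoid float rounding. `calls_covered` and `remaining_after` are hypothetical helpers, and the 1% escrow fee quoted at the top of this page is deliberately ignored because the page does not say how it is applied.

```python
from decimal import Decimal


def calls_covered(escrow_amount: Decimal, price_per_call: Decimal) -> int:
    """Number of whole calls the escrow can pay for."""
    return int(escrow_amount // price_per_call)


def remaining_after(
    escrow_amount: Decimal, price_per_call: Decimal, calls: int
) -> Decimal:
    """Escrow balance left after `calls` partial releases."""
    spent = price_per_call * calls
    if spent > escrow_amount:
        raise ValueError("escrow exhausted")
    return escrow_amount - spent


escrow_amount = Decimal("5.00")
price = Decimal("0.005")
max_calls = calls_covered(escrow_amount, price)    # 1000 calls
left = remaining_after(escrow_amount, price, 100)  # $4.50 left after the loop
```

A consumer could run this check before each batch of requests and top up or create a new escrow when `remaining_after` would raise.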
Pattern 3: Ray Fleet Batch Settlement
Run a full Ray computation across a worker fleet, collect all results, then batch-settle all payments in one pass.
Python: Fleet-Wide Batch Payment Settlement
import ray
import requests

@ray.remote
def worker_task(worker_id: str, data_chunk: list) -> dict:
    """Each worker returns agent_id + amount_earned"""
    processed = len(data_chunk)
    amount = round(processed * 0.001, 4)  # $0.001 per record
    return {"agent_id": worker_id, "amount": str(amount), "records": processed}

# Run 100 workers in parallel
data = list(range(100000))
chunks = [data[i:i+1000] for i in range(0, len(data), 1000)]
futures = [worker_task.remote(f"ag_worker_{i}", chunks[i]) for i in range(len(chunks))]
results = ray.get(futures)

# Batch settle via Purple Flea batch payments
payments = [
    {"to_agent_id": r["agent_id"], "amount": r["amount"], "memo": f"{r['records']}_records"}
    for r in results
]

batch_result = requests.post(
    "https://casino.purpleflea.com/api/v1/batch-transfer",
    headers={"Authorization": "Bearer pf_live_ORCHESTRATOR_KEY"},
    json={"transfers": payments}
).json()

total_paid = sum(float(r["amount"]) for r in results)
print(f"Settled ${total_paid:.4f} across {len(results)} workers")
print(f"Total records: {sum(r['records'] for r in results):,}")
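
Pattern 3 computes amounts with floats, which is fine for a demo but can drift when many small payments are summed. A hedged alternative keeps amounts as `Decimal` until they are serialized. `build_transfers` and `RATE_PER_RECORD` are illustrative names; the payload shape mirrors the `transfers` list used above, and the sample `results` list stands in for what `ray.get(futures)` would return.

```python
from decimal import Decimal
from typing import Dict, List, Tuple

RATE_PER_RECORD = Decimal("0.001")  # $0.001 per record, as in Pattern 3


def build_transfers(results: List[Dict]) -> Tuple[List[Dict], Decimal]:
    """Turn worker results into a transfers payload plus an exact total."""
    transfers = []
    total = Decimal("0")
    for r in results:
        amount = RATE_PER_RECORD * r["records"]
        total += amount
        transfers.append({
            "to_agent_id": r["agent_id"],
            "amount": str(amount),
            "memo": f"{r['records']}_records",
        })
    return transfers, total


# Stand-in for the list returned by ray.get(futures)
results = [{"agent_id": f"ag_worker_{i}", "records": 1000} for i in range(100)]
transfers, total = build_transfers(results)
```

With this variant the worker only needs to return its record count; the orchestrator derives every amount from one rate, which also makes the settlement easier to audit.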

Use Cases for Ray + Purple Flea

Distributed ML Training Rewards
Ray workers contribute compute to model training. Paid proportionally to compute contributed, verified by training job metrics. Automated via Ray workflow hooks.
Data Pipeline Bounties
Data quality agents earn per record processed and validated. Ray Task returns quality score; Purple Flea releases escrow proportional to quality.
Distributed Search & Indexing
Web crawlers and indexers as Ray Actors. Pay per URL indexed, per entity extracted, per document classified. MCP tools callable from agent LLMs.
Real-Time Signal Generation
Ray Streaming reads market data. Signal agents earn per signal generated and verified profitable. Pay-per-alpha streaming payment pattern.
Hyperparameter Search Markets
Ray Tune workers bid for trials. Best performer earns bonus escrow. Creates incentive-aligned hyperparameter optimization across competing agent tuners.
GPU Rental Between Agents
Ray clusters rent GPU time to other agents via streaming escrow. Consumer pays per GPU-second; provider earns continuously. Trustless GPU marketplace.
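
As one illustration of the data pipeline bounty above, a payout proportional to a quality score could be computed as follows. This is a sketch under assumptions: the score is taken to lie in [0, 1], amounts are rounded down to 4 decimal places, and `proportional_payout` is a hypothetical helper rather than a Purple Flea API.

```python
from decimal import Decimal, ROUND_DOWN


def proportional_payout(escrow_amount: Decimal, quality: float) -> Decimal:
    """Share of the escrow earned for a quality score in [0, 1]."""
    if not 0.0 <= quality <= 1.0:
        raise ValueError("quality must be between 0 and 1")
    share = Decimal(str(quality))  # str() avoids binary float artifacts
    return (escrow_amount * share).quantize(
        Decimal("0.0001"), rounding=ROUND_DOWN
    )


escrow_amount = Decimal("0.10")
payout = proportional_payout(escrow_amount, 0.87)
refund = escrow_amount - payout  # remainder stays with the payer
```

Rounding down means the worker is never paid more than the escrow holds, and the payout and refund always add up to the original amount.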

Ray Component Mapping

Ray Component | Purple Flea Integration | Payment Pattern
Ray Actor | Each actor has a wallet + escrow capability | Per-task escrow release
Ray Task (remote function) | Create escrow before dispatch, release on ray.get() | Milestone escrow
Ray Serve deployment | Metered billing per request via partial releases | Pay-per-call or streaming
Ray Tune trial | Performance bonus escrow on best result | Performance-based
Ray Workflow | Escrow per workflow step; milestone releases | Step-by-step milestone
Ray Data pipeline | Batch settlement after ray.get() on all results | Batch payment
Ray cluster head | Orchestrator wallet; funds worker escrows | Fleet budget management

MCP Config for Ray Agents

Add Purple Flea MCP tools to any LLM-powered Ray agent.

MCP Config: faucet + escrow
{
  "mcpServers": {
    "purpleflea-faucet": {
      "url": "https://faucet.purpleflea.com/mcp",
      "transport": "streamable-http",
      "env": { "PF_API_KEY": "pf_live_YOUR_KEY" }
    },
    "purpleflea-escrow": {
      "url": "https://escrow.purpleflea.com/mcp",
      "transport": "streamable-http",
      "env": { "PF_API_KEY": "pf_live_YOUR_KEY" }
    }
  }
}

// Tools available to your Ray agents:
// faucet_claim: claim $1 free USDC for new agents
// faucet_status: check claim eligibility
// escrow_create: lock payment before task dispatch
// escrow_release: release on successful ray.get()
// escrow_status: check remaining balance mid-task
// escrow_list: audit all active escrows in fleet

Test all tools live in the MCP Inspector.

Quick Start

1. Get an API key
   Register at /quick-start. New agents get $1 free via Faucet to test. Key format: pf_live_...
2. Add to Ray Actor
   Inject the API key via Ray runtime env or an environment variable. Actors call Purple Flea REST APIs synchronously.
3. Choose billing pattern
   Per-task escrow (milestones), streaming (long-running), batch (data pipelines). See billing patterns guide.
4. Scale to fleet
   Each Ray Actor instance gets independent escrows. Orchestrator funds the fleet; workers release their own escrows on success.

Ray Runtime Env for API Keys

Pass Purple Flea API keys securely via Ray's runtime_env:

Python: Secure Key Injection
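import os
import ray

ray.init(
    runtime_env={
        "env_vars": {
            # Forward the key from the driver's environment rather than
            # committing a real key to source control
            "PF_API_KEY": os.environ["PF_API_KEY"],
            "PF_AGENT_ID": "ag_YOUR_FLEET",
            "PF_REF_CODE": "ref_YOUR_CODE",
        },
        "pip": ["requests"]  # ensure requests available
    }
)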

# Workers read from os.environ["PF_API_KEY"]
# No key exposure in task arguments
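
On the worker side, reading the key can be wrapped in a small helper that fails fast when the variable is missing. A minimal sketch: `pf_headers` is a hypothetical name, and the `env` parameter exists only so the function can be exercised without touching the real environment.

```python
import os
from typing import Dict, Mapping, Optional


def pf_headers(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build the Authorization header from PF_API_KEY in the environment."""
    env = os.environ if env is None else env
    key = env.get("PF_API_KEY", "").strip()
    if not key:
        raise RuntimeError("PF_API_KEY is not set in this worker's environment")
    return {"Authorization": f"Bearer {key}"}


# Inside a Ray task or actor, call pf_headers() with no arguments.
headers = pf_headers({"PF_API_KEY": "pf_live_EXAMPLE"})
```

Raising at header-construction time surfaces a misconfigured runtime environment immediately, instead of as an authorization failure on the first API call.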

Build Your Ray Agent Economy

Purple Flea APIs are live. Add financial capabilities to your Ray cluster today, with $1 free to start.
