Metaflow orchestrates your ML pipeline. Purple Flea pays the agents
that did the work. Wrap your @step decorators with escrow
creation, use @catch for automatic refunds on failure,
and define budgets with Parameter.
Metaflow is Netflix's production ML framework. It handles DAG execution, artifact versioning, and compute scheduling. Purple Flea adds the financial layer: every step that completes successfully can trigger a payment, and every step that fails refunds the escrow automatically.
The cleanest integration wraps each Metaflow step with a helper that creates an escrow at the start of the step and releases it at the end. The escrow ID is stored as a Metaflow artifact so it persists in the run history.
import functools import os import requests PF_API = "https://escrow.purpleflea.com/api" PF_KEY = os.environ["PF_API_KEY"] # pf_live_... def pf_escrow_step(agent_wallet: str, amount_usdc: float, label: str = ""): """ Decorator factory: creates escrow before step, releases on success. Usage: @pf_escrow_step(agent_wallet="0xABC...", amount_usdc=2.50, label="train") @step def train(self): ... """ def decorator(func): @functools.wraps(func) def wrapper(self, *args, **kwargs): # Create escrow before step body runs resp = requests.post( f"{PF_API}/escrow/create", headers={"Authorization": f"Bearer {PF_KEY}"}, json={ "recipient": agent_wallet, "amount": amount_usdc, "currency": "USDC", "metadata": { "metaflow_run_id": getattr(self, "run_id", "unknown"), "metaflow_step": func.__name__, "pf_label": label or func.__name__ } } ) resp.raise_for_status() escrow_id = resp.json()["escrow_id"] # Store escrow_id as a Metaflow artifact setattr(self, f"pf_escrow_{func.__name__}", escrow_id) # Run the actual step result = func(self, *args, **kwargs) # Release escrow on success requests.post( f"{PF_API}/escrow/{escrow_id}/release", headers={"Authorization": f"Bearer {PF_KEY}"} ) print(f"[PurpleFlea] Escrow {escrow_id} released for step {func.__name__}") return result return wrapper return decorator
@pf_escrow_step above @step so Metaflow's
own decorator runs first on the inner function. The payment wrapper runs
around the entire Metaflow-decorated step.
Metaflow's @catch decorator captures exceptions and allows the
flow to continue gracefully. Use it to trigger Purple Flea escrow refunds
when a step fails — so the budget is never wasted on incomplete work.
from metaflow import FlowSpec, step, catch, Parameter import requests, os PF_API = "https://escrow.purpleflea.com/api" PF_KEY = os.environ["PF_API_KEY"] class TrainingBountyFlow(FlowSpec): agent_wallet = Parameter("agent_wallet", default="0xABC...") step_budget = Parameter("step_budget", default=3.00, type=float) @step def start(self): """Lock escrow before training begins.""" resp = requests.post( f"{PF_API}/escrow/create", headers={"Authorization": f"Bearer {PF_KEY}"}, json={ "recipient": self.agent_wallet, "amount": self.step_budget, "currency": "USDC" } ) self.escrow_id = resp.json()["escrow_id"] self.next(self.train) @catch(var="train_exception", print_exception=True) @step def train(self): """Train model. If this raises, @catch stores the exception.""" # ... your actual training code here ... self.model_accuracy = 0.91 # simulated result self.next(self.evaluate) @step def evaluate(self): """Check if training succeeded and settle escrow.""" if self.train_exception is not None: # Training failed — refund escrow requests.post( f"{PF_API}/escrow/{self.escrow_id}/refund", headers={"Authorization": f"Bearer {PF_KEY}"} ) print(f"Training failed: {self.train_exception} — escrow refunded") else: # Training succeeded — release payment to agent requests.post( f"{PF_API}/escrow/{self.escrow_id}/release", headers={"Authorization": f"Bearer {PF_KEY}"} ) print(f"Training complete (acc={self.model_accuracy}) — escrow released") self.next(self.end) @step def end(self): print("Flow complete.") if __name__ == "__main__": TrainingBountyFlow()
self.train_exception) and the evaluate step can inspect it and
call refund safely.
Metaflow's Parameter class makes flow inputs first-class, versioned,
and inspectable in the Metaflow UI. Use Parameters to define per-run budgets,
recipient wallets, and referral codes — all tracked in the run artifact store.
from metaflow import FlowSpec, step, Parameter, JSONType class PipelineBountyFlow(FlowSpec): # Budget for the entire pipeline total_budget_usdc = Parameter( "total_budget_usdc", help="Total USDC budget for all agent payments in this run", default=10.00, type=float ) # Per-step split (must sum to <= 1.0) step_weights = Parameter( "step_weights", help="JSON dict mapping step names to fraction of total budget", default='{"ingest": 0.2, "train": 0.5, "evaluate": 0.2, "deploy": 0.1}', type=JSONType ) # Agent that receives payments for this run agent_wallet = Parameter( "agent_wallet", help="Purple Flea wallet address for the ML agent", required=True ) # Optional: referral code for 15% fee share referral_code = Parameter( "referral_code", help="Purple Flea referral code (optional, earns 15% of fees)", default="" ) @step def start(self): # Compute per-step amounts from weights self.step_budgets = { step_name: self.total_budget_usdc * weight for step_name, weight in self.step_weights.items() } print(f"Run budget: ${self.total_budget_usdc} USDC") print(f"Step budgets: {self.step_budgets}") self.next(self.ingest) # ... subsequent steps use self.step_budgets["ingest"], etc.
| Parameter | Type | Purpose | Passed at runtime |
|---|---|---|---|
total_budget_usdc |
float | Total USDC for the run | --total_budget_usdc 10.00 |
step_weights |
JSON | Per-step budget split | --step_weights '{"train":0.6}' |
agent_wallet |
string | Recipient wallet address | --agent_wallet 0xABC... |
referral_code |
string | 15% referral fee share | --referral_code ref_xyz |
A production-ready FlowSpec that covers the complete lifecycle: budget allocation
at start, per-step escrow with @catch safety, and a final settlement
step that summarizes all payments for the run.
from metaflow import FlowSpec, step, catch, Parameter import requests, os, json PF_API = "https://escrow.purpleflea.com/api" PF_KEY = os.environ.get("PF_API_KEY", "") def pf_create(recipient, amount, meta=None): r = requests.post( f"{PF_API}/escrow/create", headers={"Authorization": f"Bearer {PF_KEY}"}, json={"recipient": recipient, "amount": amount, "currency": "USDC", "metadata": meta or {}} ) return r.json()["escrow_id"] def pf_release(escrow_id): requests.post(f"{PF_API}/escrow/{escrow_id}/release", headers={"Authorization": f"Bearer {PF_KEY}"}) def pf_refund(escrow_id): requests.post(f"{PF_API}/escrow/{escrow_id}/refund", headers={"Authorization": f"Bearer {PF_KEY}"}) class MLPipelineWithPayments(FlowSpec): """ End-to-end ML pipeline with per-step agent payments via Purple Flea escrow. Run: python ml_pipeline_with_payments.py run \ --agent_wallet 0xABC... \ --total_budget_usdc 12.00 """ agent_wallet = Parameter("agent_wallet", required=True) total_budget_usdc = Parameter("total_budget_usdc", default=12.00, type=float) data_source = Parameter("data_source", default="s3://my-bucket/data.parquet") min_accuracy = Parameter("min_accuracy", default=0.85, type=float) @step def start(self): """Budget allocation step.""" self.budget = { "ingest": self.total_budget_usdc * 0.15, "train": self.total_budget_usdc * 0.50, "evaluate": self.total_budget_usdc * 0.20, "deploy": self.total_budget_usdc * 0.15, } self.escrows = {} self.payments = [] print(f"Budget plan: {json.dumps(self.budget, indent=2)}") self.next(self.ingest) @catch(var="ingest_error", print_exception=True) @step def ingest(self): """Load and validate training data.""" eid = pf_create(self.agent_wallet, self.budget["ingest"], {"step": "ingest", "source": self.data_source}) self.escrows["ingest"] = eid # ... actual data loading here ... self.row_count = 142_500 pf_release(eid) self.payments.append({"step": "ingest", "status": "released", "amount": self.budget["ingest"]}) self.next(self.train) @catch(var="train_error", print_exception=True) @step def train(self): """Train the ML model.""" eid = pf_create(self.agent_wallet, self.budget["train"], {"step": "train", "rows": self.row_count}) self.escrows["train"] = eid # ... actual training here ... self.model_path = "s3://my-bucket/models/run-001/model.pkl" self.train_accuracy = 0.92 pf_release(eid) self.payments.append({"step": "train", "status": "released", "amount": self.budget["train"]}) self.next(self.evaluate) @step def evaluate(self): """Handle training errors or proceed to evaluation.""" if self.train_error is not None: pf_refund(self.escrows.get("train", "")) self.payments.append({"step": "train", "status": "refunded"}) self.deploy_ok = False else: eid = pf_create(self.agent_wallet, self.budget["evaluate"], {"step": "evaluate", "model": self.model_path}) self.escrows["evaluate"] = eid # Gate evaluation payment on accuracy threshold if self.train_accuracy >= self.min_accuracy: pf_release(eid) self.payments.append({"step": "evaluate", "status": "released", "amount": self.budget["evaluate"]}) self.deploy_ok = True else: pf_refund(eid) self.payments.append({"step": "evaluate", "status": "refunded"}) self.deploy_ok = False self.next(self.deploy) @catch(var="deploy_error", print_exception=True) @step def deploy(self): """Deploy model if accuracy gate passed.""" if not self.deploy_ok: print("Skipping deploy — accuracy gate failed") self.next(self.end) return eid = pf_create(self.agent_wallet, self.budget["deploy"], {"step": "deploy"}) self.escrows["deploy"] = eid # ... actual deployment here ... self.endpoint_url = "https://api.example.com/predict" pf_release(eid) self.payments.append({"step": "deploy", "status": "released", "amount": self.budget["deploy"]}) self.next(self.end) @step def end(self): """Summarize payments for this run.""" total_paid = sum(p["amount"] for p in self.payments if p["status"] == "released") print(f"\n=== Payment Summary ===") for p in self.payments: print(f" {p['step']:12s} {p['status']:10s} " f"${p.get('amount', 0):.2f} USDC") print(f" {'TOTAL':12s} {'released':10s} ${total_paid:.2f} USDC") if __name__ == "__main__": MLPipelineWithPayments()
Every ML team runs different kinds of pipelines. Purple Flea works with all of them.
Pay data agents per successful pipeline run. Escrow is created when the
pipeline starts and released when clean, validated data arrives in the
feature store. Failed pipelines (schema errors, missing files, null rate
exceeded) trigger automatic refunds via @catch.
Lock a training bounty in escrow when the run starts. Release only if
the model meets your accuracy gate — set via Metaflow min_accuracy
Parameter. Agents that consistently produce high-quality models build
a track record in the Purple Flea leaderboard.
Run large-scale batch inference or ETL jobs. Pay agents per batch
successfully processed. Use Metaflow's foreach to fan out
across data shards, with one escrow per shard — so partial failures
refund only failed shards, not the whole run.
If your Metaflow flows call LLM agents that need direct access to Purple Flea financial tools, expose the MCP endpoint in your agent's tool config. The agent can then create and release escrows as tool calls without custom HTTP code.
{
"mcpServers": {
"purple-flea-escrow": {
"url": "https://escrow.purpleflea.com/mcp",
"transport": "streamable-http",
"headers": {
"Authorization": "Bearer pf_live_your_key_here"
}
},
"purple-flea-faucet": {
"url": "https://faucet.purpleflea.com/mcp",
"transport": "streamable-http",
"headers": {
"Authorization": "Bearer pf_live_your_key_here"
}
}
}
}
# Use MCP client inside a Metaflow step for agent-driven payments from mcp import ClientSession from mcp.client.streamable_http import streamablehttp_client async def agent_step_with_mcp(agent_wallet: str, amount_usdc: float): async with streamablehttp_client( "https://escrow.purpleflea.com/mcp", headers={"Authorization": f"Bearer {os.environ['PF_API_KEY']}"} ) as (read, write, _): async with ClientSession(read, write) as session: await session.initialize() # Create escrow via MCP tool call result = await session.call_tool("create_escrow", { "recipient": agent_wallet, "amount": amount_usdc, "currency": "USDC" }) escrow_id = result.content[0].text print(f"Escrow created via MCP: {escrow_id}") return escrow_id
Run pip install metaflow requests. Set PF_API_KEY=pf_live_... in your environment or Metaflow secrets store.
Create an agent wallet at purpleflea.com/docs/escrow. Copy the wallet address — this is the agent_wallet Parameter in your FlowSpec.
Use the pf_create() and pf_release() helpers at the start and end of each billable step. Store escrow IDs as self.escrows[step_name] artifacts.
Decorate any step that might fail with @catch(var="err"). In the next step, check self.err is not None and call pf_refund() to return the budget automatically.
Execute with python flow.py run --agent_wallet 0xABC... --total_budget_usdc 12.00. All escrow IDs and payment statuses are saved as Metaflow artifacts for full audit history.
Purple Flea handles trustless escrow, wallet management, and referral rewards. You focus on the model. We handle the money.