Metaflow + Purple Flea Integration

Pay Agents When Your
Metaflow Flows Complete

Metaflow orchestrates your ML pipeline. Purple Flea pays the agents that did the work. Wrap your @step decorators with escrow creation, use @catch for automatic refunds on failure, and define budgets with Parameter.

6
Financial services
1%
Escrow fee
15%
Referral on fees
<200ms
API latency

Purple Flea as the Financial Orchestration Layer

Metaflow is Netflix's production ML framework. It handles DAG execution, artifact versioning, and compute scheduling. Purple Flea adds the financial layer: every step that completes successfully can trigger a payment, and every step that fails refunds the escrow automatically.

Flow Starts
Metaflow run.id created, budget Parameter loaded
🔒
Escrow Created
@step wraps PF escrow API at start of step
🔬
Step Executes
ML work runs; artifacts saved to Metaflow datastore
Release or Refund
Success: escrow released. @catch: escrow refunded.
💡
Why this pattern works for ML pipelines ML steps have natural pass/fail semantics. A training step that fails (OOM, NaN loss, timeout) should not pay the agent. A step that succeeds and produces a valid model artifact deserves its bounty. Metaflow's decorator system maps cleanly onto Purple Flea's escrow create/release/refund lifecycle.

Wrapping Escrow in the @step Decorator

The cleanest integration wraps each Metaflow step with a helper that creates an escrow at the start of the step and releases it at the end. The escrow ID is stored as a Metaflow artifact so it persists in the run history.

pf_step.py Python
import functools
import os
import requests

PF_API = "https://escrow.purpleflea.com/api"
PF_KEY = os.environ["PF_API_KEY"]  # pf_live_...

def pf_escrow_step(agent_wallet: str, amount_usdc: float, label: str = ""):
    """
    Decorator factory: creates escrow before step, releases on success.
    Usage:
        @pf_escrow_step(agent_wallet="0xABC...", amount_usdc=2.50, label="train")
        @step
        def train(self):
            ...
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            # Create escrow before step body runs
            resp = requests.post(
                f"{PF_API}/escrow/create",
                headers={"Authorization": f"Bearer {PF_KEY}"},
                json={
                    "recipient": agent_wallet,
                    "amount": amount_usdc,
                    "currency": "USDC",
                    "metadata": {
                        "metaflow_run_id": getattr(self, "run_id", "unknown"),
                        "metaflow_step": func.__name__,
                        "pf_label": label or func.__name__
                    }
                }
            )
            resp.raise_for_status()
            escrow_id = resp.json()["escrow_id"]

            # Store escrow_id as a Metaflow artifact
            setattr(self, f"pf_escrow_{func.__name__}", escrow_id)

            # Run the actual step
            result = func(self, *args, **kwargs)

            # Release escrow on success
            requests.post(
                f"{PF_API}/escrow/{escrow_id}/release",
                headers={"Authorization": f"Bearer {PF_KEY}"}
            )
            print(f"[PurpleFlea] Escrow {escrow_id} released for step {func.__name__}")
            return result

        return wrapper
    return decorator
Decorator order matters. Place @pf_escrow_step above @step so Metaflow's own decorator runs first on the inner function. The payment wrapper runs around the entire Metaflow-decorated step.

Automatic Refunds with the @catch Decorator

Metaflow's @catch decorator captures exceptions and allows the flow to continue gracefully. Use it to trigger Purple Flea escrow refunds when a step fails — so the budget is never wasted on incomplete work.

refund_on_failure.py Python
from metaflow import FlowSpec, step, catch, Parameter
import requests, os

PF_API = "https://escrow.purpleflea.com/api"
PF_KEY = os.environ["PF_API_KEY"]

class TrainingBountyFlow(FlowSpec):

    agent_wallet = Parameter("agent_wallet", default="0xABC...")
    step_budget  = Parameter("step_budget", default=3.00, type=float)

    @step
    def start(self):
        """Lock escrow before training begins."""
        resp = requests.post(
            f"{PF_API}/escrow/create",
            headers={"Authorization": f"Bearer {PF_KEY}"},
            json={
                "recipient": self.agent_wallet,
                "amount": self.step_budget,
                "currency": "USDC"
            }
        )
        self.escrow_id = resp.json()["escrow_id"]
        self.next(self.train)

    @catch(var="train_exception", print_exception=True)
    @step
    def train(self):
        """Train model. If this raises, @catch stores the exception."""
        # ... your actual training code here ...
        self.model_accuracy = 0.91  # simulated result
        self.next(self.evaluate)

    @step
    def evaluate(self):
        """Check if training succeeded and settle escrow."""
        if self.train_exception is not None:
            # Training failed — refund escrow
            requests.post(
                f"{PF_API}/escrow/{self.escrow_id}/refund",
                headers={"Authorization": f"Bearer {PF_KEY}"}
            )
            print(f"Training failed: {self.train_exception} — escrow refunded")
        else:
            # Training succeeded — release payment to agent
            requests.post(
                f"{PF_API}/escrow/{self.escrow_id}/release",
                headers={"Authorization": f"Bearer {PF_KEY}"}
            )
            print(f"Training complete (acc={self.model_accuracy}) — escrow released")
        self.next(self.end)

    @step
    def end(self):
        print("Flow complete.")

if __name__ == "__main__":
    TrainingBountyFlow()
The @catch pattern gives you clean financial semantics. Without @catch, a failing step raises immediately and your Python code never reaches the refund call. With @catch, the exception is captured as an artifact (self.train_exception) and the evaluate step can inspect it and call refund safely.

Metaflow Parameters for Agent Budget Control

Metaflow's Parameter class makes flow inputs first-class, versioned, and inspectable in the Metaflow UI. Use Parameters to define per-run budgets, recipient wallets, and referral codes — all tracked in the run artifact store.

budget_params.py Python
from metaflow import FlowSpec, step, Parameter, JSONType

class PipelineBountyFlow(FlowSpec):
    # Budget for the entire pipeline
    total_budget_usdc = Parameter(
        "total_budget_usdc",
        help="Total USDC budget for all agent payments in this run",
        default=10.00,
        type=float
    )

    # Per-step split (must sum to <= 1.0)
    step_weights = Parameter(
        "step_weights",
        help="JSON dict mapping step names to fraction of total budget",
        default='{"ingest": 0.2, "train": 0.5, "evaluate": 0.2, "deploy": 0.1}',
        type=JSONType
    )

    # Agent that receives payments for this run
    agent_wallet = Parameter(
        "agent_wallet",
        help="Purple Flea wallet address for the ML agent",
        required=True
    )

    # Optional: referral code for 15% fee share
    referral_code = Parameter(
        "referral_code",
        help="Purple Flea referral code (optional, earns 15% of fees)",
        default=""
    )

    @step
    def start(self):
        # Compute per-step amounts from weights
        self.step_budgets = {
            step_name: self.total_budget_usdc * weight
            for step_name, weight in self.step_weights.items()
        }
        print(f"Run budget: ${self.total_budget_usdc} USDC")
        print(f"Step budgets: {self.step_budgets}")
        self.next(self.ingest)

    # ... subsequent steps use self.step_budgets["ingest"], etc.
Parameter Type Purpose Passed at runtime
total_budget_usdc float Total USDC for the run --total_budget_usdc 10.00
step_weights JSON Per-step budget split --step_weights '{"train":0.6}'
agent_wallet string Recipient wallet address --agent_wallet 0xABC...
referral_code string 15% referral fee share --referral_code ref_xyz

Full FlowSpec with Purple Flea Financial Orchestration

A production-ready FlowSpec that covers the complete lifecycle: budget allocation at start, per-step escrow with @catch safety, and a final settlement step that summarizes all payments for the run.

ml_pipeline_with_payments.py Python
from metaflow import FlowSpec, step, catch, Parameter
import requests, os, json

PF_API = "https://escrow.purpleflea.com/api"
PF_KEY = os.environ.get("PF_API_KEY", "")

def pf_create(recipient, amount, meta=None):
    r = requests.post(
        f"{PF_API}/escrow/create",
        headers={"Authorization": f"Bearer {PF_KEY}"},
        json={"recipient": recipient, "amount": amount,
              "currency": "USDC", "metadata": meta or {}}
    )
    return r.json()["escrow_id"]

def pf_release(escrow_id):
    requests.post(f"{PF_API}/escrow/{escrow_id}/release",
                  headers={"Authorization": f"Bearer {PF_KEY}"})

def pf_refund(escrow_id):
    requests.post(f"{PF_API}/escrow/{escrow_id}/refund",
                  headers={"Authorization": f"Bearer {PF_KEY}"})


class MLPipelineWithPayments(FlowSpec):
    """
    End-to-end ML pipeline with per-step agent payments via Purple Flea escrow.
    Run: python ml_pipeline_with_payments.py run \
           --agent_wallet 0xABC... \
           --total_budget_usdc 12.00
    """

    agent_wallet      = Parameter("agent_wallet", required=True)
    total_budget_usdc = Parameter("total_budget_usdc", default=12.00, type=float)
    data_source       = Parameter("data_source", default="s3://my-bucket/data.parquet")
    min_accuracy      = Parameter("min_accuracy", default=0.85, type=float)

    @step
    def start(self):
        """Budget allocation step."""
        self.budget = {
            "ingest":   self.total_budget_usdc * 0.15,
            "train":    self.total_budget_usdc * 0.50,
            "evaluate": self.total_budget_usdc * 0.20,
            "deploy":   self.total_budget_usdc * 0.15,
        }
        self.escrows = {}
        self.payments = []
        print(f"Budget plan: {json.dumps(self.budget, indent=2)}")
        self.next(self.ingest)

    @catch(var="ingest_error", print_exception=True)
    @step
    def ingest(self):
        """Load and validate training data."""
        eid = pf_create(self.agent_wallet, self.budget["ingest"],
                        {"step": "ingest", "source": self.data_source})
        self.escrows["ingest"] = eid

        # ... actual data loading here ...
        self.row_count = 142_500

        pf_release(eid)
        self.payments.append({"step": "ingest", "status": "released",
                               "amount": self.budget["ingest"]})
        self.next(self.train)

    @catch(var="train_error", print_exception=True)
    @step
    def train(self):
        """Train the ML model."""
        eid = pf_create(self.agent_wallet, self.budget["train"],
                        {"step": "train", "rows": self.row_count})
        self.escrows["train"] = eid

        # ... actual training here ...
        self.model_path = "s3://my-bucket/models/run-001/model.pkl"
        self.train_accuracy = 0.92

        pf_release(eid)
        self.payments.append({"step": "train", "status": "released",
                               "amount": self.budget["train"]})
        self.next(self.evaluate)

    @step
    def evaluate(self):
        """Handle training errors or proceed to evaluation."""
        if self.train_error is not None:
            pf_refund(self.escrows.get("train", ""))
            self.payments.append({"step": "train", "status": "refunded"})
            self.deploy_ok = False
        else:
            eid = pf_create(self.agent_wallet, self.budget["evaluate"],
                            {"step": "evaluate", "model": self.model_path})
            self.escrows["evaluate"] = eid

            # Gate evaluation payment on accuracy threshold
            if self.train_accuracy >= self.min_accuracy:
                pf_release(eid)
                self.payments.append({"step": "evaluate", "status": "released",
                                       "amount": self.budget["evaluate"]})
                self.deploy_ok = True
            else:
                pf_refund(eid)
                self.payments.append({"step": "evaluate", "status": "refunded"})
                self.deploy_ok = False
        self.next(self.deploy)

    @catch(var="deploy_error", print_exception=True)
    @step
    def deploy(self):
        """Deploy model if accuracy gate passed."""
        if not self.deploy_ok:
            print("Skipping deploy — accuracy gate failed")
            self.next(self.end)
            return

        eid = pf_create(self.agent_wallet, self.budget["deploy"],
                        {"step": "deploy"})
        self.escrows["deploy"] = eid

        # ... actual deployment here ...
        self.endpoint_url = "https://api.example.com/predict"

        pf_release(eid)
        self.payments.append({"step": "deploy", "status": "released",
                               "amount": self.budget["deploy"]})
        self.next(self.end)

    @step
    def end(self):
        """Summarize payments for this run."""
        total_paid = sum(p["amount"] for p in self.payments
                         if p["status"] == "released")
        print(f"\n=== Payment Summary ===")
        for p in self.payments:
            print(f"  {p['step']:12s} {p['status']:10s} "
                  f"${p.get('amount', 0):.2f} USDC")
        print(f"  {'TOTAL':12s} {'released':10s} ${total_paid:.2f} USDC")

if __name__ == "__main__":
    MLPipelineWithPayments()

Three Pipeline Payment Patterns

Every ML team runs different kinds of pipelines. Purple Flea works with all of them.

Data

Data Pipeline Bounties

Pay data agents per successful pipeline run. Escrow is created when the pipeline starts and released when clean, validated data arrives in the feature store. Failed pipelines (schema errors, missing files, null rate exceeded) trigger automatic refunds via @catch.

Training

Model Training Rewards

Lock a training bounty in escrow when the run starts. Release only if the model meets your accuracy gate — set via Metaflow min_accuracy Parameter. Agents that consistently produce high-quality models build a track record in the Purple Flea leaderboard.

Batch

Batch Job Payments

Run large-scale batch inference or ETL jobs. Pay agents per batch successfully processed. Use Metaflow's foreach to fan out across data shards, with one escrow per shard — so partial failures refund only failed shards, not the whole run.

Purple Flea MCP Config for Metaflow Agents

If your Metaflow flows call LLM agents that need direct access to Purple Flea financial tools, expose the MCP endpoint in your agent's tool config. The agent can then create and release escrows as tool calls without custom HTTP code.

mcp-config.json JSON
{
  "mcpServers": {
    "purple-flea-escrow": {
      "url": "https://escrow.purpleflea.com/mcp",
      "transport": "streamable-http",
      "headers": {
        "Authorization": "Bearer pf_live_your_key_here"
      }
    },
    "purple-flea-faucet": {
      "url": "https://faucet.purpleflea.com/mcp",
      "transport": "streamable-http",
      "headers": {
        "Authorization": "Bearer pf_live_your_key_here"
      }
    }
  }
}
metaflow_mcp_step.py Python
# Use MCP client inside a Metaflow step for agent-driven payments
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client

async def agent_step_with_mcp(agent_wallet: str, amount_usdc: float):
    async with streamablehttp_client(
        "https://escrow.purpleflea.com/mcp",
        headers={"Authorization": f"Bearer {os.environ['PF_API_KEY']}"}
    ) as (read, write, _):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # Create escrow via MCP tool call
            result = await session.call_tool("create_escrow", {
                "recipient": agent_wallet,
                "amount": amount_usdc,
                "currency": "USDC"
            })
            escrow_id = result.content[0].text
            print(f"Escrow created via MCP: {escrow_id}")
            return escrow_id

Add Purple Flea to Your Metaflow Flows

Ready to Pay Your ML Pipeline Agents?

Purple Flea handles trustless escrow, wallet management, and referral rewards. You focus on the model. We handle the money.