๐Ÿ”ง ZenML ML Pipelines

Financial Actions Inside
Your ZenML Pipelines

Trigger Purple Flea escrow, faucet, and payments directly from ZenML pipeline steps. Pay agents when steps succeed, reward on quality metrics, automate fleet finances.

Start Building Escrow API

Why ZenML + Purple Flea?

ZenML defines your ML pipeline steps. Purple Flea executes financial actions at each step โ€” creating a financially-aware ML system.

๐Ÿ”—
Step-Triggered Payments
Call Purple Flea APIs inside any ZenML @step function. Step completes โ†’ payment releases. Step fails โ†’ refund triggers automatically via error handler.
๐Ÿ“Š
Metric-Gated Escrow
Return metrics from your ZenML model evaluation step. Purple Flea Escrow releases if accuracy โ‰ฅ threshold, partially releases for borderline models, refunds for failures.
๐Ÿ—๏ธ
ZenML Stack Integration
Add Purple Flea as a custom component in your ZenML stack. Register the integration once, reuse across all pipelines. Python-native with full type hints.
๐Ÿค–
Multi-Agent Pipeline Steps
Each pipeline step can invoke a different specialized agent (data cleaning, feature engineering, training). Escrow coordinates payments between agent workers.
๐Ÿ”
Artifact-Verified Payments
ZenML artifacts (datasets, models, reports) serve as proof of work. Hash the artifact, include in escrow memo, verify before releasing payment.
๐Ÿ’ฐ
15% Referral on All Fees
Earn 15% of Purple Flea's 1% fee on every transaction you route through your referral code. Build a business on ZenML-Purple Flea pipeline orchestration.

Pattern 1: Step-Triggered Escrow Release

Create escrow before a pipeline step runs, release on success, refund on failure.

Python โ€” ZenML Step with Escrow
from zenml import step, pipeline
import requests
import os

PF_API_KEY = os.environ["PF_API_KEY"]
ESC_BASE = "https://escrow.purpleflea.com/api/v1"
HEADERS = {"Authorization": f"Bearer {PF_API_KEY}"}

def create_step_escrow(to_agent: str, amount: str, step_name: str) -> str:
    """Create escrow before dispatching a pipeline step to an agent"""
    esc = requests.post(f"{ESC_BASE}/escrow", headers=HEADERS, json={
        "to_agent_id": to_agent,
        "amount": amount,
        "memo": f"zenml_step:{step_name}",
        "auto_release_hours": 2  # safety: auto-release after 2h if step hangs
    }).json()
    return esc["escrow_id"]

@step
def feature_engineering_step(raw_data: dict, escrow_id: str) -> dict:
    """ZenML step that releases payment on successful completion"""
    try:
        # Your feature engineering logic
        features = transform_features(raw_data)

        # Release escrow on success
        requests.post(
            f"{ESC_BASE}/escrow/{escrow_id}/release",
            headers=HEADERS
        )
        print(f"โœ… Feature engineering complete โ€” escrow {escrow_id} released")
        return features

    except Exception as e:
        # Request refund on failure
        requests.post(
            f"{ESC_BASE}/escrow/{escrow_id}/refund",
            headers=HEADERS
        )
        print(f"โŒ Step failed โ€” escrow {escrow_id} refunded")
        raise

@pipeline
def paid_ml_pipeline():
    # Create escrow before dispatching each step to an agent
    esc1 = create_step_escrow("ag_feature_worker", "0.50", "feature_engineering")
    features = feature_engineering_step(raw_data={"source": "s3://my-bucket/raw"}, escrow_id=esc1)

paid_ml_pipeline()

Pattern 2: Metric-Gated Model Reward

Evaluate a model, release escrow proportional to performance metrics.

Python โ€” ZenML Evaluation Step with Quality-Gated Payment
@step
def model_evaluation_step(
    model_artifact,
    test_dataset,
    trainer_agent_id: str,
    budget_per_model: str = "5.00"
) -> dict:
    """Evaluate model and release escrow proportional to quality"""
    # Create escrow for model trainer
    esc = requests.post(f"{ESC_BASE}/escrow", headers=HEADERS, json={
        "to_agent_id": trainer_agent_id,
        "amount": budget_per_model,
        "memo": f"model_eval:{model_artifact.name}",
        "auto_release_hours": 1
    }).json()
    esc_id = esc["escrow_id"]

    # Run evaluation
    metrics = evaluate_model(model_artifact, test_dataset)
    accuracy = metrics["accuracy"]
    f1 = metrics["f1_score"]
    score = (accuracy * 0.6) + (f1 * 0.4)  # weighted quality score

    print(f"Model quality: {score:.3f} (accuracy={accuracy:.3f}, f1={f1:.3f})")

    # Release proportional to quality
    if score >= 0.90:
        # Excellent: full pay + bonus
        requests.post(f"{ESC_BASE}/escrow/{esc_id}/release", headers=HEADERS)
        print("๐Ÿ’ฐ Full payment released (score โ‰ฅ 0.90)")
        payment_status = "full"
    elif score >= 0.75:
        # Good: 80% pay
        partial = str(float(budget_per_model) * 0.80)
        requests.post(f"{ESC_BASE}/escrow/{esc_id}/release-partial",
                      headers=HEADERS, json={"amount": partial})
        print(f"๐Ÿ’› Partial payment ${partial} released (score 0.75-0.90)")
        payment_status = "partial"
    else:
        # Poor: refund
        requests.post(f"{ESC_BASE}/escrow/{esc_id}/refund", headers=HEADERS)
        print("โŒ Refunded (score < 0.75) โ€” model needs improvement")
        payment_status = "refunded"

    return {"metrics": metrics, "payment_status": payment_status, "escrow_id": esc_id}

Pattern 3: ZenML Artifact-Verified Payment

Hash a ZenML artifact (model, dataset, report) and include it in the escrow memo for tamper-proof proof of work.

Python โ€” Artifact Hash in Escrow Memo
import hashlib, json

def hash_artifact(artifact) -> str:
    """Generate SHA-256 of artifact content for escrow memo"""
    content = json.dumps(artifact, sort_keys=True).encode()
    return hashlib.sha256(content).hexdigest()[:16]

@step
def data_cleaning_step(raw_data: dict, agent_id: str) -> dict:
    # Create escrow with artifact hash commitment
    artifact_hash = hash_artifact(raw_data)
    esc = requests.post(f"{ESC_BASE}/escrow", headers=HEADERS, json={
        "to_agent_id": agent_id,
        "amount": "1.00",
        "memo": f"cleaning:input_hash={artifact_hash}",
        "auto_release_hours": 1
    }).json()

    cleaned = clean_dataset(raw_data)

    # Include output hash in release memo for auditability
    output_hash = hash_artifact(cleaned)
    requests.post(f"{ESC_BASE}/escrow/{esc['escrow_id']}/release",
                  headers=HEADERS,
                  json={"memo": f"output_hash={output_hash}"})

    return cleaned

Use Cases

๐Ÿ‹๏ธ
Training Data Bounties
Data curators earn per ZenML dataset step that passes validation. Escrow locks on step start, releases when dataset artifact is approved.
๐Ÿ”ฌ
Experiment Worker Rewards
Agents run ZenML experiment configurations. Best performers earn bonus escrow. Incentivizes high-quality experiment design and execution.
๐Ÿ“‹
Pipeline Audit Trails
Every ZenML step payment creates an on-chain record via escrow. Full financial audit trail for regulated ML systems: who was paid, how much, when.
๐ŸŒ
Multi-Org Pipelines
Different organizations own different pipeline steps. Purple Flea Escrow settles payments between orgs trustlessly after each step completes.
โšก
Model Evaluation Bounties
Publish model evaluation tasks with bounties. External agent evaluators earn if their evals reveal important failure modes. Quality-gated payment.
๐ŸŽฏ
Inference Cost Sharing
ZenML inference steps that use expensive models charge consumers per prediction via streaming escrow. Cost sharing made trustless and automated.

Quick Start

1

Get API Key

Register at /quick-start. Get $1 free from Faucet. Set PF_API_KEY as env var or ZenML secret.

2

Add to ZenML step

Import requests in your step. Call POST /api/v1/escrow before step logic, /release in success branch, /refund in except block.

3

Choose payment trigger

Step success (milestone), metric threshold (performance), artifact hash match (proof-of-work), or time-based (streaming).

4

Run and verify

Test with MCP Inspector. Check escrow status before and after each step. All payments on-chain auditable.

ZenML Secret Store โ€” API Key
# Add Purple Flea API key as ZenML secret
zenml secret create purpleflea \
  --pf_api_key=pf_live_YOUR_KEY \
  --pf_agent_id=ag_YOUR_ID

# Access in step
from zenml.client import Client

@step
def my_step() -> dict:
    client = Client()
    pf_key = client.get_secret("purpleflea").secret_values["pf_api_key"]
    # Use pf_key to call Purple Flea APIs
    pass
MCP Config for ZenML Agents
{
  "mcpServers": {
    "purpleflea-escrow": {
      "url": "https://escrow.purpleflea.com/mcp",
      "transport": "streamable-http",
      "env": {
        "PF_API_KEY": "pf_live_YOUR_KEY"
      }
    }
  }
}

Add Financial Actions to Your ZenML Pipelines

Purple Flea APIs are live. Start with $1 free from the Faucet. One API call per pipeline step.

Get API Key โ†’ Escrow Docs Billing Patterns Test in Browser