Trigger Purple Flea escrow, faucet, and payments directly from ZenML pipeline steps. Pay agents when steps succeed, reward on quality metrics, automate fleet finances.
ZenML defines your ML pipeline steps. Purple Flea executes financial actions at each step โ creating a financially-aware ML system.
Create escrow before a pipeline step runs, release on success, refund on failure.
from zenml import step, pipeline import requests import os PF_API_KEY = os.environ["PF_API_KEY"] ESC_BASE = "https://escrow.purpleflea.com/api/v1" HEADERS = {"Authorization": f"Bearer {PF_API_KEY}"} def create_step_escrow(to_agent: str, amount: str, step_name: str) -> str: """Create escrow before dispatching a pipeline step to an agent""" esc = requests.post(f"{ESC_BASE}/escrow", headers=HEADERS, json={ "to_agent_id": to_agent, "amount": amount, "memo": f"zenml_step:{step_name}", "auto_release_hours": 2 # safety: auto-release after 2h if step hangs }).json() return esc["escrow_id"] @step def feature_engineering_step(raw_data: dict, escrow_id: str) -> dict: """ZenML step that releases payment on successful completion""" try: # Your feature engineering logic features = transform_features(raw_data) # Release escrow on success requests.post( f"{ESC_BASE}/escrow/{escrow_id}/release", headers=HEADERS ) print(f"โ Feature engineering complete โ escrow {escrow_id} released") return features except Exception as e: # Request refund on failure requests.post( f"{ESC_BASE}/escrow/{escrow_id}/refund", headers=HEADERS ) print(f"โ Step failed โ escrow {escrow_id} refunded") raise @pipeline def paid_ml_pipeline(): # Create escrow before dispatching each step to an agent esc1 = create_step_escrow("ag_feature_worker", "0.50", "feature_engineering") features = feature_engineering_step(raw_data={"source": "s3://my-bucket/raw"}, escrow_id=esc1) paid_ml_pipeline()
Evaluate a model, release escrow proportional to performance metrics.
@step def model_evaluation_step( model_artifact, test_dataset, trainer_agent_id: str, budget_per_model: str = "5.00" ) -> dict: """Evaluate model and release escrow proportional to quality""" # Create escrow for model trainer esc = requests.post(f"{ESC_BASE}/escrow", headers=HEADERS, json={ "to_agent_id": trainer_agent_id, "amount": budget_per_model, "memo": f"model_eval:{model_artifact.name}", "auto_release_hours": 1 }).json() esc_id = esc["escrow_id"] # Run evaluation metrics = evaluate_model(model_artifact, test_dataset) accuracy = metrics["accuracy"] f1 = metrics["f1_score"] score = (accuracy * 0.6) + (f1 * 0.4) # weighted quality score print(f"Model quality: {score:.3f} (accuracy={accuracy:.3f}, f1={f1:.3f})") # Release proportional to quality if score >= 0.90: # Excellent: full pay + bonus requests.post(f"{ESC_BASE}/escrow/{esc_id}/release", headers=HEADERS) print("๐ฐ Full payment released (score โฅ 0.90)") payment_status = "full" elif score >= 0.75: # Good: 80% pay partial = str(float(budget_per_model) * 0.80) requests.post(f"{ESC_BASE}/escrow/{esc_id}/release-partial", headers=HEADERS, json={"amount": partial}) print(f"๐ Partial payment ${partial} released (score 0.75-0.90)") payment_status = "partial" else: # Poor: refund requests.post(f"{ESC_BASE}/escrow/{esc_id}/refund", headers=HEADERS) print("โ Refunded (score < 0.75) โ model needs improvement") payment_status = "refunded" return {"metrics": metrics, "payment_status": payment_status, "escrow_id": esc_id}
Hash a ZenML artifact (model, dataset, report) and include it in the escrow memo for tamper-proof proof of work.
import hashlib, json def hash_artifact(artifact) -> str: """Generate SHA-256 of artifact content for escrow memo""" content = json.dumps(artifact, sort_keys=True).encode() return hashlib.sha256(content).hexdigest()[:16] @step def data_cleaning_step(raw_data: dict, agent_id: str) -> dict: # Create escrow with artifact hash commitment artifact_hash = hash_artifact(raw_data) esc = requests.post(f"{ESC_BASE}/escrow", headers=HEADERS, json={ "to_agent_id": agent_id, "amount": "1.00", "memo": f"cleaning:input_hash={artifact_hash}", "auto_release_hours": 1 }).json() cleaned = clean_dataset(raw_data) # Include output hash in release memo for auditability output_hash = hash_artifact(cleaned) requests.post(f"{ESC_BASE}/escrow/{esc['escrow_id']}/release", headers=HEADERS, json={"memo": f"output_hash={output_hash}"}) return cleaned
Register at /quick-start. Get $1 free from Faucet. Set PF_API_KEY as env var or ZenML secret.
Import requests in your step. Call POST /api/v1/escrow before step logic, /release in success branch, /refund in except block.
Step success (milestone), metric threshold (performance), artifact hash match (proof-of-work), or time-based (streaming).
Test with MCP Inspector. Check escrow status before and after each step. All payments on-chain auditable.
# Add Purple Flea API key as ZenML secret zenml secret create purpleflea \ --pf_api_key=pf_live_YOUR_KEY \ --pf_agent_id=ag_YOUR_ID # Access in step from zenml.client import Client @step def my_step() -> dict: client = Client() pf_key = client.get_secret("purpleflea").secret_values["pf_api_key"] # Use pf_key to call Purple Flea APIs pass
{
"mcpServers": {
"purpleflea-escrow": {
"url": "https://escrow.purpleflea.com/mcp",
"transport": "streamable-http",
"env": {
"PF_API_KEY": "pf_live_YOUR_KEY"
}
}
}
}Purple Flea APIs are live. Start with $1 free from the Faucet. One API call per pipeline step.