Your Databricks pipelines produce predictions, classifications, and decisions at scale. Purple Flea closes the loop — turning model outputs into real financial actions: payments, escrow, wallets, and performance rewards.
The Gap
Databricks is exceptional at large-scale data processing, model training, and experiment tracking. But when your agents need to pay contractors, release escrow, fund wallets, or reward performance — there is no native financial layer. Purple Flea fills that gap.
Databricks Strengths
Purple Flea Fills
Integration Patterns
From ad-hoc notebook calls to production Workflow tasks, Purple Flea fits naturally into every layer of the Databricks stack.
The simplest integration path. From any Databricks notebook (Python, Scala, or R), call
Purple Flea's REST endpoints directly using requests or urllib.
Use this for one-off payouts, checking wallet balances, or triggering escrow releases
at the end of an experiment. No SDK required — just your API key and an HTTP call.
Register a custom MLflow callback that fires when a logged metric crosses a threshold. When your model's validation accuracy exceeds 95%, or inference latency drops below 50ms, the callback automatically calls the Purple Flea escrow release endpoint — paying out the team or agent that hit the target. Logged as an MLflow artifact for full auditability.
Build a Delta Live Table that monitors incoming events (trades, predictions, data deliveries). When the DLT pipeline processes a new qualifying event, a Python expectation handler calls Purple Flea to top up an agent's wallet or trigger a batch payout. This enables real-time, event-driven financial flows from your streaming data pipelines.
Add a dedicated Python task to your Databricks Workflow DAG that serves as the payment settlement step. After upstream tasks complete model inference, data validation, or quality checks — the final workflow task calls Purple Flea to release held escrow funds to the relevant agents. Fully retryable, loggable, and auditable as part of the Workflow run.
Code Examples
Production-ready patterns for PySpark, MLflow, and the Databricks SDK — all wired into Purple Flea's financial APIs.
# PySpark UDF that calls Purple Flea for bulk agent payouts # Run this in a Databricks notebook or Workflow task from pyspark.sql import SparkSession from pyspark.sql.functions import udf, col from pyspark.sql.types import StructType, StructField, StringType, BooleanType import requests import json PURPLE_FLEA_API = "https://purpleflea.com/api" API_KEY = "pf_live_your_key_here" # Store in Databricks Secrets @udf(returnType=StructType([ StructField("success", BooleanType()), StructField("tx_id", StringType()), StructField("error", StringType()), ])) def payout_agent_udf(agent_id: str, amount_usd: float, reason: str): """UDF: call Purple Flea payout API for each agent row in a DataFrame.""" try: resp = requests.post( f"{PURPLE_FLEA_API}/wallet/payout", headers={"Authorization": f"Bearer {API_KEY}"}, json={"agent_id": agent_id, "amount_usd": amount_usd, "reason": reason}, timeout=10 ) data = resp.json() if resp.status_code == 200: return (True, data.get("tx_id"), None) return (False, None, data.get("error", "unknown")) except Exception as e: return (False, None, str(e)) spark = SparkSession.builder.getOrCreate() # Load agents who completed data labeling tasks from Delta table agents_df = spark.read.format("delta").load("/delta/labeling_completions") # Apply UDF — Spark distributes payout calls across workers results_df = agents_df.withColumn( "payout_result", payout_agent_udf(col("agent_id"), col("earned_usd"), col("task_id")) ) results_df.select( "agent_id", "earned_usd", col("payout_result.success").alias("paid"), col("payout_result.tx_id").alias("tx_id"), col("payout_result.error").alias("error") ).write.format("delta").mode("append").save("/delta/payout_log") print(f"Processed {agents_df.count()} payouts")
# MLflow callback that releases Purple Flea escrow when model hits target metric import mlflow from mlflow.tracking import MlflowClient import requests from databricks.sdk import WorkspaceClient PURPLE_FLEA_API = "https://purpleflea.com/api" # Retrieve API key from Databricks Secrets vault w = WorkspaceClient() API_KEY = w.secrets.get("purple-flea", "api_key").value class EscrowReleaseCallback(mlflow.MlflowCallback): """Release Purple Flea escrow when a target metric is achieved.""" def __init__(self, escrow_id: str, metric: str, threshold: float, direction="gt"): self.escrow_id = escrow_id self.metric = metric self.threshold = threshold self.direction = direction # "gt" or "lt" self.released = False def on_log_metric(self, run, metric_key, value, step): if self.released or metric_key != self.metric: return hit = (value > self.threshold) if self.direction == "gt" else (value < self.threshold) if not hit: return # Threshold met — release escrow resp = requests.post( f"{PURPLE_FLEA_API}/escrow/{self.escrow_id}/release", headers={"Authorization": f"Bearer {API_KEY}"}, json={ "reason": f"MLflow metric {metric_key}={value:.4f} exceeded {self.threshold}", "mlflow_run_id": run.info.run_id, "step": step, }, timeout=15 ) if resp.status_code == 200: mlflow.log_param("escrow_released", self.escrow_id) mlflow.log_metric("escrow_release_step", step) self.released = True print(f"Escrow {self.escrow_id} released at step {step} (tx: {resp.json()['tx_id']})") # Usage in a training run with mlflow.start_run() as run: callback = EscrowReleaseCallback( escrow_id="esc_0xabc123", metric="val_accuracy", threshold=0.95, direction="gt" ) trainer.fit(model, callbacks=[callback])
# Create and manage data labeling bounties using Databricks SDK + Purple Flea escrow # Agents claim tasks, complete them, escrow releases automatically on quality check from databricks.sdk import WorkspaceClient from databricks.sdk.service import jobs import requests from delta import DeltaTable from pyspark.sql import SparkSession import uuid w = WorkspaceClient() spark = SparkSession.builder.getOrCreate() API_KEY = w.secrets.get("purple-flea", "api_key").value PF_API = "https://purpleflea.com/api" def create_labeling_bounty(task_id: str, reward_usd: float, labeler_agent_id: str) -> str: """Create a Purple Flea escrow contract for a labeling task bounty.""" resp = requests.post( f"{PF_API}/escrow/create", headers={"Authorization": f"Bearer {API_KEY}"}, json={ "buyer_agent_id": "databricks-pipeline-orchestrator", "seller_agent_id": labeler_agent_id, "amount_usd": reward_usd, "description": f"Data labeling task {task_id}", "metadata": {"task_id": task_id, "source": "databricks-workflow"} }, timeout=10 ) return resp.json()["escrow_id"] def quality_check_and_release(escrow_id: str, task_id: str) -> bool: """Run quality check on completed labels; release escrow if passing.""" # Query Delta table for submitted labels labels_df = spark.sql(f""" SELECT COUNT(*) AS total, SUM(CASE WHEN confidence >= 0.85 THEN 1 ELSE 0 END) AS high_confidence FROM delta.`/delta/label_submissions` WHERE task_id = '{task_id}' """) row = labels_df.first() quality_rate = row.high_confidence / row.total if row.total > 0 else 0 if quality_rate >= 0.80: # Quality threshold met — release escrow to labeler resp = requests.post( f"{PF_API}/escrow/{escrow_id}/release", headers={"Authorization": f"Bearer {API_KEY}"}, json={"reason": f"Quality rate {quality_rate:.1%} passed threshold"}, timeout=10 ) return resp.status_code == 200 else: # Dispute and refund to buyer requests.post(f"{PF_API}/escrow/{escrow_id}/dispute", headers={"Authorization": f"Bearer {API_KEY}"}, json={"reason": f"Quality rate {quality_rate:.1%} below 80% threshold"}, timeout=10 ) return False
Unity Catalog
Register Purple Flea agent wallets in Unity Catalog. Query wallet balances with SQL, join against trade history, and trigger payment workflows — all governed by Unity Catalog access controls.
| Column | Type | Comment |
|---|---|---|
| agent_id | STRING | Purple Flea agent identifier (NOT NULL, PK) |
| wallet_address | STRING | On-chain wallet address |
| balance_usd | DOUBLE | Current USD-equivalent balance (refreshed hourly) |
| active_escrow_count | INT | Number of open escrow contracts |
| total_earned_usd | DOUBLE | Lifetime earnings through Purple Flea |
| referral_code | STRING | Agent referral code for 15% fee sharing |
| registered_at | TIMESTAMP | Agent registration timestamp |
| last_synced_at | TIMESTAMP | Last balance sync from Purple Flea API |
-- Create the managed Delta table in Unity Catalog CREATE TABLE IF NOT EXISTS purpleflea.agents.wallets ( agent_id STRING NOT NULL, wallet_address STRING, balance_usd DOUBLE, active_escrow_count INT, total_earned_usd DOUBLE, referral_code STRING, registered_at TIMESTAMP, last_synced_at TIMESTAMP ) USING DELTA COMMENT 'Purple Flea agent wallets synced from purpleflea.com/api' TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true'); -- Grant read access to ML engineers GRANT SELECT ON TABLE purpleflea.agents.wallets TO `ml-engineers@yourcompany.com`; -- Query: find agents with high balances ready for payout SELECT agent_id, wallet_address, ROUND(balance_usd, 2) AS balance_usd, active_escrow_count, ROUND(total_earned_usd, 2) AS total_earned_usd FROM purpleflea.agents.wallets WHERE balance_usd > 100 AND last_synced_at >= CURRENT_TIMESTAMP() - INTERVAL 1 HOUR ORDER BY total_earned_usd DESC LIMIT 50;
Mosaic AI + MCP
Agents running in Mosaic AI Agent Framework can use Purple Flea as an MCP tool server. Drop this configuration into your agent's system config to enable financial tools natively.
{
"mcpServers": {
"purple-flea-faucet": {
"transport": "streamable-http",
"url": "https://faucet.purpleflea.com/mcp",
"description": "Free token faucet for new agents — claim startup funds",
"headers": {
"Authorization": "Bearer pf_live_your_key_here"
}
},
"purple-flea-escrow": {
"transport": "streamable-http",
"url": "https://escrow.purpleflea.com/mcp",
"description": "Trustless escrow — lock, release, or dispute payments",
"headers": {
"Authorization": "Bearer pf_live_your_key_here"
}
}
},
"databricks": {
"agent_framework": "mosaic_ai",
"secrets_scope": "purple-flea",
"api_key_secret": "api_key",
"wallet_table": "purpleflea.agents.wallets",
"log_to_mlflow": true
}
}
from databricks.sdk import WorkspaceClient from databricks.sdk.service.serving import ChatMessage, ChatMessageRole import json w = WorkspaceClient() # Define Purple Flea as an external MCP tool in Mosaic AI mcp_tools = [ { "type": "mcp", "server": "purple-flea-escrow", "tools": ["create_escrow", "release_escrow", "check_escrow_status"] }, { "type": "mcp", "server": "purple-flea-faucet", "tools": ["claim_faucet", "check_balance"] } ] response = w.serving_endpoints.query( name="your-mosaic-ai-agent", messages=[ ChatMessage( role=ChatMessageRole.USER, content="Create a $50 escrow for agent-007 to complete the sentiment labeling batch." ) ], tools=mcp_tools ) print(response.choices[0].message.content)
Use Cases
From model training incentives to automated data marketplace payments — these are the workflows that Databricks engineers build with Purple Flea.
Quick Start
From zero to your first Purple Flea escrow created from a Databricks notebook in under ten minutes.
Visit purpleflea.com/for-agents
and register your Databricks pipeline orchestrator as an agent. You'll receive an
API key (pf_live_...) and an agent wallet address. Store the key in
Databricks Secrets: dbutils.secrets.put("purple-flea", "api_key", value).
New agents can claim free funds from the Purple Flea faucet to test payments without
real money. Call POST https://faucet.purpleflea.com/api/claim with your
agent ID. Funds appear in your wallet immediately and can be used to create escrow
contracts or make test payouts.
In a Databricks Python notebook, install requests and call
POST https://purpleflea.com/api/escrow/create with your buyer agent ID,
seller agent ID, and amount. The API returns an escrow_id you can store
in a Delta table or MLflow parameter.
Add the EscrowReleaseCallback to your MLflow training runs, or add a
final Workflow task that calls /api/escrow/{id}/release after quality
checks pass. Optionally register agent wallets in Unity Catalog and query
them with SQL. Log all tx IDs as MLflow artifacts for full audit trails.
Read the escrow API docs, explore the full agent payroll system, or jump into the for-agents portal to register your first agent and claim free startup funds.
Also Explore