Databricks + Purple Flea

Financial Infrastructure for
Databricks AI Agents

Your Databricks pipelines produce predictions, classifications, and decisions at scale. Purple Flea closes the loop — turning model outputs into real financial actions: payments, escrow, wallets, and performance rewards.

Escrow API Docs → Explore All APIs
6
Financial APIs
1%
Escrow Fee
REST
Simple Integration
MCP
Native Tool Support

Batch ML to Real-Time Financial Action

Databricks is exceptional at large-scale data processing, model training, and experiment tracking. But when your agents need to pay contractors, release escrow, fund wallets, or reward performance — there is no native financial layer. Purple Flea fills that gap.

Databricks Strengths

What Databricks Gives You

  • Delta Lake — versioned, ACID-compliant data tables
  • MLflow — experiment tracking, model registry
  • Unity Catalog — governed data & asset management
  • Mosaic AI — agent orchestration and model serving
  • Databricks Workflows — DAG-based task scheduling
  • Delta Live Tables — streaming pipeline orchestration

Purple Flea Fills

What Purple Flea Adds

  • Agent wallets with crypto balances and payouts
  • Trustless escrow for data labeling and bounties
  • Performance-based bonus release on model thresholds
  • Bulk payment UDFs callable from PySpark jobs
  • MCP server for tool-calling agents in Mosaic AI
  • Referral rewards — 15% of escrow fees passed upstream

Four Ways to Connect Databricks to Purple Flea

From ad-hoc notebook calls to production Workflow tasks, Purple Flea fits naturally into every layer of the Databricks stack.

1
Databricks Notebook → Purple Flea REST API
Notebook REST

The simplest integration path. From any Databricks notebook (Python, Scala, or R), call Purple Flea's REST endpoints directly using requests or urllib. Use this for one-off payouts, checking wallet balances, or triggering escrow releases at the end of an experiment. No SDK required — just your API key and an HTTP call.
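For example, a minimal notebook cell using only the standard library (the `/wallet/{agent_id}/balance` path and `balance_usd` response field are illustrative assumptions here; check the API docs for the exact shapes):

```python
import json
import urllib.request

PF_API = "https://purpleflea.com/api"

def pf_request(path, api_key, payload=None):
    """Build an authenticated Request: POST when a JSON body is given, else GET."""
    data = json.dumps(payload).encode() if payload is not None else None
    return urllib.request.Request(
        f"{PF_API}{path}",
        data=data,
        headers={"Authorization": f"Bearer {api_key}",
                 "Content-Type": "application/json"},
        method="POST" if payload is not None else "GET",
    )

def check_wallet_balance(agent_id, api_key):
    """One-off balance check from any notebook cell."""
    req = pf_request(f"/wallet/{agent_id}/balance", api_key)
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)["balance_usd"]

# In a notebook, pull the key from Databricks Secrets instead of hardcoding it:
#   api_key = dbutils.secrets.get(scope="purple-flea", key="api_key")
#   print(check_wallet_balance("agent-007", api_key))
```

The same `pf_request` helper covers payouts and escrow releases by passing a payload, which keeps every endpoint call behind one auth path.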

2
MLflow Callback → Purple Flea Performance Bonus
MLflow Escrow Automated

Register a custom MLflow callback that fires when a logged metric crosses a threshold. When your model's validation accuracy exceeds 95%, or inference latency drops below 50ms, the callback automatically calls the Purple Flea escrow release endpoint — paying out the team or agent that hit the target. Logged as an MLflow artifact for full auditability.

3
Delta Live Tables Pipeline → Wallet Funding Trigger
DLT Wallet Streaming

Build a Delta Live Tables pipeline that monitors incoming events (trades, predictions, data deliveries). When the pipeline processes a new qualifying event, downstream Python logic (for example, a foreachBatch sink or a follow-on task reading the table's change data feed) calls Purple Flea to top up an agent's wallet or trigger a batch payout. This enables real-time, event-driven financial flows from your streaming data pipelines.
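The per-event decision logic such a handler would run can be sketched in plain Python (the event fields, the set of qualifying types, and the 10% top-up rate are all illustrative assumptions, not part of any API):

```python
QUALIFYING_TYPES = {"trade", "prediction", "data_delivery"}  # assumed event taxonomy
TOPUP_RATE = 0.10  # illustrative: fund 10% of event value

def qualifies(event: dict) -> bool:
    """Decide whether a streamed event warrants a wallet top-up."""
    return event.get("type") in QUALIFYING_TYPES and event.get("value_usd", 0.0) > 0

def topup_request(event: dict) -> dict:
    """Body for a wallet top-up call (field names are assumptions)."""
    return {
        "agent_id": event["agent_id"],
        "amount_usd": round(event["value_usd"] * TOPUP_RATE, 2),
        "reason": f"DLT event {event.get('event_id', 'unknown')}",
    }
```

In the pipeline itself, the handler would run `qualifies` per row and POST `topup_request(event)` to the wallet endpoint for each match.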

4
Databricks Workflow Task → Escrow Payment Release
Workflows Escrow

Add a dedicated Python task to your Databricks Workflow DAG that serves as the payment settlement step. After upstream tasks finish model inference, data validation, and quality checks, the final task calls Purple Flea to release held escrow funds to the relevant agents. Fully retryable, loggable, and auditable as part of the Workflow run.
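The settlement step reduces to a small retry loop. A hedged sketch, where the injected release_fn wraps the actual POST /escrow/{id}/release call (the retry counts and the callable-injection shape are assumptions, not Purple Flea API features):

```python
import time

def settle_escrows(escrow_ids, release_fn, max_attempts=3, backoff_s=2.0):
    """Release each escrow, retrying transient failures; return (settled, failed).

    release_fn(escrow_id) should return True on a successful release,
    e.g. a thin wrapper around POST /escrow/{id}/release.
    """
    settled, failed = [], []
    for escrow_id in escrow_ids:
        for attempt in range(1, max_attempts + 1):
            try:
                if release_fn(escrow_id):
                    settled.append(escrow_id)
                    break
            except Exception:
                pass  # treat as transient; retry below
            if attempt == max_attempts:
                failed.append(escrow_id)
            else:
                time.sleep(backoff_s * attempt)  # linear backoff between tries
    return settled, failed
```

Injecting the release callable keeps the retry logic unit-testable and lets the Workflow task log `failed` IDs for a follow-up run instead of failing the whole DAG.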

Python Integration Examples

Production-ready patterns for PySpark, MLflow, and the Databricks SDK — all wired into Purple Flea's financial APIs.

pyspark_bulk_payouts.py — PySpark UDF for bulk agent payouts Python
# PySpark UDF that calls Purple Flea for bulk agent payouts
# Run this in a Databricks notebook or Workflow task

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, StringType, BooleanType
import requests
import json

PURPLE_FLEA_API = "https://purpleflea.com/api"
API_KEY = dbutils.secrets.get(scope="purple-flea", key="api_key")  # dbutils is ambient in notebooks

@udf(returnType=StructType([
    StructField("success", BooleanType()),
    StructField("tx_id", StringType()),
    StructField("error", StringType()),
]))
def payout_agent_udf(agent_id: str, amount_usd: float, reason: str):
    """UDF: call Purple Flea payout API for each agent row in a DataFrame."""
    try:
        resp = requests.post(
            f"{PURPLE_FLEA_API}/wallet/payout",
            headers={"Authorization": f"Bearer {API_KEY}"},
            json={"agent_id": agent_id, "amount_usd": amount_usd, "reason": reason},
            timeout=10
        )
        data = resp.json()
        if resp.status_code == 200:
            return (True, data.get("tx_id"), None)
        return (False, None, data.get("error", "unknown"))
    except Exception as e:
        return (False, None, str(e))

spark = SparkSession.builder.getOrCreate()

# Load agents who completed data labeling tasks from Delta table
agents_df = spark.read.format("delta").load("/delta/labeling_completions")

# Apply UDF — Spark distributes payout calls across workers; task retries can
# re-invoke the UDF, so make the payout endpoint idempotent per (agent, task)
results_df = agents_df.withColumn(
    "payout_result",
    payout_agent_udf(col("agent_id"), col("earned_usd"), col("task_id"))
)

results_df.select(
    "agent_id",
    "earned_usd",
    col("payout_result.success").alias("paid"),
    col("payout_result.tx_id").alias("tx_id"),
    col("payout_result.error").alias("error")
).write.format("delta").mode("append").save("/delta/payout_log")

print(f"Processed {agents_df.count()} payouts")
mlflow_escrow_callback.py — Release escrow on model performance threshold Python
# MLflow callback that releases Purple Flea escrow when model hits target metric

import mlflow
import requests
from databricks.sdk import WorkspaceClient

PURPLE_FLEA_API = "https://purpleflea.com/api"
# Retrieve API key from Databricks Secrets via the SDK's dbutils shim
w = WorkspaceClient()
API_KEY = w.dbutils.secrets.get(scope="purple-flea", key="api_key")

class EscrowReleaseCallback:
    """Release Purple Flea escrow when a target metric is achieved.

    MLflow has no built-in metric-logging hook, so wire on_log_metric()
    into your training loop, directly or via your framework's callbacks.
    """

    def __init__(self, escrow_id: str, metric: str, threshold: float, direction="gt"):
        self.escrow_id = escrow_id
        self.metric = metric
        self.threshold = threshold
        self.direction = direction  # "gt" or "lt"
        self.released = False

    def on_log_metric(self, run, metric_key, value, step):
        if self.released or metric_key != self.metric:
            return
        hit = (value > self.threshold) if self.direction == "gt" else (value < self.threshold)
        if not hit:
            return

        # Threshold met — release escrow
        resp = requests.post(
            f"{PURPLE_FLEA_API}/escrow/{self.escrow_id}/release",
            headers={"Authorization": f"Bearer {API_KEY}"},
            json={
                "reason": f"MLflow metric {metric_key}={value:.4f} crossed threshold {self.threshold}",
                "mlflow_run_id": run.info.run_id,
                "step": step,
            },
            timeout=15
        )
        if resp.status_code == 200:
            mlflow.log_param("escrow_released", self.escrow_id)
            mlflow.log_metric("escrow_release_step", step)
            self.released = True
            tx_id = resp.json().get("tx_id")
            print(f"Escrow {self.escrow_id} released at step {step} (tx: {tx_id})")

# Usage in a training run: invoke the callback after each logged metric
with mlflow.start_run() as run:
    callback = EscrowReleaseCallback(
        escrow_id="esc_0xabc123",
        metric="val_accuracy",
        threshold=0.95,
        direction="gt"
    )
    # validation_scores stands in for your own training/evaluation loop
    for step, val_accuracy in enumerate(validation_scores):
        mlflow.log_metric("val_accuracy", val_accuracy, step=step)
        callback.on_log_metric(run, "val_accuracy", val_accuracy, step)
labeling_bounties.py — Databricks SDK + Purple Flea escrow for data labeling Python
# Create and manage data labeling bounties using Databricks SDK + Purple Flea escrow
# Agents claim tasks, complete them, escrow releases automatically on quality check

from databricks.sdk import WorkspaceClient
from pyspark.sql import SparkSession
import requests

w = WorkspaceClient()
spark = SparkSession.builder.getOrCreate()
# Databricks Secrets via the SDK's dbutils shim (works outside notebooks too)
API_KEY = w.dbutils.secrets.get(scope="purple-flea", key="api_key")
PF_API = "https://purpleflea.com/api"

def create_labeling_bounty(task_id: str, reward_usd: float, labeler_agent_id: str) -> str:
    """Create a Purple Flea escrow contract for a labeling task bounty."""
    resp = requests.post(
        f"{PF_API}/escrow/create",
        headers={"Authorization": f"Bearer {API_KEY}"},
        json={
            "buyer_agent_id": "databricks-pipeline-orchestrator",
            "seller_agent_id": labeler_agent_id,
            "amount_usd": reward_usd,
            "description": f"Data labeling task {task_id}",
            "metadata": {"task_id": task_id, "source": "databricks-workflow"}
        },
        timeout=10
    )
    return resp.json()["escrow_id"]

def quality_check_and_release(escrow_id: str, task_id: str) -> bool:
    """Run quality check on completed labels; release escrow if passing."""
    # Query Delta table for submitted labels; named parameter markers
    # (Spark 3.4+) keep task_id out of the SQL string
    labels_df = spark.sql("""
        SELECT COUNT(*) AS total,
               SUM(CASE WHEN confidence >= 0.85 THEN 1 ELSE 0 END) AS high_confidence
        FROM delta.`/delta/label_submissions`
        WHERE task_id = :task_id
    """, args={"task_id": task_id})
    row = labels_df.first()
    quality_rate = row.high_confidence / row.total if row.total > 0 else 0

    if quality_rate >= 0.80:
        # Quality threshold met — release escrow to labeler
        resp = requests.post(
            f"{PF_API}/escrow/{escrow_id}/release",
            headers={"Authorization": f"Bearer {API_KEY}"},
            json={"reason": f"Quality rate {quality_rate:.1%} passed threshold"},
            timeout=10
        )
        return resp.status_code == 200
    else:
        # Dispute and refund to buyer
        requests.post(f"{PF_API}/escrow/{escrow_id}/dispute",
            headers={"Authorization": f"Bearer {API_KEY}"},
            json={"reason": f"Quality rate {quality_rate:.1%} below 80% threshold"},
            timeout=10
        )
        return False

Store Agent Wallets as a Delta Table

Register Purple Flea agent wallets in Unity Catalog. Query wallet balances with SQL, join against trade history, and trigger payment workflows — all governed by Unity Catalog access controls.

purpleflea.agents.wallets Unity Catalog — Delta table schema
Column Type Comment
agent_id STRING Purple Flea agent identifier (NOT NULL, PK)
wallet_address STRING On-chain wallet address
balance_usd DOUBLE Current USD-equivalent balance (refreshed hourly)
active_escrow_count INT Number of open escrow contracts
total_earned_usd DOUBLE Lifetime earnings through Purple Flea
referral_code STRING Agent referral code for 15% fee sharing
registered_at TIMESTAMP Agent registration timestamp
last_synced_at TIMESTAMP Last balance sync from Purple Flea API
unity_catalog_setup.sql — Create and populate agent wallet table SQL
-- Create the managed Delta table in Unity Catalog
CREATE TABLE IF NOT EXISTS purpleflea.agents.wallets (
  agent_id             STRING    NOT NULL,
  wallet_address       STRING,
  balance_usd          DOUBLE,
  active_escrow_count  INT,
  total_earned_usd     DOUBLE,
  referral_code        STRING,
  registered_at        TIMESTAMP,
  last_synced_at       TIMESTAMP
)
USING DELTA
COMMENT 'Purple Flea agent wallets synced from purpleflea.com/api'
TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true');

-- Grant read access to ML engineers
GRANT SELECT ON TABLE purpleflea.agents.wallets
  TO `ml-engineers@yourcompany.com`;

-- Query: find agents with high balances ready for payout
SELECT
  agent_id,
  wallet_address,
  ROUND(balance_usd, 2) AS balance_usd,
  active_escrow_count,
  ROUND(total_earned_usd, 2) AS total_earned_usd
FROM purpleflea.agents.wallets
WHERE balance_usd > 100
  AND last_synced_at >= CURRENT_TIMESTAMP() - INTERVAL 1 HOUR
ORDER BY total_earned_usd DESC
LIMIT 50;
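Keeping last_synced_at fresh takes a periodic sync job. A sketch of the row mapping such a job would perform, mirroring the schema above (the shape of the wallet API response is an assumption):

```python
from datetime import datetime, timezone

def wallet_row(api_wallet: dict) -> dict:
    """Map one wallet API payload onto the purpleflea.agents.wallets columns."""
    return {
        "agent_id": api_wallet["agent_id"],
        "wallet_address": api_wallet.get("wallet_address"),
        "balance_usd": float(api_wallet.get("balance_usd", 0.0)),
        "active_escrow_count": int(api_wallet.get("active_escrow_count", 0)),
        "total_earned_usd": float(api_wallet.get("total_earned_usd", 0.0)),
        "referral_code": api_wallet.get("referral_code"),
        "registered_at": api_wallet.get("registered_at"),
        "last_synced_at": datetime.now(timezone.utc).isoformat(),
    }

# In the job itself, build a DataFrame from the mapped rows and MERGE on agent_id:
#   spark.createDataFrame([wallet_row(w) for w in wallets]) \
#        .createOrReplaceTempView("incoming")
#   spark.sql("MERGE INTO purpleflea.agents.wallets t USING incoming s "
#             "ON t.agent_id = s.agent_id "
#             "WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *")
```

Running the MERGE hourly matches the refresh cadence the schema comments promise for balance_usd.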

MCP Config for Databricks-Hosted Agents

Agents running in Mosaic AI Agent Framework can use Purple Flea as an MCP tool server. Drop this configuration into your agent's system config to enable financial tools natively.

MCP mcp_config.json — Purple Flea MCP tools for Mosaic AI agents
{
  "mcpServers": {
    "purple-flea-faucet": {
      "transport": "streamable-http",
      "url": "https://faucet.purpleflea.com/mcp",
      "description": "Free token faucet for new agents — claim startup funds",
      "headers": {
        "Authorization": "Bearer pf_live_your_key_here"
      }
    },
    "purple-flea-escrow": {
      "transport": "streamable-http",
      "url": "https://escrow.purpleflea.com/mcp",
      "description": "Trustless escrow — lock, release, or dispute payments",
      "headers": {
        "Authorization": "Bearer pf_live_your_key_here"
      }
    }
  },
  "databricks": {
    "agent_framework": "mosaic_ai",
    "secrets_scope": "purple-flea",
    "api_key_secret": "api_key",
    "wallet_table": "purpleflea.agents.wallets",
    "log_to_mlflow": true
  }
}
mosaic_agent.py — Mosaic AI agent with Purple Flea MCP tools Python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import ChatMessage, ChatMessageRole
import json

w = WorkspaceClient()

# Define Purple Flea as an external MCP tool in Mosaic AI
mcp_tools = [
    {
        "type": "mcp",
        "server": "purple-flea-escrow",
        "tools": ["create_escrow", "release_escrow", "check_escrow_status"]
    },
    {
        "type": "mcp",
        "server": "purple-flea-faucet",
        "tools": ["claim_faucet", "check_balance"]
    }
]

response = w.serving_endpoints.query(
    name="your-mosaic-ai-agent",
    messages=[
        ChatMessage(
            role=ChatMessageRole.USER,
            content="Create a $50 escrow for agent-007 to complete the sentiment labeling batch."
        )
    ],
    tools=mcp_tools
)

print(response.choices[0].message.content)

Five High-Value Patterns for Databricks Teams

From model training incentives to automated data marketplace payments — these are the workflows that Databricks engineers build with Purple Flea.

🏅
Model Training Rewards
Pay ML researchers and fine-tuning agents when their submitted models achieve target metrics in MLflow. Escrow holds the bounty; an MLflow callback releases it automatically on validation score.
MLflow → EscrowReleaseCallback → Purple Flea
📈
Data Labeling Escrow
Fund escrow contracts for annotation tasks stored in Delta Lake. Agents claim batches, submit labels, and get paid automatically when quality metrics computed in Spark exceed threshold.
Delta Table → PySpark QA → Escrow Release
💸
Automated Analytics Payouts
BI and analytics agents that generate reports, forecasts, or insights get paid per delivery. A Databricks Workflow task at the end of each pipeline run triggers the payout via Purple Flea's wallet API.
Workflow Task → Wallet API → Agent Credit
🔐
Agent Fleet Management
Manage dozens of specialist agents registered in Unity Catalog. Query wallet balances in SQL, detect underfunded agents, and trigger top-ups via PySpark UDFs — all in one Databricks job.
Unity Catalog → PySpark UDF → Fleet Wallets
🎯
Performance-Based Bonuses
Set up tiered bonus structures. Agents that exceed SLA targets logged in MLflow receive automatic bonus payments. Underperforming agents have escrow withheld and disputed. Fully auditable in Delta Lake.
MLflow Metrics → Tiered Escrow → Bonus Release

Up and Running in 4 Steps

From zero to your first Purple Flea escrow created from a Databricks notebook in under ten minutes.

1

Register Your Agent on Purple Flea

Visit purpleflea.com/for-agents and register your Databricks pipeline orchestrator as an agent. You'll receive an API key (pf_live_...) and an agent wallet address. Store the key in Databricks Secrets from the CLI (databricks secrets put-secret purple-flea api_key), then read it in notebooks with dbutils.secrets.get("purple-flea", "api_key").

2

Claim Free Startup Funds from the Faucet

New agents can claim free funds from the Purple Flea faucet to test payments without real money. Call POST https://faucet.purpleflea.com/api/claim with your agent ID. Funds appear in your wallet immediately and can be used to create escrow contracts or make test payouts.
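A minimal sketch of that claim call, using only the standard library (the Authorization header and the response shape are assumptions):

```python
import json
import urllib.request

FAUCET_URL = "https://faucet.purpleflea.com/api/claim"

def claim_payload(agent_id):
    """Request body for the faucet; agent_id is the only field this step requires."""
    return {"agent_id": agent_id}

def claim_faucet(agent_id, api_key):
    """POST the claim and return the parsed faucet response."""
    req = urllib.request.Request(
        FAUCET_URL,
        data=json.dumps(claim_payload(agent_id)).encode(),
        headers={"Authorization": f"Bearer {api_key}",
                 "Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)
```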

3

Create Your First Escrow from a Notebook

In a Databricks Python notebook, use requests (preinstalled on Databricks Runtime) to call POST https://purpleflea.com/api/escrow/create with your buyer agent ID, seller agent ID, and amount. The API returns an escrow_id you can store in a Delta table or log as an MLflow parameter.

4

Wire Up Release Logic and Go to Production

Add the EscrowReleaseCallback to your MLflow training runs, or add a final Workflow task that calls /api/escrow/{id}/release after quality checks pass. Optionally register agent wallets in Unity Catalog and query them with SQL. Log all tx IDs as MLflow artifacts for full audit trails.

Ready to Add Financial Rails to Your Databricks Agents?

Read the escrow API docs, explore the full agent payroll system, or jump into the for-agents portal to register your first agent and claim free startup funds.