Apache Airflow Integration

Schedule AI Agent Payments
as Airflow DAGs

PurpleFleaHook and PurpleFleaOperator bring Purple Flea's full financial API into your Airflow environment. Orchestrate escrow settlements, referral income reports, and funding rate monitoring — all as reliable, retryable, observable DAG tasks.

Claim Free USDC View DAG Examples
3
Production DAGs
1%
Escrow Fee
15%
Referral Rate
6
API Services
Install
$ pip install apache-airflow requests
$ airflow variables set PURPLE_FLEA_API_KEY pf_live_your_key_here

Financial Workflows Need
Orchestration, Not Cron Jobs

Raw HTTP calls to the Purple Flea API work fine for one-off requests. But production agent finance — daily settlement sweeps, referral income accounting, funding rate monitoring — demands scheduling, retry logic, alerting, and audit trails. That is exactly what Airflow provides.

🔃
Retryable Tasks
Network blips never lose a payment. Airflow retries each operator with configurable exponential backoff, keeping your settlement sweep idempotent.
👁
Full Observability
Every escrow ID, transfer amount, and referral payout is logged in the Airflow UI task log. No more hunting through stdout for financial data.
📈
XCom Data Flow
Pass escrow IDs, wallet balances, and funding rates between tasks via XCom. Downstream tasks automatically receive upstream results without global state.
🔐
Secure API Keys
Store your Purple Flea key in Airflow Variables or Connections — never hardcoded in DAG files, never in source control, never in logs.
Precise Scheduling
Daily settlement at midnight, weekly referral reports on Monday morning, hourly funding rate snapshots — all defined in clean cron expressions.
🔒
Backfill Support
Missed a day of referral data? Airflow's backfill command re-runs historical DAG runs against the Purple Flea API to reconstruct complete records.

PurpleFleaHook and
PurpleFleaOperator

The Hook manages the HTTP connection to purpleflea.com and handles authentication. The Operator base class wraps the Hook and provides a consistent interface for all Purple Flea tasks.

Python plugins/purple_flea/hooks.py
from airflow.hooks.base import BaseHook
from airflow.models import Variable
import requests
import logging

logger = logging.getLogger(__name__)

class PurpleFleaHook(BaseHook):
    """
    Hook for Purple Flea financial API.
    Reads PURPLE_FLEA_API_KEY from Airflow Variables.
    Manages session, base URL, and auth headers.
    """
    conn_name_attr = 'purple_flea_conn_id'
    default_conn_name = 'purple_flea_default'
    conn_type = 'purple_flea'
    hook_name = 'Purple Flea'

    BASE_URL = 'https://purpleflea.com/api'
    ESCROW_URL = 'https://escrow.purpleflea.com/api'
    FAUCET_URL = 'https://faucet.purpleflea.com/api'

    def __init__(self, api_key: str = None):
        self.api_key = api_key or Variable.get('PURPLE_FLEA_API_KEY')
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {self.api_key}',
            'Content-Type': 'application/json',
            'User-Agent': 'PurpleFlea-Airflow/1.0',
        })

    def get(self, path: str, base: str = 'main') -> dict:
        url_map = {
            'main': self.BASE_URL,
            'escrow': self.ESCROW_URL,
            'faucet': self.FAUCET_URL,
        }
        url = f"{url_map[base]}{path}"
        logger.info("PurpleFleaHook GET %s", url)
        resp = self.session.get(url, timeout=30)
        resp.raise_for_status()
        return resp.json()

    def post(self, path: str, payload: dict, base: str = 'main') -> dict:
        url_map = {
            'main': self.BASE_URL,
            'escrow': self.ESCROW_URL,
        }
        url = f"{url_map[base]}{path}"
        logger.info("PurpleFleaHook POST %s payload=%s", url, payload)
        resp = self.session.post(url, json=payload, timeout=30)
        resp.raise_for_status()
        return resp.json()
Python plugins/purple_flea/operators.py
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from .hooks import PurpleFleaHook
from typing import Any, Optional
import logging

logger = logging.getLogger(__name__)


class PurpleFleaOperator(BaseOperator):
    """
    Base operator for Purple Flea API tasks.
    Subclass and implement execute() for specific operations.

    All subclasses automatically:
    - Authenticate via Airflow Variable PURPLE_FLEA_API_KEY
    - Push results to XCom under key 'return_value'
    - Log request/response details at INFO level
    - Raise AirflowException on non-2xx responses
    """

    @apply_defaults
    def __init__(
        self,
        api_key: Optional[str] = None,
        **kwargs: Any,
    ):
        super().__init__(**kwargs)
        self.api_key = api_key

    def get_hook(self) -> PurpleFleaHook:
        return PurpleFleaHook(api_key=self.api_key)

    def execute(self, context: dict) -> Any:
        raise NotImplementedError("Subclasses must implement execute()")


class ListReleasedEscrowsOperator(PurpleFleaOperator):
    """Fetch all recently released escrow contracts."""

    @apply_defaults
    def __init__(self, since_hours: int = 24, **kwargs):
        super().__init__(**kwargs)
        self.since_hours = since_hours

    def execute(self, context: dict):
        hook = self.get_hook()
        data = hook.get(
            f'/escrows?status=released&since_hours={self.since_hours}',
            base='escrow'
        )
        escrow_ids = [e['id'] for e in data.get('escrows', [])]
        logger.info("Found %d released escrows", len(escrow_ids))
        return escrow_ids  # pushed to XCom automatically


class TransferEscrowToWalletOperator(PurpleFleaOperator):
    """Transfer funds from released escrows to the agent wallet."""

    @apply_defaults
    def __init__(self, escrow_ids_task_id: str, **kwargs):
        super().__init__(**kwargs)
        self.escrow_ids_task_id = escrow_ids_task_id

    def execute(self, context: dict):
        escrow_ids = context['ti'].xcom_pull(
            task_ids=self.escrow_ids_task_id
        )
        hook = self.get_hook()
        results = []
        for eid in (escrow_ids or []):
            result = hook.post(
                f'/escrows/{eid}/release-to-wallet',
                payload={'escrow_id': eid},
                base='escrow'
            )
            results.append(result)
            logger.info("Released escrow %s: %s USDC",
                        eid, result.get('amount_usdc'))
        return results


class FetchReferralStatsOperator(PurpleFleaOperator):
    """Fetch referral income stats for reporting period."""

    @apply_defaults
    def __init__(self, period_days: int = 7, **kwargs):
        super().__init__(**kwargs)
        self.period_days = period_days

    def execute(self, context: dict):
        hook = self.get_hook()
        stats = hook.get(
            f'/referrals/stats?period_days={self.period_days}'
        )
        logger.info(
            "Referral stats: %s referrals, %s USDC earned",
            stats.get('total_referrals'),
            stats.get('total_earned_usdc')
        )
        return stats

Settlement Sweep
Every Day at Midnight UTC

This DAG runs nightly, finds all escrow contracts released in the past 24 hours, transfers their funds to your agent wallet, and logs a summary. Three tasks, linear dependency, idempotent by design.

list_released_escrows
ListReleasedEscrowsOperator
transfer_to_wallet
TransferEscrowToWalletOperator
log_settlement
PythonOperator
XCom in action: list_released_escrows pushes a list of escrow IDs to XCom. transfer_to_wallet calls xcom_pull(task_ids='list_released_escrows') to receive them — no global variables, no shared files.
DAG dags/pf_daily_settlement.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from plugins.purple_flea.operators import (
    ListReleasedEscrowsOperator,
    TransferEscrowToWalletOperator,
)
import logging

logger = logging.getLogger(__name__)

# Default arguments applied to every task in this DAG
DEFAULT_ARGS = {
    'owner': 'agent-finance',
    'depends_on_past': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'email_on_failure': True,
    'email': ['ops@youragent.ai'],
}

def log_settlement_summary(**context):
    """Log final settlement results and push summary to XCom."""
    ti = context['ti']
    transfers = ti.xcom_pull(task_ids='transfer_to_wallet') or []
    total_usdc = sum(t.get('amount_usdc', 0) for t in transfers)
    logger.info(
        "Settlement complete. Transfers: %d, Total: %.4f USDC",
        len(transfers), total_usdc
    )
    # Push summary for monitoring dashboards
    ti.xcom_push(key='settlement_summary', value={
        'transfer_count': len(transfers),
        'total_usdc': total_usdc,
        'run_date': context['ds'],
    })

with DAG(
    dag_id='pf_daily_settlement_sweep',
    description='Daily Purple Flea escrow settlement to agent wallet',
    schedule_interval='0 0 * * *',  # Midnight UTC
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args=DEFAULT_ARGS,
    tags=['purple-flea', 'escrow', 'settlement'],
) as dag:

    list_released = ListReleasedEscrowsOperator(
        task_id='list_released_escrows',
        since_hours=24,
    )

    transfer = TransferEscrowToWalletOperator(
        task_id='transfer_to_wallet',
        escrow_ids_task_id='list_released_escrows',
    )

    log_summary = PythonOperator(
        task_id='log_settlement',
        python_callable=log_settlement_summary,
        provide_context=True,
    )

    # Task dependencies: linear pipeline
    list_released >> transfer >> log_summary

Referral Income Report
Every Monday at 08:00 UTC

Purple Flea pays 15% of escrow fees to referrers. This DAG fetches weekly referral stats, calculates income, formats an email report, and sends it — all without leaving Airflow.

fetch_referral_stats
FetchReferralStatsOperator
format_report
PythonOperator
send_email_report
EmailOperator
DAG dags/pf_weekly_referral_report.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from plugins.purple_flea.operators import FetchReferralStatsOperator

DEFAULT_ARGS = {
    'owner': 'agent-finance',
    'retries': 2,
    'retry_delay': timedelta(minutes=10),
    'email_on_failure': True,
    'email': ['ops@youragent.ai'],
}

def format_referral_report(**context):
    """Build HTML email body from referral stats XCom."""
    ti = context['ti']
    stats = ti.xcom_pull(task_ids='fetch_referral_stats') or {}

    total_referrals = stats.get('total_referrals', 0)
    earned_usdc = stats.get('total_earned_usdc', 0.0)
    top_referrers = stats.get('top_referrers', [])
    week_start = context['ds']

    rows = ""
    for r in top_referrers[:10]:
        rows += (
            f"<tr><td>{r['agent_id']}</td>"
            f"<td>{r['referral_count']}</td>"
            f"<td>{r['earned_usdc']:.4f} USDC</td></tr>"
        )

    html = f"""
    <h2>Purple Flea Weekly Referral Report</h2>
    <p>Week starting: <strong>{week_start}</strong></p>
    <table border="1" cellpadding="6">
      <tr><th>Metric</th><th>Value</th></tr>
      <tr><td>Total Referrals</td><td>{total_referrals}</td></tr>
      <tr><td>Earned (15% fee share)</td><td>{earned_usdc:.4f} USDC</td></tr>
    </table>
    <h3>Top Referrers</h3>
    <table border="1" cellpadding="6">
      <tr><th>Agent ID</th><th>Referrals</th><th>Earned</th></tr>
      {rows}
    </table>
    <p><small>Generated by Airflow DAG pf_weekly_referral_report</small></p>
    """
    ti.xcom_push(key='report_html', value=html)
    return html

with DAG(
    dag_id='pf_weekly_referral_report',
    description='Weekly Purple Flea referral income report via email',
    schedule_interval='0 8 * * MON',  # Monday 08:00 UTC
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args=DEFAULT_ARGS,
    tags=['purple-flea', 'referral', 'reporting'],
) as dag:

    fetch_stats = FetchReferralStatsOperator(
        task_id='fetch_referral_stats',
        period_days=7,
    )

    format_report = PythonOperator(
        task_id='format_report',
        python_callable=format_referral_report,
        provide_context=True,
    )

    send_email = EmailOperator(
        task_id='send_email_report',
        to=['ops@youragent.ai'],
        subject=f'Purple Flea Referral Report — Week of {{ ds }}',
        html_content="{{ ti.xcom_pull(task_ids='format_report', key='report_html') }}",
    )

    fetch_stats >> format_report >> send_email

Funding Rate Collector
With XCom and Alerting

Fetches the current perpetual funding rate from the Purple Flea Trading API every hour. Stores the rate in XCom and triggers a Slack or email alert if the rate exceeds a configurable threshold.

fetch_funding_rate
PythonOperator
store_to_xcom
Auto via return value
check_and_alert
BranchPythonOperator
DAG dags/pf_funding_rate_collector.py
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.email import EmailOperator
from airflow.operators.dummy import DummyOperator
from airflow.models import Variable
from plugins.purple_flea.hooks import PurpleFleaHook

# Configure alert threshold via Airflow Variable
RATE_ALERT_THRESHOLD = float(Variable.get('PF_FUNDING_RATE_THRESHOLD', default_var='0.001'))

DEFAULT_ARGS = {
    'owner': 'agent-trading',
    'retries': 2,
    'retry_delay': timedelta(minutes=2),
    'email_on_failure': False,  # Handled by branch
}

def fetch_funding_rate(**context) -> dict:
    """Fetch current perpetual funding rate and push to XCom."""
    hook = PurpleFleaHook()
    data = hook.get('/trading/funding-rate?market=USDC-PERP')
    rate = data.get('current_rate', 0.0)
    result = {
        'rate': rate,
        'market': data.get('market', 'USDC-PERP'),
        'timestamp': data.get('timestamp'),
        'annualized_rate': rate * 8760,  # hourly → annual
    }
    # Return value is auto-pushed to XCom as 'return_value'
    return result

def check_rate_threshold(**context) -> str:
    """
    Branch based on whether rate exceeds threshold.
    Returns task_id of next task to execute.
    """
    ti = context['ti']
    rate_data = ti.xcom_pull(task_ids='fetch_funding_rate') or {}
    rate = rate_data.get('rate', 0.0)

    if abs(rate) > RATE_ALERT_THRESHOLD:
        return 'send_rate_alert'
    return 'no_alert_needed'

with DAG(
    dag_id='pf_funding_rate_collector',
    description='Hourly Purple Flea funding rate collection with alerting',
    schedule_interval='@hourly',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    default_args=DEFAULT_ARGS,
    tags=['purple-flea', 'trading', 'funding-rate'],
) as dag:

    fetch_rate = PythonOperator(
        task_id='fetch_funding_rate',
        python_callable=fetch_funding_rate,
        provide_context=True,
    )

    check_threshold = BranchPythonOperator(
        task_id='check_and_alert',
        python_callable=check_rate_threshold,
        provide_context=True,
    )

    alert = EmailOperator(
        task_id='send_rate_alert',
        to=['ops@youragent.ai'],
        subject='[ALERT] Purple Flea Funding Rate Threshold Exceeded',
        html_content="""
        <p>Funding rate exceeded threshold of {{ var.value.PF_FUNDING_RATE_THRESHOLD }}.</p>
        <p>Current rate: {{ ti.xcom_pull(task_ids='fetch_funding_rate')['rate'] }}</p>
        <p>Annualized: {{ ti.xcom_pull(task_ids='fetch_funding_rate')['annualized_rate'] }}</p>
        """,
    )

    no_alert = DummyOperator(task_id='no_alert_needed')

    fetch_rate >> check_threshold >> [alert, no_alert]
Airflow Variable tip: Set PF_FUNDING_RATE_THRESHOLD in Airflow Admin > Variables. Change the threshold without redeploying the DAG — it is read fresh on each run.

Airflow Variables for
Purple Flea API Keys

Never store API keys in DAG files. Airflow Variables are encrypted at rest and masked in logs. Set them once via the CLI or UI and reference them throughout all DAGs.

CLI Set Variables via Airflow CLI
# Set your Purple Flea API key
airflow variables set PURPLE_FLEA_API_KEY \
  pf_live_your_key_here

# Set funding rate alert threshold
airflow variables set PF_FUNDING_RATE_THRESHOLD 0.001

# Set referral reporting email
airflow variables set PF_REPORT_EMAIL ops@youragent.ai

# Verify variables are set
airflow variables list

# Read a variable from CLI
airflow variables get PURPLE_FLEA_API_KEY
Python Read Variables in DAG Code
from airflow.models import Variable

# Simple string variable
api_key = Variable.get('PURPLE_FLEA_API_KEY')

# With default value (safe if not set)
threshold = float(Variable.get(
    'PF_FUNDING_RATE_THRESHOLD',
    default_var='0.001'
))

# JSON variable (for complex config)
config = Variable.get(
    'PF_DAG_CONFIG',
    deserialize_json=True,
    default_var={}
)
markets = config.get('markets', ['USDC-PERP'])
Variable Key Purpose Example Value
PURPLE_FLEA_API_KEY Main auth key for all Purple Flea APIs pf_live_abc123
PF_FUNDING_RATE_THRESHOLD Alert if rate exceeds this value 0.001
PF_REPORT_EMAIL Destination for weekly referral reports ops@agent.ai
PF_SETTLEMENT_HOURS Lookback window for settlement sweep 24
PF_DAG_CONFIG JSON blob for advanced DAG config {"markets": [...]}
1
Copy the plugin files
Place hooks.py and operators.py into your Airflow plugins/purple_flea/ directory. Add an empty __init__.py.
2
Set Airflow Variables
Run airflow variables set PURPLE_FLEA_API_KEY pf_live_your_key from the CLI or use the Airflow web UI under Admin > Variables.
3
Copy DAG files
Drop the three DAG files into your dags/ folder. Airflow's scheduler picks them up automatically within the configured dag_dir_list_interval.
4
Trigger and verify
Manually trigger each DAG once from the Airflow UI to verify connectivity. Check task logs for the Purple Flea API response and XCom values.

Passing Escrow IDs
Between Tasks with XCom

XCom (cross-communication) is Airflow's mechanism for passing small data payloads between tasks in a DAG. For Purple Flea workflows, it is the right way to pipe escrow IDs, wallet balances, and funding rates downstream without global state or files.

Python XCom push and pull patterns
# --- PUSH: return value is automatically pushed ---
def task_that_produces_data(**context):
    hook = PurpleFleaHook()
    escrows = hook.get('/escrows?status=released', base='escrow')
    ids = [e['id'] for e in escrows['escrows']]
    return ids  # auto-pushed to XCom key='return_value'

# --- PUSH: explicit named key ---
def task_with_named_push(**context):
    ti = context['ti']
    hook = PurpleFleaHook()
    wallet = hook.get('/wallet/balance')
    ti.xcom_push(key='wallet_balance_usdc', value=wallet['usdc'])
    ti.xcom_push(key='wallet_address', value=wallet['address'])

# --- PULL: by task_id, gets 'return_value' ---
def task_that_consumes(**context):
    ti = context['ti']

    # Pull list of escrow IDs from upstream task
    escrow_ids = ti.xcom_pull(task_ids='task_that_produces_data')

    # Pull named key from upstream task
    balance = ti.xcom_pull(
        task_ids='task_with_named_push',
        key='wallet_balance_usdc'
    )

    # Pull from multiple upstream tasks at once
    upstream_results = ti.xcom_pull(
        task_ids=['fetch_escrows', 'fetch_wallet']
    )  # returns list of return_values

    hook = PurpleFleaHook()
    for eid in (escrow_ids or []):
        hook.post(f'/escrows/{eid}/settle', {}, base='escrow')
XCom size limit: Airflow XCom is stored in the metadata database and is best suited for small payloads — escrow IDs, balances, stats objects. For large datasets (thousands of records), push a storage reference (S3 key, file path) and load it in the downstream task.

All Six APIs,
All Supported in Airflow

Every Purple Flea service can be called from Airflow operators. The Hook's base parameter routes to the correct subdomain.

Service Base URL Hook base= Suggested Schedule
Casino purpleflea.com/casino main Real-time / on-event
Faucet faucet.purpleflea.com faucet One-time agent registration
Escrow escrow.purpleflea.com escrow Daily settlement sweep
Trading API purpleflea.com/trading-api main Hourly (funding rates)
Wallet API purpleflea.com/wallet-api main Daily balance snapshot
Domains API purpleflea.com/domains-api main Weekly domain audit

Start Scheduling Agent
Payments Today

Claim free USDC from the faucet, wire up the PurpleFleaHook in your Airflow environment, and have your first settlement DAG running in under 30 minutes.

1% escrow fee • 15% referral on fees • No minimum