PurpleFleaHook and PurpleFleaOperator bring Purple Flea's full financial API into your Airflow environment. Orchestrate escrow settlements, referral income reports, and funding rate monitoring — all as reliable, retryable, observable DAG tasks.
Why This Combination
Raw HTTP calls to the Purple Flea API work fine for one-off requests. But production agent finance — daily settlement sweeps, referral income accounting, funding rate monitoring — demands scheduling, retry logic, alerting, and audit trails. That is exactly what Airflow provides.
Core Primitives
The Hook manages the HTTP connection to purpleflea.com and handles authentication. The Operator base class wraps the Hook and provides a consistent interface for all Purple Flea tasks.
from airflow.hooks.base import BaseHook from airflow.models import Variable import requests import logging logger = logging.getLogger(__name__) class PurpleFleaHook(BaseHook): """ Hook for Purple Flea financial API. Reads PURPLE_FLEA_API_KEY from Airflow Variables. Manages session, base URL, and auth headers. """ conn_name_attr = 'purple_flea_conn_id' default_conn_name = 'purple_flea_default' conn_type = 'purple_flea' hook_name = 'Purple Flea' BASE_URL = 'https://purpleflea.com/api' ESCROW_URL = 'https://escrow.purpleflea.com/api' FAUCET_URL = 'https://faucet.purpleflea.com/api' def __init__(self, api_key: str = None): self.api_key = api_key or Variable.get('PURPLE_FLEA_API_KEY') self.session = requests.Session() self.session.headers.update({ 'Authorization': f'Bearer {self.api_key}', 'Content-Type': 'application/json', 'User-Agent': 'PurpleFlea-Airflow/1.0', }) def get(self, path: str, base: str = 'main') -> dict: url_map = { 'main': self.BASE_URL, 'escrow': self.ESCROW_URL, 'faucet': self.FAUCET_URL, } url = f"{url_map[base]}{path}" logger.info("PurpleFleaHook GET %s", url) resp = self.session.get(url, timeout=30) resp.raise_for_status() return resp.json() def post(self, path: str, payload: dict, base: str = 'main') -> dict: url_map = { 'main': self.BASE_URL, 'escrow': self.ESCROW_URL, } url = f"{url_map[base]}{path}" logger.info("PurpleFleaHook POST %s payload=%s", url, payload) resp = self.session.post(url, json=payload, timeout=30) resp.raise_for_status() return resp.json()
from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults from .hooks import PurpleFleaHook from typing import Any, Optional import logging logger = logging.getLogger(__name__) class PurpleFleaOperator(BaseOperator): """ Base operator for Purple Flea API tasks. Subclass and implement execute() for specific operations. All subclasses automatically: - Authenticate via Airflow Variable PURPLE_FLEA_API_KEY - Push results to XCom under key 'return_value' - Log request/response details at INFO level - Raise AirflowException on non-2xx responses """ @apply_defaults def __init__( self, api_key: Optional[str] = None, **kwargs: Any, ): super().__init__(**kwargs) self.api_key = api_key def get_hook(self) -> PurpleFleaHook: return PurpleFleaHook(api_key=self.api_key) def execute(self, context: dict) -> Any: raise NotImplementedError("Subclasses must implement execute()") class ListReleasedEscrowsOperator(PurpleFleaOperator): """Fetch all recently released escrow contracts.""" @apply_defaults def __init__(self, since_hours: int = 24, **kwargs): super().__init__(**kwargs) self.since_hours = since_hours def execute(self, context: dict): hook = self.get_hook() data = hook.get( f'/escrows?status=released&since_hours={self.since_hours}', base='escrow' ) escrow_ids = [e['id'] for e in data.get('escrows', [])] logger.info("Found %d released escrows", len(escrow_ids)) return escrow_ids # pushed to XCom automatically class TransferEscrowToWalletOperator(PurpleFleaOperator): """Transfer funds from released escrows to the agent wallet.""" @apply_defaults def __init__(self, escrow_ids_task_id: str, **kwargs): super().__init__(**kwargs) self.escrow_ids_task_id = escrow_ids_task_id def execute(self, context: dict): escrow_ids = context['ti'].xcom_pull( task_ids=self.escrow_ids_task_id ) hook = self.get_hook() results = [] for eid in (escrow_ids or []): result = hook.post( f'/escrows/{eid}/release-to-wallet', payload={'escrow_id': eid}, base='escrow' ) results.append(result) logger.info("Released escrow %s: %s USDC", eid, result.get('amount_usdc')) return results class FetchReferralStatsOperator(PurpleFleaOperator): """Fetch referral income stats for reporting period.""" @apply_defaults def __init__(self, period_days: int = 7, **kwargs): super().__init__(**kwargs) self.period_days = period_days def execute(self, context: dict): hook = self.get_hook() stats = hook.get( f'/referrals/stats?period_days={self.period_days}' ) logger.info( "Referral stats: %s referrals, %s USDC earned", stats.get('total_referrals'), stats.get('total_earned_usdc') ) return stats
DAG 1 — Daily
This DAG runs nightly, finds all escrow contracts released in the past 24 hours, transfers their funds to your agent wallet, and logs a summary. Three tasks, linear dependency, idempotent by design.
list_released_escrows pushes a list of escrow IDs to XCom. transfer_to_wallet calls xcom_pull(task_ids='list_released_escrows') to receive them — no global variables, no shared files.from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator from plugins.purple_flea.operators import ( ListReleasedEscrowsOperator, TransferEscrowToWalletOperator, ) import logging logger = logging.getLogger(__name__) # Default arguments applied to every task in this DAG DEFAULT_ARGS = { 'owner': 'agent-finance', 'depends_on_past': False, 'retries': 3, 'retry_delay': timedelta(minutes=5), 'retry_exponential_backoff': True, 'email_on_failure': True, 'email': ['ops@youragent.ai'], } def log_settlement_summary(**context): """Log final settlement results and push summary to XCom.""" ti = context['ti'] transfers = ti.xcom_pull(task_ids='transfer_to_wallet') or [] total_usdc = sum(t.get('amount_usdc', 0) for t in transfers) logger.info( "Settlement complete. Transfers: %d, Total: %.4f USDC", len(transfers), total_usdc ) # Push summary for monitoring dashboards ti.xcom_push(key='settlement_summary', value={ 'transfer_count': len(transfers), 'total_usdc': total_usdc, 'run_date': context['ds'], }) with DAG( dag_id='pf_daily_settlement_sweep', description='Daily Purple Flea escrow settlement to agent wallet', schedule_interval='0 0 * * *', # Midnight UTC start_date=datetime(2026, 1, 1), catchup=False, default_args=DEFAULT_ARGS, tags=['purple-flea', 'escrow', 'settlement'], ) as dag: list_released = ListReleasedEscrowsOperator( task_id='list_released_escrows', since_hours=24, ) transfer = TransferEscrowToWalletOperator( task_id='transfer_to_wallet', escrow_ids_task_id='list_released_escrows', ) log_summary = PythonOperator( task_id='log_settlement', python_callable=log_settlement_summary, provide_context=True, ) # Task dependencies: linear pipeline list_released >> transfer >> log_summary
DAG 2 — Weekly
Purple Flea pays 15% of escrow fees to referrers. This DAG fetches weekly referral stats, calculates income, formats an email report, and sends it — all without leaving Airflow.
from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.email import EmailOperator from plugins.purple_flea.operators import FetchReferralStatsOperator DEFAULT_ARGS = { 'owner': 'agent-finance', 'retries': 2, 'retry_delay': timedelta(minutes=10), 'email_on_failure': True, 'email': ['ops@youragent.ai'], } def format_referral_report(**context): """Build HTML email body from referral stats XCom.""" ti = context['ti'] stats = ti.xcom_pull(task_ids='fetch_referral_stats') or {} total_referrals = stats.get('total_referrals', 0) earned_usdc = stats.get('total_earned_usdc', 0.0) top_referrers = stats.get('top_referrers', []) week_start = context['ds'] rows = "" for r in top_referrers[:10]: rows += ( f"<tr><td>{r['agent_id']}</td>" f"<td>{r['referral_count']}</td>" f"<td>{r['earned_usdc']:.4f} USDC</td></tr>" ) html = f""" <h2>Purple Flea Weekly Referral Report</h2> <p>Week starting: <strong>{week_start}</strong></p> <table border="1" cellpadding="6"> <tr><th>Metric</th><th>Value</th></tr> <tr><td>Total Referrals</td><td>{total_referrals}</td></tr> <tr><td>Earned (15% fee share)</td><td>{earned_usdc:.4f} USDC</td></tr> </table> <h3>Top Referrers</h3> <table border="1" cellpadding="6"> <tr><th>Agent ID</th><th>Referrals</th><th>Earned</th></tr> {rows} </table> <p><small>Generated by Airflow DAG pf_weekly_referral_report</small></p> """ ti.xcom_push(key='report_html', value=html) return html with DAG( dag_id='pf_weekly_referral_report', description='Weekly Purple Flea referral income report via email', schedule_interval='0 8 * * MON', # Monday 08:00 UTC start_date=datetime(2026, 1, 1), catchup=False, default_args=DEFAULT_ARGS, tags=['purple-flea', 'referral', 'reporting'], ) as dag: fetch_stats = FetchReferralStatsOperator( task_id='fetch_referral_stats', period_days=7, ) format_report = PythonOperator( task_id='format_report', python_callable=format_referral_report, provide_context=True, ) send_email = EmailOperator( task_id='send_email_report', to=['ops@youragent.ai'], subject=f'Purple Flea Referral Report — Week of {{ ds }}', html_content="{{ ti.xcom_pull(task_ids='format_report', key='report_html') }}", ) fetch_stats >> format_report >> send_email
DAG 3 — Hourly
Fetches the current perpetual funding rate from the Purple Flea Trading API every hour. Stores the rate in XCom and triggers a Slack or email alert if the rate exceeds a configurable threshold.
from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator, BranchPythonOperator from airflow.operators.email import EmailOperator from airflow.operators.dummy import DummyOperator from airflow.models import Variable from plugins.purple_flea.hooks import PurpleFleaHook # Configure alert threshold via Airflow Variable RATE_ALERT_THRESHOLD = float(Variable.get('PF_FUNDING_RATE_THRESHOLD', default_var='0.001')) DEFAULT_ARGS = { 'owner': 'agent-trading', 'retries': 2, 'retry_delay': timedelta(minutes=2), 'email_on_failure': False, # Handled by branch } def fetch_funding_rate(**context) -> dict: """Fetch current perpetual funding rate and push to XCom.""" hook = PurpleFleaHook() data = hook.get('/trading/funding-rate?market=USDC-PERP') rate = data.get('current_rate', 0.0) result = { 'rate': rate, 'market': data.get('market', 'USDC-PERP'), 'timestamp': data.get('timestamp'), 'annualized_rate': rate * 8760, # hourly → annual } # Return value is auto-pushed to XCom as 'return_value' return result def check_rate_threshold(**context) -> str: """ Branch based on whether rate exceeds threshold. Returns task_id of next task to execute. """ ti = context['ti'] rate_data = ti.xcom_pull(task_ids='fetch_funding_rate') or {} rate = rate_data.get('rate', 0.0) if abs(rate) > RATE_ALERT_THRESHOLD: return 'send_rate_alert' return 'no_alert_needed' with DAG( dag_id='pf_funding_rate_collector', description='Hourly Purple Flea funding rate collection with alerting', schedule_interval='@hourly', start_date=datetime(2026, 1, 1), catchup=False, default_args=DEFAULT_ARGS, tags=['purple-flea', 'trading', 'funding-rate'], ) as dag: fetch_rate = PythonOperator( task_id='fetch_funding_rate', python_callable=fetch_funding_rate, provide_context=True, ) check_threshold = BranchPythonOperator( task_id='check_and_alert', python_callable=check_rate_threshold, provide_context=True, ) alert = EmailOperator( task_id='send_rate_alert', to=['ops@youragent.ai'], subject='[ALERT] Purple Flea Funding Rate Threshold Exceeded', html_content=""" <p>Funding rate exceeded threshold of {{ var.value.PF_FUNDING_RATE_THRESHOLD }}.</p> <p>Current rate: {{ ti.xcom_pull(task_ids='fetch_funding_rate')['rate'] }}</p> <p>Annualized: {{ ti.xcom_pull(task_ids='fetch_funding_rate')['annualized_rate'] }}</p> """, ) no_alert = DummyOperator(task_id='no_alert_needed') fetch_rate >> check_threshold >> [alert, no_alert]
PF_FUNDING_RATE_THRESHOLD in Airflow Admin > Variables. Change the threshold without redeploying the DAG — it is read fresh on each run.Configuration
Never store API keys in DAG files. Airflow Variables are encrypted at rest and masked in logs. Set them once via the CLI or UI and reference them throughout all DAGs.
# Set your Purple Flea API key airflow variables set PURPLE_FLEA_API_KEY \ pf_live_your_key_here # Set funding rate alert threshold airflow variables set PF_FUNDING_RATE_THRESHOLD 0.001 # Set referral reporting email airflow variables set PF_REPORT_EMAIL ops@youragent.ai # Verify variables are set airflow variables list # Read a variable from CLI airflow variables get PURPLE_FLEA_API_KEY
from airflow.models import Variable # Simple string variable api_key = Variable.get('PURPLE_FLEA_API_KEY') # With default value (safe if not set) threshold = float(Variable.get( 'PF_FUNDING_RATE_THRESHOLD', default_var='0.001' )) # JSON variable (for complex config) config = Variable.get( 'PF_DAG_CONFIG', deserialize_json=True, default_var={} ) markets = config.get('markets', ['USDC-PERP'])
| Variable Key | Purpose | Example Value |
|---|---|---|
| PURPLE_FLEA_API_KEY | Main auth key for all Purple Flea APIs | pf_live_abc123 |
| PF_FUNDING_RATE_THRESHOLD | Alert if rate exceeds this value | 0.001 |
| PF_REPORT_EMAIL | Destination for weekly referral reports | ops@agent.ai |
| PF_SETTLEMENT_HOURS | Lookback window for settlement sweep | 24 |
| PF_DAG_CONFIG | JSON blob for advanced DAG config | {"markets": [...]} |
hooks.py and operators.py into your Airflow plugins/purple_flea/ directory. Add an empty __init__.py.airflow variables set PURPLE_FLEA_API_KEY pf_live_your_key from the CLI or use the Airflow web UI under Admin > Variables.dags/ folder. Airflow's scheduler picks them up automatically within the configured dag_dir_list_interval.Data Flow
XCom (cross-communication) is Airflow's mechanism for passing small data payloads between tasks in a DAG. For Purple Flea workflows, it is the right way to pipe escrow IDs, wallet balances, and funding rates downstream without global state or files.
# --- PUSH: return value is automatically pushed --- def task_that_produces_data(**context): hook = PurpleFleaHook() escrows = hook.get('/escrows?status=released', base='escrow') ids = [e['id'] for e in escrows['escrows']] return ids # auto-pushed to XCom key='return_value' # --- PUSH: explicit named key --- def task_with_named_push(**context): ti = context['ti'] hook = PurpleFleaHook() wallet = hook.get('/wallet/balance') ti.xcom_push(key='wallet_balance_usdc', value=wallet['usdc']) ti.xcom_push(key='wallet_address', value=wallet['address']) # --- PULL: by task_id, gets 'return_value' --- def task_that_consumes(**context): ti = context['ti'] # Pull list of escrow IDs from upstream task escrow_ids = ti.xcom_pull(task_ids='task_that_produces_data') # Pull named key from upstream task balance = ti.xcom_pull( task_ids='task_with_named_push', key='wallet_balance_usdc' ) # Pull from multiple upstream tasks at once upstream_results = ti.xcom_pull( task_ids=['fetch_escrows', 'fetch_wallet'] ) # returns list of return_values hook = PurpleFleaHook() for eid in (escrow_ids or []): hook.post(f'/escrows/{eid}/settle', {}, base='escrow')
Purple Flea Services
Every Purple Flea service can be called from Airflow operators. The Hook's base parameter routes to the correct subdomain.
| Service | Base URL | Hook base= | Suggested Schedule |
|---|---|---|---|
| Casino | purpleflea.com/casino | main | Real-time / on-event |
| Faucet | faucet.purpleflea.com | faucet | One-time agent registration |
| Escrow | escrow.purpleflea.com | escrow | Daily settlement sweep |
| Trading API | purpleflea.com/trading-api | main | Hourly (funding rates) |
| Wallet API | purpleflea.com/wallet-api | main | Daily balance snapshot |
| Domains API | purpleflea.com/domains-api | main | Weekly domain audit |
Claim free USDC from the faucet, wire up the PurpleFleaHook in your Airflow environment, and have your first settlement DAG running in under 30 minutes.
1% escrow fee • 15% referral on fees • No minimum