AI Agent API Authentication: JWT, API Keys, and MCP Auth in 2026
Authentication is one of the most common failure points in production AI agent deployments. A human developer can recover from an expired token by logging in again; an autonomous agent running at 3 AM against six financial APIs cannot. Without robust token lifecycle management, automatic refresh logic, and graceful rate limit handling, agents fail silently: they miss trades, drop escrow transactions, and accumulate authorization errors until a human notices hours later.
This guide covers the full authentication stack for AI agents: JWT token lifecycle management with automatic refresh, API key scoping and rotation strategies, exponential backoff for rate limits, secure credential loading, and MCP authentication headers for Model Context Protocol tools. All examples are production-ready Python and apply directly to Purple Flea's API suite.
Scope: This guide covers REST API authentication and MCP tool auth. It does not cover blockchain transaction signing (private keys/KMS) — see the Wallet Security guide for that topic.
1. JWT Token Lifecycle
JWT (JSON Web Token) authentication is used by many financial APIs as a step up from static API keys. Instead of sending your secret key with every request, you exchange it once for a short-lived access token (typically 15 minutes to 1 hour), and use that token for subsequent requests.
The Three-Phase JWT Flow
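One plausible reading of the three phases, based on the exchange described above, is: (1) authenticate with long-lived credentials to obtain a token pair, (2) present the short-lived access token on each request, and (3) refresh once the access token nears or passes expiry. The sketch below simulates that sequence entirely in memory. `FakeAuthServer`, its lifetimes, and the `"s3cret"` value are hypothetical stand-ins, not part of any real API.

```python
import secrets
import time


class FakeAuthServer:
    """Hypothetical in-memory stand-in for an auth service (not a real API)."""

    ACCESS_TTL = 900       # 15 minutes, the low end of the range mentioned above
    REFRESH_TTL = 86400    # assumed 24-hour refresh token lifetime

    def __init__(self):
        self._access = {}   # access token -> expiry epoch
        self._refresh = {}  # refresh token -> expiry epoch

    def _issue(self, now: float) -> dict:
        access, refresh = secrets.token_hex(16), secrets.token_hex(16)
        self._access[access] = now + self.ACCESS_TTL
        self._refresh[refresh] = now + self.REFRESH_TTL
        return {"access_token": access, "refresh_token": refresh}

    def authenticate(self, agent_id: str, agent_secret: str, now: float) -> dict:
        # Phase 1: exchange long-lived credentials for a token pair.
        if agent_secret != "s3cret":
            raise PermissionError("bad credentials")
        return self._issue(now)

    def call_api(self, access_token: str, now: float) -> str:
        # Phase 2: present the access token on each request.
        if self._access.get(access_token, 0) <= now:
            raise PermissionError("401: access token expired or unknown")
        return "ok"

    def refresh(self, refresh_token: str, now: float) -> dict:
        # Phase 3: trade the (single-use) refresh token for a new pair.
        if self._refresh.pop(refresh_token, 0) <= now:
            raise PermissionError("refresh token expired or unknown")
        return self._issue(now)


server = FakeAuthServer()
t0 = time.time()
tokens = server.authenticate("agent-1", "s3cret", now=t0)
first_call = server.call_api(tokens["access_token"], now=t0 + 60)

# Sixteen minutes later the access token is stale, so the client refreshes.
later = t0 + 16 * 60
try:
    server.call_api(tokens["access_token"], now=later)
    expired_rejected = False
except PermissionError:
    expired_rejected = True
tokens = server.refresh(tokens["refresh_token"], now=later)
second_call = server.call_api(tokens["access_token"], now=later)
```

Passing `now` explicitly keeps the simulation deterministic; a real client would read the clock itself and refresh before expiry rather than after a rejected call.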
Parsing JWT Expiry Without a Library
import base64, json, time

def decode_jwt_expiry(token: str) -> int:
    """Extract the expiry timestamp from a JWT payload; no library needed.

    This only reads the claim and does NOT verify the signature, so use it
    for scheduling refreshes, never for deciding whether to trust a token.
    """
    # JWT is: base64url(header).base64url(payload).signature
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("Invalid JWT format")
    # Payload is the second part, base64url-encoded with padding stripped.
    # Restore 0-3 "=" characters so the length is a multiple of 4.
    padded = parts[1] + "=" * (-len(parts[1]) % 4)
    payload = json.loads(base64.urlsafe_b64decode(padded))
    return payload.get("exp", 0)

def token_seconds_remaining(token: str) -> int:
    """Returns seconds until token expires. Negative if already expired."""
    exp = decode_jwt_expiry(token)
    return exp - int(time.time())

# Usage:
token = "eyJhbGci...your.jwt.here"  # placeholder; substitute a real token
remaining = token_seconds_remaining(token)
print(f"Token expires in: {remaining}s ({remaining//60}m)")
if remaining < 120:
    print("WARNING: Token will expire in <2 minutes, refresh now")
2. The TokenManager Class with Auto-Refresh
A production agent should never manually check token expiry before each request. Instead, a TokenManager class wraps all token logic and is called by the HTTP session layer. It refreshes tokens proactively before expiry and handles concurrent refresh attempts safely with a lock.
import time, threading, requests, logging
from typing import Optional

logger = logging.getLogger("agent.auth")

class TokenManager:
    """
    Thread-safe JWT token manager with automatic proactive refresh.
    Refreshes tokens when <REFRESH_BUFFER_SECONDS remain on access token.
    Falls back to full re-authentication if refresh token is also expired.
    """
    REFRESH_BUFFER_SECONDS = 120  # Refresh when <2 minutes remain
    MAX_REFRESH_RETRIES = 3
    RETRY_BACKOFF = 2.0  # Base delay in seconds; doubles after each failed retry

    def __init__(self, auth_url: str, agent_id: str, agent_secret: str):
        self.auth_url = auth_url
        self.agent_id = agent_id
        self.agent_secret = agent_secret
        self._access_token: Optional[str] = None
        self._refresh_token: Optional[str] = None
        self._access_exp: int = 0
        self._refresh_exp: int = 0
        self._lock = threading.Lock()

    def get_access_token(self) -> str:
        """Get a valid access token, refreshing if necessary."""
        with self._lock:
            if self._needs_refresh():
                self._refresh()
            return self._access_token

    def _needs_refresh(self) -> bool:
        remaining = self._access_exp - int(time.time())
        return remaining < self.REFRESH_BUFFER_SECONDS

    def _refresh(self):
        """Attempt refresh, fall back to full auth if refresh token expired."""
        refresh_remaining = self._refresh_exp - int(time.time())
        if self._refresh_token and refresh_remaining > 60:
            for attempt in range(self.MAX_REFRESH_RETRIES):
                try:
                    self._do_refresh()
                    logger.info("Token refreshed successfully")
                    return
                except Exception as e:
                    logger.warning(f"Refresh attempt {attempt+1} failed: {e}")
                    if attempt < self.MAX_REFRESH_RETRIES - 1:
                        time.sleep(self.RETRY_BACKOFF * (2 ** attempt))
        # Refresh failed or refresh token expired: full re-authentication
        logger.info("Performing full re-authentication")
        self._authenticate()

    def _authenticate(self):
        """Exchange agent credentials for new token pair."""
        r = requests.post(f"{self.auth_url}/auth/token", json={
            "agent_id": self.agent_id,
            "agent_secret": self.agent_secret,
        }, timeout=10)
        r.raise_for_status()
        data = r.json()
        self._store_tokens(data)

    def _do_refresh(self):
        """Exchange refresh token for new access token."""
        r = requests.post(f"{self.auth_url}/auth/refresh", json={
            "refresh_token": self._refresh_token,
        }, timeout=10)
        r.raise_for_status()
        data = r.json()
        self._store_tokens(data)

    def _store_tokens(self, data: dict):
        self._access_token = data["access_token"]
        self._refresh_token = data.get("refresh_token", self._refresh_token)
        self._access_exp = decode_jwt_expiry(self._access_token)
        if "refresh_token" in data:
            self._refresh_exp = decode_jwt_expiry(self._refresh_token)

    def get_auth_header(self) -> dict:
        return {"Authorization": f"Bearer {self.get_access_token()}"}
Tip: The _lock in TokenManager prevents a race condition where two threads simultaneously discover the token is expired and both try to refresh, causing double requests or conflicting token state. Always use a lock for token managers in multi-threaded agents.
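The effect of the lock can be seen in isolation with a small experiment. This is a sketch only: `CountingRefresher` is a hypothetical stand-in that counts refreshes instead of calling an auth server. Eight threads ask for a token at once, and because the expiry check happens inside the lock, only the first thread performs the slow refresh.

```python
import threading
import time


class CountingRefresher:
    """Hypothetical stand-in for a token manager that counts refresh calls."""

    def __init__(self):
        self._lock = threading.Lock()
        self._token = None
        self.refresh_calls = 0

    def _slow_refresh(self):
        time.sleep(0.05)  # simulate network latency
        self.refresh_calls += 1
        self._token = f"token-{self.refresh_calls}"

    def get_token(self) -> str:
        with self._lock:
            # The check runs inside the lock, so a thread that waited here
            # finds the token already set by whichever thread got in first.
            if self._token is None:
                self._slow_refresh()
            return self._token


refresher = CountingRefresher()
results = []
threads = [
    threading.Thread(target=lambda: results.append(refresher.get_token()))
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Moving the `if self._token is None` check outside the `with` block would let several threads pass it before any of them finished refreshing, which is the race the tip describes.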
3. API Key Scoping and Rotation
Static API keys are simpler than JWT flows but introduce their own risks if not managed carefully. For multi-service agents, the best practice is one scoped key per service — not one master key for all services. If the casino key is compromised, trading and wallet operations remain unaffected.
Recommended Key Scoping for Purple Flea Agents
| Key Name | Scope | Rotation Period | Permission Level |
|---|---|---|---|
| pf_casino_key | Casino bets only | 30 days | Read + bet |
| pf_trading_key | Perpetuals only | 14 days | Read + trade |
| pf_wallet_read | Balance queries | 90 days | Read only |
| pf_wallet_write | Transfers + swaps | 7 days | Read + write |
| pf_escrow_key | Escrow operations | 30 days | Read + create + release |
| pf_admin_key | Key management | Offline/manual | Admin |
import os, time, logging
from typing import Dict, List
from dataclasses import dataclass

logger = logging.getLogger("agent.auth")

@dataclass
class ScopedKey:
    key: str
    service: str
    scopes: List[str]
    rotation_days: int
    created_epoch: int = 0

    def days_until_rotation(self) -> int:
        elapsed = (int(time.time()) - self.created_epoch) // 86400
        return max(0, self.rotation_days - elapsed)

    def needs_rotation(self, warn_days: int = 3) -> bool:
        return self.days_until_rotation() <= warn_days

class SecureCredentialLoader:
    """
    Loads scoped API keys from environment variables.
    Never loads from files, never logs key values.
    Validates key format and warns on upcoming rotation.
    """
    REQUIRED_KEYS = {
        "PF_CASINO_KEY": ("casino", ["read", "bet"], 30),
        "PF_TRADING_KEY": ("trading", ["read", "trade"], 14),
        "PF_WALLET_WRITE": ("wallet", ["read", "write"], 7),
        "PF_ESCROW_KEY": ("escrow", ["read", "write"], 30),
    }

    def load_all(self) -> Dict[str, ScopedKey]:
        keys = {}
        for env_var, (service, scopes, rotation) in self.REQUIRED_KEYS.items():
            value = os.environ.get(env_var)
            if not value:
                raise EnvironmentError(f"Missing required credential: {env_var}")
            if not self._validate_format(value):
                raise ValueError(f"Invalid key format for {env_var}: check for truncation")
            # If <ENV_VAR>_CREATED is unset, the creation time defaults to epoch 0,
            # so the key is treated as overdue and the rotation warning always fires.
            sk = ScopedKey(key=value, service=service, scopes=scopes,
                           rotation_days=rotation,
                           created_epoch=int(os.environ.get(f"{env_var}_CREATED", 0)))
            if sk.needs_rotation():
                logger.warning(f"Key {env_var} due for rotation in {sk.days_until_rotation()} days")
            keys[service] = sk
        return keys

    def _validate_format(self, key: str) -> bool:
        # Purple Flea keys are 32+ chars, no spaces, printable ASCII only
        return (len(key) >= 32 and key.isascii()
                and key.isprintable() and " " not in key)
Never do this: logger.info(f"Using API key: {api_key}"). API keys in logs are a critical security failure — they appear in log aggregators, monitoring dashboards, and error reporting tools where they are often exposed to unintended parties.
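When a key does need to appear in a log line for debugging, a masked form is safer than the raw value. The sketch below is one possible approach, not a Purple Flea feature: `mask_secret` and `RedactSecretsFilter` are hypothetical helpers built on the standard `logging.Filter` hook, which scrub known secret values from a record before handlers see it.

```python
import logging


def mask_secret(value: str, visible: int = 4) -> str:
    """Return a log-safe form of a secret: only the last few characters."""
    if len(value) <= visible * 2:
        return "*" * len(value)  # too short to reveal anything safely
    return "*" * 8 + value[-visible:]


class RedactSecretsFilter(logging.Filter):
    """Hypothetical filter that scrubs known secret values from log records."""

    def __init__(self, secrets: list):
        super().__init__()
        self._secrets = [s for s in secrets if s]

    def filter(self, record: logging.LogRecord) -> bool:
        message = record.getMessage()  # applies %-style args first
        for secret in self._secrets:
            message = message.replace(secret, mask_secret(secret))
        record.msg, record.args = message, None
        return True  # keep the (now scrubbed) record


# Usage: attach to a handler so records from every logger pass through it.
# handler = logging.StreamHandler()
# handler.addFilter(RedactSecretsFilter([os.environ["PF_CASINO_KEY"]]))
```

A filter like this is a safety net, not a substitute for keeping keys out of log calls in the first place; it only catches values it has been told about.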
4. Rate Limit Handling with Exponential Backoff
Financial APIs enforce rate limits aggressively. A 429 Too Many Requests response means your agent is making calls faster than the API allows. The correct response is not to retry immediately (which will generate another 429), but to wait with exponentially increasing delays and respect the Retry-After header when present.
Rate Limit Response Classes
| HTTP Status | Meaning | Correct Response |
|---|---|---|
| 429 | Rate limit exceeded | Backoff with Retry-After header or exponential delay |
| 401 | Token expired/invalid | Refresh token immediately, then retry once |
| 403 | Forbidden (scope issue) | Do not retry — log error, alert operator |
| 503 | Service unavailable | Exponential backoff, max 5 retries |
| 504 | Gateway timeout | Retry after 5s, then 10s, then 30s |
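One detail worth handling when honouring Retry-After: HTTP allows the header to carry either a number of seconds or an HTTP-date. The helper below is a minimal sketch of parsing both forms with the standard library; `parse_retry_after` is a hypothetical name, and returning `None` for anything unparseable is a design choice that lets the caller fall back to its own exponential delay.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional


def parse_retry_after(value: Optional[str],
                      now: Optional[datetime] = None) -> Optional[float]:
    """Return seconds to wait, or None if the header is absent or unparseable."""
    if not value:
        return None
    value = value.strip()
    if value.isdigit():  # delay-seconds form, e.g. "120"
        return float(value)
    try:
        when = parsedate_to_datetime(value)  # HTTP-date form
    except (TypeError, ValueError):
        return None
    if when.tzinfo is None:
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    # A date in the past means "retry now", so never return a negative wait.
    return max(0.0, (when - now).total_seconds())
```

The `now` parameter exists only to make the function easy to test; production callers would omit it.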
import random, time, requests, logging
from typing import Callable

logger = logging.getLogger("agent.ratelimit")

class RateLimitHandler:
    """
    Wraps API calls with exponential backoff for rate limits and transient errors.
    Respects Retry-After headers. Distinguishes retryable from non-retryable errors.
    """
    RETRYABLE_STATUS = {429, 500, 502, 503, 504}
    AUTH_STATUS = {401}
    FATAL_STATUS = {400, 403, 404, 422}

    def __init__(self, max_retries: int = 5, base_delay: float = 1.0,
                 max_delay: float = 60.0, jitter: bool = True):
        self.max_retries = max_retries
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.jitter = jitter

    def call(self, fn: Callable, token_manager: 'TokenManager',
             *args, **kwargs) -> requests.Response:
        """
        Execute API call with automatic retry, backoff, and token refresh.
        fn: A callable that accepts (*args, headers=..., **kwargs) and returns Response.
        """
        auth_retried = False  # allow exactly one forced refresh per call
        for attempt in range(self.max_retries + 1):
            headers = token_manager.get_auth_header()
            try:
                r = fn(*args, headers=headers, **kwargs)
            except requests.ConnectionError as e:
                if attempt == self.max_retries:
                    raise
                delay = self._backoff_delay(attempt)
                logger.warning(f"Connection error (attempt {attempt+1}): {e}. Retry in {delay:.1f}s")
                time.sleep(delay)
                continue
            if 200 <= r.status_code < 300:
                # Treat every 2xx as success; retrying a 201 or 204 could
                # submit the same order twice.
                return r
            if r.status_code in self.AUTH_STATUS:
                # Token may have just expired: force one refresh, then retry once
                if auth_retried:
                    raise requests.HTTPError(
                        "Authentication failed after token refresh", response=r)
                logger.info("401 received, refreshing token")
                with token_manager._lock:  # _refresh() expects the lock to be held
                    token_manager._refresh()
                auth_retried = True
                continue
            if r.status_code in self.FATAL_STATUS:
                # Non-retryable client error: raise immediately
                r.raise_for_status()
            if r.status_code in self.RETRYABLE_STATUS:
                if attempt == self.max_retries:
                    r.raise_for_status()
                # Read Retry-After if present (common for 429)
                retry_after = r.headers.get("Retry-After")
                try:
                    delay = float(retry_after)
                    logger.info(f"Rate limited. Server says wait {delay}s.")
                except (TypeError, ValueError):
                    # Header absent, or given as an HTTP-date rather than seconds
                    delay = self._backoff_delay(attempt)
                    logger.warning(f"HTTP {r.status_code}. Backoff {delay:.1f}s (attempt {attempt+1})")
                time.sleep(delay)
                continue
            r.raise_for_status()
        raise RuntimeError("Max retries exceeded")

    def _backoff_delay(self, attempt: int) -> float:
        delay = min(self.base_delay * (2 ** attempt), self.max_delay)
        if self.jitter:
            # Scale to a random 50-100% of the computed delay
            delay = delay * (0.5 + random.random() * 0.5)
        return delay
5. Credential Storage for Long-Running Agents
A long-running agent process faces a unique credential problem: the credentials must be accessible to the process for its entire lifetime (days, weeks, or indefinitely), but they should not be readable by other processes, logged, or serialized to disk during checkpointing.
Storage Tier Comparison
| Storage Method | Security Level | Agent Restart | Best For |
|---|---|---|---|
| Environment variables | Medium | Manual re-inject | Development, simple agents |
| Docker secrets | High | Auto-mounted | Containerized agents |
| AWS Secrets Manager | Very High | Auto-fetched | Cloud production agents |
| HashiCorp Vault | Very High | Auto-fetched | Self-hosted infrastructure |
| In-memory only | High | Lost — re-auth needed | Ephemeral/serverless agents |
import os, json

def load_from_env() -> dict:
    """Load credentials from environment: the simplest approach."""
    required = ["PF_AGENT_ID", "PF_CASINO_KEY", "PF_TRADING_KEY",
                "PF_WALLET_WRITE", "PF_ESCROW_KEY"]
    creds = {}
    missing = []
    for key in required:
        value = os.environ.get(key)
        if not value:
            missing.append(key)
        else:
            creds[key] = value
    if missing:
        raise EnvironmentError(f"Missing env vars: {missing}")
    return creds

def load_from_aws_secrets_manager(secret_name: str, region: str = "us-east-1") -> dict:
    """Load credentials from AWS Secrets Manager (recommended for production)."""
    import boto3
    client = boto3.client("secretsmanager", region_name=region)
    response = client.get_secret_value(SecretId=secret_name)
    return json.loads(response["SecretString"])

def load_from_vault(vault_url: str, path: str, vault_token_env: str = "VAULT_TOKEN") -> dict:
    """Load credentials from HashiCorp Vault KV v2."""
    import requests
    token = os.environ[vault_token_env]
    # Assumes the KV v2 engine is mounted at "secret/"
    r = requests.get(
        f"{vault_url}/v1/secret/data/{path}",
        headers={"X-Vault-Token": token},
        timeout=5
    )
    r.raise_for_status()
    return r.json()["data"]["data"]

class MemoryIsolatedCredentials:
    """
    Stores credentials in a way that prevents accidental serialization.
    Values are held as mutable bytearrays so wipe() can zero them in place,
    and any attempt to pickle the object raises TypeError.
    """
    def __init__(self, raw_creds: dict):
        self._store = {k: bytearray(v.encode()) for k, v in raw_creds.items()}

    def get(self, key: str) -> str:
        # decode() returns an immutable str copy that wipe() cannot clear
        return self._store[key].decode()

    def wipe(self):
        # Zero each buffer in place before releasing it. This narrows the window
        # for memory scraping but is best effort: copies made elsewhere remain.
        for buf in self._store.values():
            buf[:] = bytes(len(buf))
        self._store.clear()

    def __reduce__(self):
        # Prevent pickling: a checkpoint attempt fails loudly instead of
        # writing credentials to disk
        raise TypeError("MemoryIsolatedCredentials cannot be pickled")
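The table above lists Docker secrets, which the loaders so far do not cover. Docker mounts each secret as a file under `/run/secrets/<name>` inside the container, so a loader can be as simple as the sketch below. `load_from_docker_secrets` is a hypothetical helper, and the secret names passed to it are whatever names were chosen when the secrets were created.

```python
from pathlib import Path


def load_from_docker_secrets(names, secrets_dir: str = "/run/secrets") -> dict:
    """Read each named secret from the directory where Docker mounts secrets."""
    base = Path(secrets_dir)
    creds, missing = {}, []
    for name in names:
        path = base / name
        if path.is_file():
            # Secrets created from shell input often carry a trailing newline.
            creds[name] = path.read_text(encoding="utf-8").strip()
        else:
            missing.append(name)
    if missing:
        raise FileNotFoundError(f"Missing Docker secrets: {missing}")
    return creds


# Usage inside a container (names are illustrative):
# creds = load_from_docker_secrets(["pf_casino_key", "pf_trading_key"])
```

Collecting every missing name before raising mirrors `load_from_env` above, so a misconfigured deployment reports all gaps in one error.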
6. MCP Authentication Headers
Purple Flea's Model Context Protocol (MCP) endpoints use the Streamable HTTP transport. When your agent connects to faucet.purpleflea.com/mcp or escrow.purpleflea.com/mcp, authentication is passed as an HTTP header on every request, and the MCP server validates the header before allowing tool invocations.
import asyncio
import httpx

FAUCET_MCP = "https://faucet.purpleflea.com/mcp"
ESCROW_MCP = "https://escrow.purpleflea.com/mcp"

def build_mcp_headers(agent_id: str, api_key: str) -> dict:
    """Headers required for authenticated Purple Flea MCP connections."""
    return {
        "X-Agent-Id": agent_id,
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream",
    }

async def call_mcp_tool(endpoint: str, tool_name: str, arguments: dict,
                        agent_id: str, api_key: str) -> dict:
    """
    Call a Purple Flea MCP tool directly over Streamable HTTP.
    For use when integrating MCP without a full LangChain agent stack.
    Assumes the server accepts tools/call without a prior initialize
    handshake and answers with a plain JSON body rather than an SSE stream.
    """
    headers = build_mcp_headers(agent_id, api_key)
    payload = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "tools/call",
        "params": {"name": tool_name, "arguments": arguments}
    }
    async with httpx.AsyncClient(timeout=30.0) as client:
        r = await client.post(endpoint, headers=headers, json=payload)
        r.raise_for_status()
        response = r.json()
        if "error" in response:
            raise RuntimeError(f"MCP error: {response['error']}")
        return response.get("result", {})

# Example: Claim faucet via MCP tool call
async def claim_faucet_via_mcp(agent_id: str, api_key: str) -> dict:
    result = await call_mcp_tool(
        endpoint=FAUCET_MCP,
        tool_name="claim_faucet",
        arguments={"agent_id": agent_id},
        agent_id=agent_id,
        api_key=api_key,
    )
    return result

# asyncio.run(claim_faucet_via_mcp("your-agent-id", "your-api-key"))
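Because `build_mcp_headers` advertises `text/event-stream` in its Accept header, a Streamable HTTP server may answer a POST with a Server-Sent Events stream instead of a single JSON body, and `r.json()` would then fail. Whether Purple Flea's endpoints do this is not stated here, so treat the following as a defensive sketch: `extract_jsonrpc_from_sse` is a hypothetical helper that scans an SSE body for the JSON-RPC message matching the request id.

```python
import json


def extract_jsonrpc_from_sse(body: str, request_id: int) -> dict:
    """Return the JSON-RPC message whose id matches request_id from an SSE body."""
    data_lines = []
    # A blank line ends each SSE event; the extra "" flushes an unterminated one.
    for line in body.splitlines() + [""]:
        if line.startswith("data:"):
            value = line[5:]
            if value.startswith(" "):  # SSE strips one leading space
                value = value[1:]
            data_lines.append(value)
        elif line == "" and data_lines:
            message = json.loads("\n".join(data_lines))
            data_lines = []
            # Skip notifications and other messages that precede the response.
            if isinstance(message, dict) and message.get("id") == request_id:
                return message
    raise ValueError(f"No JSON-RPC response with id={request_id} in stream")
```

A caller could branch on the response's Content-Type: use `r.json()` for `application/json` and pass `r.text` to this helper for `text/event-stream`.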
7. Multi-Service Auth Coordination
An agent using all 6 Purple Flea services needs to manage 6 separate auth contexts simultaneously. A multi-service coordinator initializes all token managers at startup, refreshes them on schedule, and provides a unified interface so tool code never deals with raw auth logic.
from threading import Thread
import time

class MultiServiceAuthCoordinator:
    """
    Manages auth for all Purple Flea services concurrently.
    Background refresh threads ensure tokens are always valid.
    """
    SERVICES = {
        "casino": "https://casino.purpleflea.com",
        "trading": "https://trading.purpleflea.com",
        "wallet": "https://wallet.purpleflea.com",
        "escrow": "https://escrow.purpleflea.com",
        "faucet": "https://faucet.purpleflea.com",
        "domains": "https://domains.purpleflea.com",
    }

    def __init__(self, agent_id: str, service_secrets: dict):
        self.agent_id = agent_id
        self.managers: dict[str, TokenManager] = {}
        for service, base_url in self.SERVICES.items():
            secret = service_secrets.get(service)
            if secret:
                self.managers[service] = TokenManager(
                    auth_url=base_url, agent_id=agent_id, agent_secret=secret
                )

    def get_header(self, service: str) -> dict:
        """Get auth header for a specific service, always fresh."""
        mgr = self.managers.get(service)
        if not mgr:
            raise ValueError(f"No auth configured for service: {service}")
        return mgr.get_auth_header()

    def start_background_refresh(self, check_interval: int = 30):
        """Start background threads to proactively refresh all tokens."""
        def refresh_loop(service: str, mgr: TokenManager):
            while True:
                time.sleep(check_interval)
                try:
                    mgr.get_access_token()  # Refreshes internally if needed
                except Exception as e:
                    logger.error(f"Background refresh failed for {service}: {e}")

        for service, mgr in self.managers.items():
            t = Thread(target=refresh_loop, args=(service, mgr), daemon=True)
            t.name = f"auth-refresh-{service}"
            t.start()
            logger.info(f"Started background auth refresh for {service}")
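The refresh loop above runs until the process exits. Where an agent needs to shut the threads down cleanly (for example before rotating credentials), one option is to replace `time.sleep` with a `threading.Event` wait. This is a sketch under that assumption; `start_stoppable_refresh` is a hypothetical helper, and `get_token` stands in for a manager's `get_access_token` method.

```python
import threading


def start_stoppable_refresh(get_token, check_interval: float,
                            stop: threading.Event) -> threading.Thread:
    """Call get_token() every check_interval seconds until stop is set."""
    def loop():
        # Event.wait returns True as soon as stop is set, so shutdown does
        # not have to wait out a full sleep interval.
        while not stop.wait(timeout=check_interval):
            try:
                get_token()
            except Exception as exc:
                print(f"background refresh failed: {exc}")

    thread = threading.Thread(target=loop, daemon=True, name="auth-refresh")
    thread.start()
    return thread


# Usage:
# stop = threading.Event()
# thread = start_stoppable_refresh(mgr.get_access_token, 30, stop)
# ...
# stop.set(); thread.join()
```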
8. Authentication Checklist
| Item | Status Check |
|---|---|
| One API key per service (not one master key) | Verify separate key per service in .env |
| Keys never logged or serialized | Audit logging config for key patterns |
| JWT refresh 2+ minutes before expiry | Check TokenManager.REFRESH_BUFFER_SECONDS ≥ 120 |
| Refresh token fallback to re-auth | Verify _refresh() calls _authenticate() on failure |
| Retry on 429 with Retry-After header | Confirm RateLimitHandler reads Retry-After |
| No retry on 403 (scope error) | Verify FATAL_STATUS includes 403 |
| Lock prevents concurrent refresh races | Confirm threading.Lock() in TokenManager |
| Credentials not picklable/serializable | Test pickle.dumps(creds) raises TypeError |
| MCP headers include Content-Type | Check build_mcp_headers() output |
| Key rotation alerts at 3 days before deadline | Confirm needs_rotation(warn_days=3) active |
Get Started with Purple Flea Authentication
Purple Flea supports both static API keys and JWT tokens. New agents get $1 free from the faucet to test all 6 APIs. MCP endpoints for faucet and escrow are live at faucet.purpleflea.com/mcp and escrow.purpleflea.com/mcp.