AI Agent API Authentication: JWT, API Keys, and MCP Auth in 2026

March 6, 2026 28 min read Purple Flea Team

Authentication is the most common failure point in production AI agent deployments. A human developer can recover from an expired token by logging in again; an autonomous agent running at 3 AM against six financial APIs cannot. Without robust token lifecycle management, automatic refresh logic, and graceful rate limit handling, agents fail silently: they miss trades, drop escrow transactions, and accumulate authorization errors until a human notices hours later.

This guide covers the full authentication stack for AI agents: JWT token lifecycle management with automatic refresh, API key scoping and rotation strategies, exponential backoff for rate limits, secure credential loading, and MCP authentication headers for Model Context Protocol tools. All examples are production-ready Python and apply directly to Purple Flea's API suite.

Scope: This guide covers REST API authentication and MCP tool auth. It does not cover blockchain transaction signing (private keys/KMS) — see the Wallet Security guide for that topic.

1. JWT Token Lifecycle

JWT (JSON Web Token) authentication is used by many financial APIs as a step up from static API keys. Instead of sending your secret key with every request, you exchange it once for a short-lived access token (typically 15 minutes to 1 hour), and use that token for subsequent requests.
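To make the token format concrete, the sketch below builds an unsigned sample token locally and reads its claims back. The helper names (b64url, make_sample_jwt, read_claims), the "demo-agent" subject, and the placeholder signature are illustrative assumptions; a real access token is signed by the server, and reading its payload this way does not verify it.

```python
import base64
import json
import time

def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWT segments are."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")

def make_sample_jwt(lifetime_seconds: int) -> str:
    """Build header.payload.signature with a fake signature (demo only)."""
    header = {"alg": "HS256", "typ": "JWT"}
    payload = {"sub": "demo-agent", "exp": int(time.time()) + lifetime_seconds}
    return ".".join([
        b64url(json.dumps(header).encode()),
        b64url(json.dumps(payload).encode()),
        "fake-signature",
    ])

def read_claims(token: str) -> dict:
    """Decode the payload segment; this does NOT verify the signature."""
    segment = token.split(".")[1]
    segment += "=" * (-len(segment) % 4)   # restore the stripped padding
    return json.loads(base64.urlsafe_b64decode(segment))

token = make_sample_jwt(900)               # a 15-minute token
claims = read_claims(token)
print(claims["sub"], claims["exp"] - int(time.time()))
```

The exp claim is a Unix timestamp in seconds, which is why the remaining lifetime is a simple subtraction against the current time.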

The Three-Phase JWT Flow

1. Authentication Exchange: POST your agent credentials (agent_id + secret) to the /auth/token endpoint. Receive an access_token (short-lived) and a refresh_token (long-lived).
2. API Calls with Access Token: Include the access_token in every request as Authorization: Bearer <token>. Continue until the token approaches expiry.
3. Proactive Refresh: Before the access_token expires (or on receiving 401), POST the refresh_token to /auth/refresh. Receive a new access_token. Repeat from step 2.
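
The three phases can be sketched as plain request builders with no network calls. The paths and field names follow the steps above; the function names are hypothetical, and the exact request and response shape of a given server is an assumption to check against its documentation.

```python
from typing import Dict, Tuple

def build_token_request(agent_id: str, agent_secret: str) -> Tuple[str, Dict[str, str]]:
    """Phase 1: path and JSON body for the credential exchange."""
    return "/auth/token", {"agent_id": agent_id, "agent_secret": agent_secret}

def build_auth_header(access_token: str) -> Dict[str, str]:
    """Phase 2: header attached to every API call."""
    return {"Authorization": f"Bearer {access_token}"}

def build_refresh_request(refresh_token: str) -> Tuple[str, Dict[str, str]]:
    """Phase 3: path and JSON body for the refresh call."""
    return "/auth/refresh", {"refresh_token": refresh_token}

path, body = build_token_request("demo-agent", "demo-secret")
print(path, sorted(body))
print(build_auth_header("abc"))
```

Keeping the request construction separate from the HTTP call makes each phase easy to unit test without a live auth server.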

Parsing JWT Expiry Without a Library

import base64, json, time

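def decode_jwt_expiry(token: str) -> int:
    """Extract the exp timestamp from a JWT payload without a library.

    This only reads the claim. It does not verify the signature, so use it
    to schedule refreshes, never to decide whether a token can be trusted.
    """
    # JWT is: base64url(header).base64url(payload).signature
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("Invalid JWT format")

    # Payload is the second part: base64url-encoded with padding stripped,
    # so add back 0-3 "=" characters to reach a multiple of 4
    padded  = parts[1] + "=" * (-len(parts[1]) % 4)
    payload = json.loads(base64.urlsafe_b64decode(padded))
    return int(payload.get("exp", 0))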

def token_seconds_remaining(token: str) -> int:
    """Returns seconds until token expires. Negative if already expired."""
    exp = decode_jwt_expiry(token)
    return exp - int(time.time())

# Usage:
token = "eyJhbGci...your.jwt.here"
remaining = token_seconds_remaining(token)
print(f"Token expires in: {remaining}s ({remaining//60}m)")
if remaining < 120:
    print("WARNING: Token will expire in <2 minutes — refresh now")

2. The TokenManager Class with Auto-Refresh

A production agent should never manually check token expiry before each request. Instead, a TokenManager class wraps all token logic and is called by the HTTP session layer. It refreshes tokens proactively before expiry and handles concurrent refresh attempts safely with a lock.

import time, threading, requests, logging
from typing import Optional

logger = logging.getLogger("agent.auth")

class TokenManager:
    """
    Thread-safe JWT token manager with automatic proactive refresh.
    Refreshes tokens when <REFRESH_BUFFER_SECONDS remain on access token.
    Falls back to full re-authentication if refresh token is also expired.
    """

    REFRESH_BUFFER_SECONDS = 120   # Refresh when <2 minutes remain
    MAX_REFRESH_RETRIES    = 3
    RETRY_BACKOFF          = 2.0   # Seconds between refresh retries

    def __init__(self, auth_url: str, agent_id: str, agent_secret: str):
        self.auth_url     = auth_url
        self.agent_id     = agent_id
        self.agent_secret = agent_secret
        self._access_token:  Optional[str] = None
        self._refresh_token: Optional[str] = None
        self._access_exp:    int = 0
        self._refresh_exp:   int = 0
        self._lock           = threading.Lock()

    def get_access_token(self) -> str:
        """Get a valid access token, refreshing if necessary."""
        with self._lock:
            if self._needs_refresh():
                self._refresh()
            return self._access_token

    def _needs_refresh(self) -> bool:
        remaining = self._access_exp - int(time.time())
        return remaining < self.REFRESH_BUFFER_SECONDS

    def _refresh(self):
        """Attempt refresh, fall back to full auth if refresh token expired."""
        refresh_remaining = self._refresh_exp - int(time.time())

        if self._refresh_token and refresh_remaining > 60:
            for attempt in range(self.MAX_REFRESH_RETRIES):
                try:
                    self._do_refresh()
                    logger.info("Token refreshed successfully")
                    return
                except Exception as e:
                    logger.warning(f"Refresh attempt {attempt+1} failed: {e}")
                    if attempt < self.MAX_REFRESH_RETRIES - 1:
                        time.sleep(self.RETRY_BACKOFF * (2 ** attempt))

        # Refresh failed or refresh token expired — full re-authentication
        logger.info("Performing full re-authentication")
        self._authenticate()

    def _authenticate(self):
        """Exchange agent credentials for new token pair."""
        r = requests.post(f"{self.auth_url}/auth/token", json={
            "agent_id":     self.agent_id,
            "agent_secret": self.agent_secret,
        }, timeout=10)
        r.raise_for_status()
        data = r.json()
        self._store_tokens(data)

    def _do_refresh(self):
        """Exchange refresh token for new access token."""
        r = requests.post(f"{self.auth_url}/auth/refresh", json={
            "refresh_token": self._refresh_token,
        }, timeout=10)
        r.raise_for_status()
        data = r.json()
        self._store_tokens(data)

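    def _store_tokens(self, data: dict):
        self._access_token   = data["access_token"]
        self._access_exp     = decode_jwt_expiry(self._access_token)
        if "refresh_token" in data:
            self._refresh_token = data["refresh_token"]
            try:
                self._refresh_exp = decode_jwt_expiry(self._refresh_token)
            except ValueError:
                # Opaque (non-JWT) refresh token: expiry is unknown, so
                # assume one day (an arbitrary choice) and rely on the
                # re-authentication fallback if the refresh is rejected.
                self._refresh_exp = int(time.time()) + 86400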

    def get_auth_header(self) -> dict:
        return {"Authorization": f"Bearer {self.get_access_token()}"}

Tip: The _lock in TokenManager prevents a race condition where two threads simultaneously discover the token is expired and both try to refresh, causing double requests or conflicting token state. Always use a lock for token managers in multi-threaded agents.
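
The effect of the lock can be shown with a small stand-in that counts refreshes. CountingRefresher is a hypothetical class for illustration, not part of TokenManager; the sleep simulates the network round trip during which other threads would otherwise start their own refresh.

```python
import threading
import time

class CountingRefresher:
    """Stand-in for a token manager: counts how often a refresh runs."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._expires_at = 0.0      # start in the expired state
        self.refresh_calls = 0

    def get_token(self) -> str:
        with self._lock:
            # The expiry check happens inside the lock, so a thread that
            # waited here sees the fresh expiry set by the first refresher.
            if time.monotonic() >= self._expires_at:
                time.sleep(0.05)    # simulate the network round trip
                self.refresh_calls += 1
                self._expires_at = time.monotonic() + 60
            return f"token-{self.refresh_calls}"

refresher = CountingRefresher()
threads = [threading.Thread(target=refresher.get_token) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(refresher.refresh_calls)   # 1
```

Eight threads request a token at once, but only the first one to take the lock performs the refresh; the rest reuse its result.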

3. API Key Scoping and Rotation

Static API keys are simpler than JWT flows but introduce their own risks if not managed carefully. For multi-service agents, the best practice is one scoped key per service — not one master key for all services. If the casino key is compromised, trading and wallet operations remain unaffected.

Recommended Key Scoping for Purple Flea Agents

| Key Name | Scope | Rotation Period | Permission Level |
|---|---|---|---|
| pf_casino_key | Casino bets only | 30 days | Read + bet |
| pf_trading_key | Perpetuals only | 14 days | Read + trade |
| pf_wallet_read | Balance queries | 90 days | Read only |
| pf_wallet_write | Transfers + swaps | 7 days | Read + write |
| pf_escrow_key | Escrow operations | 30 days | Read + create + release |
| pf_admin_key | Key management | Offline/manual | Admin |

import os
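import time, logging          # time is used by ScopedKey, logging by the loader
from typing import Dict
from dataclasses import dataclass

logger = logging.getLogger("agent.credentials")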

@dataclass
class ScopedKey:
    key:      str
    service:  str
    scopes:   list
    rotation_days: int
    created_epoch: int = 0

    def days_until_rotation(self) -> int:
        elapsed = (int(time.time()) - self.created_epoch) // 86400
        return max(0, self.rotation_days - elapsed)

    def needs_rotation(self, warn_days: int = 3) -> bool:
        return self.days_until_rotation() <= warn_days

class SecureCredentialLoader:
    """
    Loads scoped API keys from environment variables.
    Never loads from files, never logs key values.
    Validates key format and warns on upcoming rotation.
    """

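    # env var -> (service, scopes, rotation period in days); scopes mirror
    # the Permission Level column of the scoping table
    REQUIRED_KEYS = {
        "PF_CASINO_KEY":    ("casino",  ["read", "bet"],   30),
        "PF_TRADING_KEY":   ("trading", ["read", "trade"], 14),
        "PF_WALLET_WRITE":  ("wallet",  ["read", "write"], 7),
        "PF_ESCROW_KEY":    ("escrow",  ["read", "create", "release"], 30),
    }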

    def load_all(self) -> Dict[str, ScopedKey]:
        keys = {}
        for env_var, (service, scopes, rotation) in self.REQUIRED_KEYS.items():
            value = os.environ.get(env_var)
            if not value:
                raise EnvironmentError(f"Missing required credential: {env_var}")
            if not self._validate_format(value):
                raise ValueError(f"Invalid key format for {env_var} — check for truncation")

            sk = ScopedKey(key=value, service=service, scopes=scopes,
                           rotation_days=rotation,
                           created_epoch=int(os.environ.get(f"{env_var}_CREATED", 0)))

            if sk.needs_rotation():
                logger.warning(f"Key {env_var} due for rotation in {sk.days_until_rotation()} days")

            keys[service] = sk
        return keys

    def _validate_format(self, key: str) -> bool:
        # Purple Flea keys are 32+ chars, no spaces, printable ASCII only
        return len(key) >= 32 and key.isprintable() and " " not in key

Never do this: logger.info(f"Using API key: {api_key}"). API keys in logs are a critical security failure — they appear in log aggregators, monitoring dashboards, and error reporting tools where they are often exposed to unintended parties.
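
One way to reduce the risk of a key reaching the logs is to mask values before logging and to add a redaction filter as a safety net. The sketch below uses only the standard logging and re modules; mask_key and RedactSecretsFilter are hypothetical helper names, and the key shown is a dummy value.

```python
import logging
import re
from typing import Iterable

def mask_key(key: str, visible: int = 4) -> str:
    """Log-safe form of a secret: fixed-width mask plus the last few characters."""
    if len(key) <= visible:
        return "*" * len(key)
    return "*" * 8 + key[-visible:]

class RedactSecretsFilter(logging.Filter):
    """Replaces known secret values in log messages before they are emitted."""

    def __init__(self, secrets: Iterable[str]):
        super().__init__()
        escaped = [re.escape(s) for s in secrets if s]
        self._pattern = re.compile("|".join(escaped)) if escaped else None

    def filter(self, record: logging.LogRecord) -> bool:
        if self._pattern is not None:
            record.msg = self._pattern.sub("[REDACTED]", record.getMessage())
            record.args = ()      # message is already fully formatted
        return True

api_key = "k" * 28 + "9f3a"       # dummy 32-character key for the demo
print(mask_key(api_key))          # ********9f3a
```

Attaching the filter to each handler with handler.addFilter(...) covers records from every logger that reaches that handler, whereas a filter attached to a single logger only sees records logged directly on it.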

4. Rate Limit Handling with Exponential Backoff

Financial APIs enforce rate limits aggressively. A 429 Too Many Requests response means your agent is making calls faster than the API allows. The correct response is not to retry immediately (which will generate another 429), but to wait with exponentially increasing delays and respect the Retry-After header when present.
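
The Retry-After header can carry either a number of seconds or an HTTP-date, so a parser should accept both. The following is a minimal sketch using the standard library; parse_retry_after is a hypothetical helper name, and returning None for an unparseable value (so the caller falls back to computed backoff) is a design assumption.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional

def parse_retry_after(value: Optional[str],
                      now: Optional[datetime] = None) -> Optional[float]:
    """Return the wait in seconds, or None if the header is absent or unparseable."""
    if not value:
        return None
    value = value.strip()
    if value.isdigit():                       # delta-seconds form, e.g. "30"
        return float(value)
    try:
        when = parsedate_to_datetime(value)   # HTTP-date form
    except (TypeError, ValueError):
        return None
    if when.tzinfo is None:                   # treat a naive date as UTC
        when = when.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return max(0.0, (when - now).total_seconds())

reference = datetime(2015, 10, 21, 7, 27, 0, tzinfo=timezone.utc)
print(parse_retry_after("30"))
print(parse_retry_after("Wed, 21 Oct 2015 07:28:00 GMT", now=reference))
```

A date in the past yields 0.0 rather than a negative wait, so the result can be passed straight to time.sleep.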

Rate Limit Response Classes

| HTTP Status | Meaning | Correct Response |
|---|---|---|
| 429 | Rate limit exceeded | Backoff with Retry-After header or exponential delay |
| 401 | Token expired/invalid | Refresh token immediately, then retry once |
| 403 | Forbidden (scope issue) | Do not retry; log error, alert operator |
| 503 | Service unavailable | Exponential backoff, max 5 retries |
| 504 | Gateway timeout | Retry after 5s, then 10s, then 30s |

import time, requests, logging
from typing import Callable, Any

logger = logging.getLogger("agent.ratelimit")

class RateLimitHandler:
    """
    Wraps API calls with exponential backoff for rate limits and transient errors.
    Respects Retry-After headers. Distinguishes retryable from non-retryable errors.
    """

    RETRYABLE_STATUS = {429, 500, 502, 503, 504}
    AUTH_STATUS      = {401}
    FATAL_STATUS     = {400, 403, 404, 422}

    def __init__(self, max_retries: int = 5, base_delay: float = 1.0,
                 max_delay: float = 60.0, jitter: bool = True):
        self.max_retries = max_retries
        self.base_delay  = base_delay
        self.max_delay   = max_delay
        self.jitter      = jitter

    def call(self, fn: Callable, token_manager: 'TokenManager',
             *args, **kwargs) -> requests.Response:
        """
        Execute API call with automatic retry, backoff, and token refresh.
        fn: A callable that accepts (headers, *args, **kwargs) and returns Response.
        """
        auth_retried = False
        for attempt in range(self.max_retries + 1):
            headers = token_manager.get_auth_header()
            try:
                r = fn(*args, headers=headers, **kwargs)
            except requests.ConnectionError as e:
                if attempt == self.max_retries:
                    raise
                delay = self._backoff_delay(attempt)
                logger.warning(f"Connection error (attempt {attempt+1}): {e}. Retry in {delay:.1f}s")
                time.sleep(delay)
                continue

            # Any 2xx is success. Treating only 200 as success would retry
            # a 201 Created and could duplicate a trade or transfer.
            if 200 <= r.status_code < 300:
                return r

            if r.status_code in self.AUTH_STATUS:
                if auth_retried:
                    raise requests.HTTPError("Authentication failed after token refresh", response=r)
                # Token may have just expired: force one refresh, then retry.
                # _refresh() is private and expects the manager's lock held.
                logger.info("401 received, refreshing token")
                with token_manager._lock:
                    token_manager._refresh()
                auth_retried = True
                continue

            if r.status_code in self.FATAL_STATUS:
                # Non-retryable client error — raise immediately
                r.raise_for_status()

            if r.status_code in self.RETRYABLE_STATUS:
                if attempt == self.max_retries:
                    r.raise_for_status()

                # Read Retry-After if present (common for 429)
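                retry_after = r.headers.get("Retry-After")
                try:
                    # Retry-After may also be an HTTP-date; when it is not
                    # plain seconds, fall back to computed backoff.
                    delay = float(retry_after) if retry_after else None
                except ValueError:
                    delay = None
                if delay is not None:
                    logger.info(f"Rate limited. Server says wait {delay}s.")
                else:
                    delay = self._backoff_delay(attempt)
                    logger.warning(f"HTTP {r.status_code}. Backoff {delay:.1f}s (attempt {attempt+1})")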

                time.sleep(delay)
                continue

            r.raise_for_status()

        raise RuntimeError("Max retries exceeded")

    def _backoff_delay(self, attempt: int) -> float:
        import random
        delay = min(self.base_delay * (2 ** attempt), self.max_delay)
        if self.jitter:
            delay = delay * (0.5 + random.random() * 0.5)  # jitter: scale to 50-100% of the delay
        return delay
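
To see what the handler's defaults imply, the un-jittered delays can be computed directly. This sketch reuses the same formula as _backoff_delay; backoff_schedule is a hypothetical helper name, and it ignores Retry-After, which would override these values when present.

```python
def backoff_schedule(max_retries: int = 5, base_delay: float = 1.0,
                     max_delay: float = 60.0) -> list:
    """Delays in seconds before each retry, without jitter."""
    return [min(base_delay * (2 ** attempt), max_delay)
            for attempt in range(max_retries)]

print(backoff_schedule())               # [1.0, 2.0, 4.0, 8.0, 16.0]
print(backoff_schedule(8, 1.0, 60.0))   # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
print(sum(backoff_schedule()))          # 31.0
```

With the defaults, a call that keeps failing spends at most about 31 seconds sleeping before the final attempt, and jitter only shortens that. A time-sensitive order may need a smaller max_retries than a background balance check.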

5. Credential Storage for Long-Running Agents

A long-running agent process faces a unique credential problem: the credentials must be accessible to the process for its entire lifetime (days, weeks, or indefinitely), but they should not be readable by other processes, logged, or serialized to disk during checkpointing.

Storage Tier Comparison

| Storage Method | Security Level | Agent Restart | Best For |
|---|---|---|---|
| Environment variables | Medium | Manual re-inject | Development, simple agents |
| Docker secrets | High | Auto-mounted | Containerized agents |
| AWS Secrets Manager | Very High | Auto-fetched | Cloud production agents |
| HashiCorp Vault | Very High | Auto-fetched | Self-hosted infrastructure |
| In-memory only | High | Lost (re-auth needed) | Ephemeral/serverless agents |

import os, json, subprocess

def load_from_env() -> dict:
    """Load credentials from environment — simplest approach."""
    required = ["PF_AGENT_ID", "PF_CASINO_KEY", "PF_TRADING_KEY",
                "PF_WALLET_WRITE", "PF_ESCROW_KEY"]
    creds = {}
    missing = []
    for key in required:
        value = os.environ.get(key)
        if not value:
            missing.append(key)
        else:
            creds[key] = value
    if missing:
        raise EnvironmentError(f"Missing env vars: {missing}")
    return creds

def load_from_aws_secrets_manager(secret_name: str, region: str = "us-east-1") -> dict:
    """Load credentials from AWS Secrets Manager — recommended for production."""
    import boto3
    client = boto3.client("secretsmanager", region_name=region)
    response = client.get_secret_value(SecretId=secret_name)
    return json.loads(response["SecretString"])

def load_from_vault(vault_url: str, path: str, vault_token_env: str = "VAULT_TOKEN") -> dict:
    """Load credentials from HashiCorp Vault KV v2."""
    import requests
    token = os.environ[vault_token_env]
    r = requests.get(
        f"{vault_url}/v1/secret/data/{path}",
        headers={"X-Vault-Token": token},
        timeout=5
    )
    r.raise_for_status()
    return r.json()["data"]["data"]

class MemoryIsolatedCredentials:
    """
    Stores credentials in a way that prevents accidental serialization.
    Values are held in mutable bytearrays so wipe() can zero them in place,
    and any attempt to pickle the object raises TypeError.
    Note: get() returns an immutable str copy that wipe() cannot clear.
    """
    def __init__(self, raw_creds: dict):
        self._store = {k: bytearray(v.encode()) for k, v in raw_creds.items()}

    def get(self, key: str) -> str:
        return self._store[key].decode()

    def wipe(self):
        # Zero each buffer in place before releasing it. Rebinding the dict
        # entry to a new bytes object would leave the original secret bytes
        # untouched in memory.
        for buf in self._store.values():
            buf[:] = bytes(len(buf))
        self._store.clear()

    def __reduce__(self):
        # Prevent pickling: a checkpoint that includes this object fails loudly
        raise TypeError("MemoryIsolatedCredentials cannot be pickled")
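
The storage table lists Docker secrets, which the loaders above do not cover. Docker mounts each secret as a file under /run/secrets by default, so a loader can be sketched as below. The function name and the secret names are hypothetical; the secrets_dir parameter exists mainly so the loader can be tested against a temporary directory.

```python
from pathlib import Path
from typing import Dict, Iterable

def load_from_docker_secrets(names: Iterable[str],
                             secrets_dir: str = "/run/secrets") -> Dict[str, str]:
    """Read one secret per file from the directory Docker mounts secrets into."""
    base = Path(secrets_dir)
    creds: Dict[str, str] = {}
    missing = []
    for name in names:
        path = base / name
        if path.is_file():
            # Strip surrounding whitespace such as a trailing newline
            creds[name] = path.read_text(encoding="utf-8").strip()
        else:
            missing.append(name)
    if missing:
        raise FileNotFoundError(f"Missing Docker secrets: {missing}")
    return creds
```

As with the environment loader, all missing names are reported in one error, so a misconfigured container shows every gap on the first start instead of one per restart.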

6. MCP Authentication Headers

Purple Flea's Model Context Protocol (MCP) tools are served over the Streamable HTTP transport. When your agent connects to the MCP endpoints at faucet.purpleflea.com/mcp or escrow.purpleflea.com/mcp, authentication is passed as an HTTP header on every request, and the MCP server validates that header before allowing tool invocations.

import httpx, json

FAUCET_MCP   = "https://faucet.purpleflea.com/mcp"
ESCROW_MCP   = "https://escrow.purpleflea.com/mcp"

def build_mcp_headers(agent_id: str, api_key: str) -> dict:
    """Headers required for authenticated Purple Flea MCP connections."""
    return {
        "X-Agent-Id":    agent_id,
        "Authorization": f"Bearer {api_key}",
        "Content-Type":  "application/json",
        "Accept":        "application/json, text/event-stream",
    }

async def call_mcp_tool(endpoint: str, tool_name: str, arguments: dict,
                          agent_id: str, api_key: str) -> dict:
    """
    Call a Purple Flea MCP tool directly over StreamableHTTP.
    For use when integrating MCP without a full LangChain agent stack.
    """
    headers = build_mcp_headers(agent_id, api_key)
    payload = {
        "jsonrpc": "2.0",
        "id":      1,
        "method":  "tools/call",
        "params":  {"name": tool_name, "arguments": arguments}
    }
    async with httpx.AsyncClient(timeout=30.0) as client:
        r = await client.post(endpoint, headers=headers, json=payload)
        r.raise_for_status()
        response = r.json()
        if "error" in response:
            raise RuntimeError(f"MCP error: {response['error']}")
        return response.get("result", {})

# Example: Claim faucet via MCP tool call
import asyncio

async def claim_faucet_via_mcp(agent_id: str, api_key: str) -> dict:
    result = await call_mcp_tool(
        endpoint   = FAUCET_MCP,
        tool_name  = "claim_faucet",
        arguments  = {"agent_id": agent_id},
        agent_id   = agent_id,
        api_key    = api_key,
    )
    return result

# asyncio.run(claim_faucet_via_mcp("your-agent-id", "your-api-key"))
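
call_mcp_tool sends every request with id 1. When several requests are in flight, distinct ids make it possible to match each response to its request. The sketch below builds JSON-RPC 2.0 envelopes with an incrementing id; jsonrpc_request is a hypothetical helper, "demo-agent" is a placeholder, and tools/list is the MCP method for discovering available tools. It is assumed here that any session setup the server requires, such as the initialize handshake defined by the MCP specification, is handled separately.

```python
import itertools
import json
from typing import Optional

_request_ids = itertools.count(1)   # process-wide id sequence

def jsonrpc_request(method: str, params: Optional[dict] = None) -> dict:
    """Build a JSON-RPC 2.0 request with a fresh id for each call."""
    request = {"jsonrpc": "2.0", "id": next(_request_ids), "method": method}
    if params is not None:
        request["params"] = params
    return request

list_req = jsonrpc_request("tools/list")
call_req = jsonrpc_request(
    "tools/call",
    {"name": "claim_faucet", "arguments": {"agent_id": "demo-agent"}},
)
print(json.dumps(call_req, indent=2))
```

The returned dict can be passed as the json= argument of the httpx POST in place of the hand-built payload.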

7. Multi-Service Auth Coordination

An agent using all 6 Purple Flea services needs to manage 6 separate auth contexts simultaneously. A multi-service coordinator initializes all token managers at startup, refreshes them on schedule, and provides a unified interface so tool code never deals with raw auth logic.

from threading import Thread
import time

class MultiServiceAuthCoordinator:
    """
    Manages auth for all Purple Flea services concurrently.
    Background refresh threads ensure tokens are always valid.
    """

    SERVICES = {
        "casino":  "https://casino.purpleflea.com",
        "trading": "https://trading.purpleflea.com",
        "wallet":  "https://wallet.purpleflea.com",
        "escrow":  "https://escrow.purpleflea.com",
        "faucet":  "https://faucet.purpleflea.com",
        "domains": "https://domains.purpleflea.com",
    }

    def __init__(self, agent_id: str, service_secrets: dict):
        self.agent_id = agent_id
        self.managers: dict[str, TokenManager] = {}
        for service, base_url in self.SERVICES.items():
            secret = service_secrets.get(service)
            if secret:
                self.managers[service] = TokenManager(
                    auth_url=base_url, agent_id=agent_id, agent_secret=secret
                )

    def get_header(self, service: str) -> dict:
        """Get auth header for a specific service — always fresh."""
        mgr = self.managers.get(service)
        if not mgr:
            raise ValueError(f"No auth configured for service: {service}")
        return mgr.get_auth_header()

    def start_background_refresh(self, check_interval: int = 30):
        """Start background threads to proactively refresh all tokens."""
        def refresh_loop(service: str, mgr: TokenManager):
            while True:
                time.sleep(check_interval)
                try:
                    mgr.get_access_token()  # Refreshes internally if needed
                except Exception as e:
                    logger.error(f"Background refresh failed for {service}: {e}")

        for service, mgr in self.managers.items():
            t = Thread(target=refresh_loop, args=(service, mgr), daemon=True)
            t.name = f"auth-refresh-{service}"
            t.start()
            logger.info(f"Started background auth refresh for {service}")
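
The refresh loop above runs until the process exits. If an agent needs a clean shutdown, for example in tests or during a rolling restart, a threading.Event can replace time.sleep so the loop can be stopped promptly. StoppableRefresher is a hypothetical class sketching that variant; refresh_fn stands in for a call such as mgr.get_access_token.

```python
import threading
import time

class StoppableRefresher:
    """Calls refresh_fn every `interval` seconds until stop() is called."""

    def __init__(self, refresh_fn, interval: float):
        self._refresh_fn = refresh_fn
        self._interval = interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def _run(self) -> None:
        # Event.wait returns True as soon as stop() sets the event, so
        # shutdown does not have to wait out a full sleep interval.
        while not self._stop.wait(self._interval):
            try:
                self._refresh_fn()
            except Exception as exc:      # keep the loop alive on failure
                print(f"refresh failed: {exc}")

    def stop(self, timeout: float = 5.0) -> None:
        self._stop.set()
        self._thread.join(timeout)

calls = []
refresher = StoppableRefresher(lambda: calls.append(1), interval=0.01)
refresher.start()
time.sleep(0.1)                 # let a few cycles run
refresher.stop()
print(len(calls) > 0, refresher._thread.is_alive())
```

The thread is still a daemon, so a crash of the main thread does not leave it hanging, but stop() gives orderly code paths a way to wait for it.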

8. Authentication Checklist

| Item | Status Check |
|---|---|
| One API key per service (not one master key) | Verify separate key per service in .env |
| Keys never logged or serialized | Audit logging config for key patterns |
| JWT refresh 2+ minutes before expiry | Check TokenManager.REFRESH_BUFFER_SECONDS ≥ 120 |
| Refresh token fallback to re-auth | Verify _refresh() calls _authenticate() on failure |
| Retry on 429 with Retry-After header | Confirm RateLimitHandler reads Retry-After |
| No retry on 403 (scope error) | Verify FATAL_STATUS includes 403 |
| Lock prevents concurrent refresh races | Confirm threading.Lock() in TokenManager |
| Credentials not picklable/serializable | Test pickle.dumps(creds) raises TypeError |
| MCP headers include Content-Type | Check build_mcp_headers() output |
| Key rotation alerts at 3 days before deadline | Confirm needs_rotation(warn_days=3) active |

Get Started with Purple Flea Authentication

Purple Flea supports both static API keys and JWT tokens. New agents get $1 free from the faucet to test all 6 APIs. MCP endpoints for faucet and escrow are live at faucet.purpleflea.com/mcp and escrow.purpleflea.com/mcp.
