Guide

Secrets Management for AI Agents: API Keys, Wallets, and Private Keys

How autonomous agents securely store and access sensitive credentials — environment variables, vault systems (HashiCorp Vault, AWS Secrets Manager), key rotation, and hardware security modules.

Purple Flea March 6, 2026 20 min read
Secret Types
5+
API, wallet, seed, JWT, DB
Zero Downtime
Rotation
With dual-key pattern
Vault Options
3
Env, HashiCorp, AWS

1. Threat Model for Agent Secrets

Before choosing a secrets management approach, you must understand who you are defending against and what an attacker can access. Autonomous agents have a distinct threat surface from traditional applications:

ThreatImpactMitigationPriority
Prompt injection exfiltrationFull key exposureNever echo secrets; sanitize LLM outputsCritical
Environment variable dumpAll env secretsUse vault, not env for prod secretsHigh
Log leakagePartial key exposureScrub logs; structured loggingHigh
Server breachAll in-memory secretsHSM/MPC for private keys, short TTLsHigh
Supply chainEnv var theftAudit dependencies; least privilegeMedium
Critical rule: Never store secrets in code, git repositories, Docker images, or log files. Even if the repository is private today, it may be public tomorrow, and git history is forever.

2. Environment Variable Patterns

Environment variables are the floor, not the ceiling, of secrets management. They are appropriate for development and simple deployments, but must be used carefully even there.

The right way to use environment variables

Load secrets from env vars once at startup, validate their presence, and never log them. Provide a clear error if a required secret is missing.

config.pyPython
import os
import re
from typing import Optional

class SecretsFromEnv:
    """Load and validate secrets from environment variables at startup."""

    REQUIRED = [
        "PURPLEFLEA_API_KEY",
        "WALLET_PRIVATE_KEY",
    ]
    OPTIONAL = {
        "TELEGRAM_TOKEN": None,
        "DATABASE_URL": "sqlite:///agent.db",
    }

    def __init__(self):
        self._secrets: dict = {}
        self._load()

    def _load(self):
        missing = []
        for key in self.REQUIRED:
            val = os.environ.get(key)
            if not val:
                missing.append(key)
            else:
                self._secrets[key] = val

        if missing:
            raise EnvironmentError(
                f"Missing required environment variables: {', '.join(missing)}\n"
                f"Set them in your .env file or deployment configuration."
            )

        for key, default in self.OPTIONAL.items():
            self._secrets[key] = os.environ.get(key, default)

        # Validate format of known secrets
        self._validate_api_key(self._secrets["PURPLEFLEA_API_KEY"])
        self._validate_private_key(self._secrets["WALLET_PRIVATE_KEY"])

    def _validate_api_key(self, key: str):
        # Expect format: pf_live_<32 hex chars>
        if not re.match(r'^pf_live_[a-f0-9]{32}$', key):
            raise ValueError("PURPLEFLEA_API_KEY has invalid format")

    def _validate_private_key(self, key: str):
        # Expect 64-char hex or 0x-prefixed 64-char hex
        k = key.lstrip("0x")
        if not re.match(r'^[a-fA-F0-9]{64}$', k):
            raise ValueError("WALLET_PRIVATE_KEY must be 32-byte hex")

    def get(self, key: str) -> Optional[str]:
        return self._secrets.get(key)

    def __repr__(self) -> str:
        # Safe repr: mask all values
        masked = {k: "***" for k in self._secrets}
        return f"SecretsFromEnv({masked})"
Tip: Use a .env.example file committed to git with placeholder values to document required variables, and a .env file in .gitignore for actual values. Never commit .env.

3. HashiCorp Vault Integration

HashiCorp Vault provides a production-grade secrets store with dynamic secrets, lease management, audit logging, and fine-grained access control. It is ideal for multi-agent deployments where different agents need different permission scopes.

Key Vault concepts for agents:

vault_client.pyPython
import aiohttp, os, time, logging
from typing import Optional, Any

class VaultClient:
    """Async HashiCorp Vault client using AppRole authentication."""

    def __init__(self, vault_addr: str, role_id: str, secret_id: str,
                 mount_path: str = "secret"):
        self.vault_addr = vault_addr.rstrip("/")
        self.role_id = role_id
        self.secret_id = secret_id
        self.mount_path = mount_path
        self._token: Optional[str] = None
        self._token_expires_at: float = 0
        self.log = logging.getLogger("vault")

    async def _authenticate(self):
        """Authenticate via AppRole, get a short-lived token."""
        async with aiohttp.ClientSession() as s:
            resp = await s.post(
                f"{self.vault_addr}/v1/auth/approle/login",
                json={"role_id": self.role_id, "secret_id": self.secret_id}
            )
            resp.raise_for_status()
            data = await resp.json()
            auth = data["auth"]
            self._token = auth["client_token"]
            # Set expiry 60s before actual TTL for safety buffer
            self._token_expires_at = time.time() + auth["lease_duration"] - 60
            self.log.info(f"Vault authenticated; token valid for ~{auth['lease_duration']}s")

    async def _ensure_token(self):
        if self._token is None or time.time() >= self._token_expires_at:
            await self._authenticate()

    async def get_secret(self, path: str) -> dict[str, Any]:
        """Fetch a KV v2 secret. path e.g. 'agents/my-agent'."""
        await self._ensure_token()
        async with aiohttp.ClientSession() as s:
            resp = await s.get(
                f"{self.vault_addr}/v1/{self.mount_path}/data/{path}",
                headers={"X-Vault-Token": self._token}
            )
            if resp.status == 404:
                raise KeyError(f"Secret not found: {path}")
            resp.raise_for_status()
            data = await resp.json()
            return data["data"]["data"]

    async def put_secret(self, path: str, data: dict):
        """Write/update a KV v2 secret."""
        await self._ensure_token()
        async with aiohttp.ClientSession() as s:
            resp = await s.post(
                f"{self.vault_addr}/v1/{self.mount_path}/data/{path}",
                headers={"X-Vault-Token": self._token},
                json={"data": data}
            )
            resp.raise_for_status()

    async def delete_secret_version(self, path: str, versions: list[int]):
        """Soft-delete specific versions of a secret."""
        await self._ensure_token()
        async with aiohttp.ClientSession() as s:
            resp = await s.post(
                f"{self.vault_addr}/v1/{self.mount_path}/delete/{path}",
                headers={"X-Vault-Token": self._token},
                json={"versions": versions}
            )
            resp.raise_for_status()

# Usage:
# vault = VaultClient(
#     vault_addr="https://vault.internal:8200",
#     role_id=os.environ["VAULT_ROLE_ID"],
#     secret_id=os.environ["VAULT_SECRET_ID"]
# )
# secrets = await vault.get_secret("agents/trading-bot-1")
# api_key = secrets["purpleflea_api_key"]

4. AWS Secrets Manager Integration

For agents deployed on AWS (EC2, ECS, Lambda), AWS Secrets Manager provides native integration via IAM roles — no vault token management needed. The EC2 instance or ECS task role grants access to specific secrets.

aws_secrets.pyPython
import boto3, json, logging
from functools import lru_cache
from typing import Any
import time

class AWSSecretsManager:
    """AWS Secrets Manager client with caching to minimize API calls."""

    def __init__(self, region: str = "us-east-1", cache_ttl: int = 300):
        self.client = boto3.client("secretsmanager", region_name=region)
        self.cache_ttl = cache_ttl
        self._cache: dict = {}   # secret_name -> (value, expires_at)
        self.log = logging.getLogger("aws_secrets")

    def get_secret(self, secret_name: str) -> dict[str, Any]:
        """Get a secret, using cache to avoid rate limits."""
        cached = self._cache.get(secret_name)
        if cached and time.time() < cached[1]:
            return cached[0]

        try:
            response = self.client.get_secret_value(SecretId=secret_name)
        except self.client.exceptions.ResourceNotFoundException:
            raise KeyError(f"Secret not found: {secret_name}")

        if "SecretString" in response:
            value = json.loads(response["SecretString"])
        else:
            import base64
            value = {"binary": base64.b64decode(response["SecretBinary"])}

        expires_at = time.time() + self.cache_ttl
        self._cache[secret_name] = (value, expires_at)
        self.log.info(f"Fetched secret '{secret_name}' from AWS (cached {self.cache_ttl}s)")
        return value

    def rotate_secret(self, secret_name: str):
        """Trigger immediate rotation (requires rotation Lambda configured)."""
        self.client.rotate_secret(SecretId=secret_name)
        # Invalidate cache
        self._cache.pop(secret_name, None)
        self.log.info(f"Triggered rotation for '{secret_name}'")

    def update_secret(self, secret_name: str, data: dict):
        """Update secret value. Cache is invalidated."""
        self.client.put_secret_value(
            SecretId=secret_name,
            SecretString=json.dumps(data)
        )
        self._cache.pop(secret_name, None)
        self.log.info(f"Updated secret '{secret_name}'")

# Usage with IAM role (no credentials needed on EC2/ECS):
# sm = AWSSecretsManager(region="us-east-1")
# secrets = sm.get_secret("purpleflea/trading-agent")
# api_key = secrets["api_key"]
Best practice: Use AWS Secrets Manager rotation with a Lambda function that calls the service's key rotation API and then updates the secret. The cache TTL in your client should be shorter than the rotation interval to ensure agents pick up new keys quickly.

5. Rotating API Keys Without Downtime

Key rotation is the process of replacing credentials before they expire or are compromised. The challenge for autonomous agents is continuity: the agent cannot stop while the key rotates. The solution is the dual-key pattern:

  1. Generate a new key while the old key is still valid
  2. Store both keys in the secret store
  3. Agent begins using the new key for new requests
  4. Wait for any in-flight requests using the old key to complete
  5. Revoke the old key
  6. Remove old key from secret store
key_rotation.pyPython
import asyncio, aiohttp, time, logging
from typing import Optional

class RotatingKeyManager:
    """Dual-key pattern for zero-downtime API key rotation."""

    def __init__(self, vault: VaultClient, secret_path: str,
                 purpleflea_api_base: str):
        self.vault = vault
        self.secret_path = secret_path
        self.api_base = purpleflea_api_base
        self.log = logging.getLogger("key_rotation")
        self._current_key: Optional[str] = None
        self._old_key: Optional[str] = None
        self._in_flight: int = 0   # count of requests using old key

    async def load_current_key(self):
        secrets = await self.vault.get_secret(self.secret_path)
        self._current_key = secrets["api_key"]
        self._old_key = secrets.get("api_key_old")

    async def rotate(self):
        """Perform a live rotation of the Purple Flea API key."""
        self.log.info("Starting key rotation...")

        # 1. Request new key from Purple Flea
        async with aiohttp.ClientSession() as s:
            resp = await s.post(
                f"{self.api_base}/api/keys/rotate",
                headers={"Authorization": f"Bearer {self._current_key}"}
            )
            resp.raise_for_status()
            data = await resp.json()
            new_key = data["new_key"]
            old_key = self._current_key

        # 2. Store both keys in vault (new = current, old = old)
        await self.vault.put_secret(self.secret_path, {
            "api_key": new_key,
            "api_key_old": old_key,
            "rotated_at": time.time()
        })
        self.log.info(f"Vault updated. New key: ...{new_key[-4:]}")

        # 3. Switch current; keep old for in-flight drain
        self._old_key = old_key
        self._current_key = new_key

        # 4. Wait for in-flight requests using old key to complete
        deadline = time.time() + 30
        while self._in_flight > 0 and time.time() < deadline:
            await asyncio.sleep(1)

        # 5. Revoke old key
        async with aiohttp.ClientSession() as s:
            await s.delete(
                f"{self.api_base}/api/keys/{old_key}",
                headers={"Authorization": f"Bearer {self._current_key}"}
            )

        # 6. Remove old key from vault
        await self.vault.put_secret(self.secret_path, {
            "api_key": new_key,
            "rotated_at": time.time()
        })
        self._old_key = None
        self.log.info("Rotation complete. Old key revoked.")

    def get_key(self) -> str:
        return self._current_key

6. Wallet Private Key Security: HSM and MPC

Private keys for crypto wallets are the most sensitive secrets an agent manages. If a private key is stolen, funds are irreversibly lost. Two enterprise approaches:

Hardware Security Modules (HSM)

An HSM is a physical device that generates and stores private keys in tamper-resistant hardware. The key never leaves the HSM; instead, you send data to sign and receive the signature. Providers: AWS CloudHSM, Azure Dedicated HSM, YubiHSM.

Multi-Party Computation (MPC)

MPC splits the private key into shards distributed across multiple parties. A transaction requires a threshold of parties to cooperate (e.g., 2-of-3). No single party or machine ever holds the full key. Providers: Fireblocks, Privy, Web3Auth.

wallet_signer.pyPython
from abc import ABC, abstractmethod
from eth_account import Account
from eth_account.messages import encode_defunct
import boto3, json

class WalletSigner(ABC):
    """Abstract wallet signer — swap implementations without changing agent code."""

    @abstractmethod
    async def sign_transaction(self, tx: dict) -> str:
        """Return signed transaction hex."""

    @abstractmethod
    async def get_address(self) -> str:
        """Return the wallet's public address."""

class EnvWalletSigner(WalletSigner):
    """Simple env var signer — development only."""

    def __init__(self, private_key: str):
        self._acct = Account.from_key(private_key)

    async def sign_transaction(self, tx: dict) -> str:
        signed = self._acct.sign_transaction(tx)
        return signed.rawTransaction.hex()

    async def get_address(self) -> str:
        return self._acct.address

class KMSWalletSigner(WalletSigner):
    """AWS KMS asymmetric key signer (secp256k1 ECC)."""

    def __init__(self, key_id: str, region: str = "us-east-1"):
        self.key_id = key_id
        self.kms = boto3.client("kms", region_name=region)
        self._address: str = None

    async def get_address(self) -> str:
        if self._address:
            return self._address
        # Fetch public key from KMS
        resp = self.kms.get_public_key(KeyId=self.key_id)
        # Parse DER-encoded public key to Ethereum address
        from eth_keys import keys
        from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
        from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
        from cryptography.hazmat.backends import default_backend
        from cryptography.hazmat.primitives.serialization import load_der_public_key
        pub_key_der = resp["PublicKey"]
        pub_key_obj = load_der_public_key(pub_key_der, backend=default_backend())
        pub_bytes = pub_key_obj.public_bytes(Encoding.X962,
                                              PublicFormat.UncompressedPoint)
        # pub_bytes[1:] is the 64-byte uncompressed point (skip 0x04 prefix)
        eth_key = keys.PublicKey(pub_bytes[1:])
        self._address = eth_key.to_checksum_address()
        return self._address

    async def sign_transaction(self, tx: dict) -> str:
        import hashlib
        # Encode transaction and get keccak256 hash
        from eth_account._utils.signing import sign_transaction_hash
        # ... KMS sign call returns DER-encoded signature
        # Full implementation requires DER -> (r,s) conversion + EIP-155 v computation
        raise NotImplementedError("Full KMS sign implementation omitted for brevity")
Recommendation for Purple Flea agents: For agents managing less than $10K, env var signing with a dedicated hot wallet is acceptable. Above $10K, use a 2-of-3 MPC solution. Never store a high-value wallet's private key on the same machine that runs the agent's LLM inference.

7. Purple Flea API Key Lifecycle

Purple Flea API keys follow a defined lifecycle. Understanding it helps you build agents that handle key transitions gracefully.

purpleflea_key_manager.pyPython
import aiohttp, time, logging
from typing import Optional

class PurpleFleatKeyManager:
    """Manages Purple Flea API key lifecycle: expiry detection + auto-rotation."""

    EXPIRY_WARNING_DAYS = 7

    def __init__(self, initial_key: str, secret_store, api_base: str):
        self.api_base = api_base
        self.secret_store = secret_store  # vault or AWS SM client
        self._key = initial_key
        self._key_expires_at: Optional[float] = None
        self.log = logging.getLogger("pf_keys")

    @property
    def key(self) -> str:
        return self._key

    async def check_and_rotate(self, session: aiohttp.ClientSession):
        """Call this after each API response to check expiry headers."""
        if self._key_expires_at is None:
            return

        days_left = (self._key_expires_at - time.time()) / 86400
        if days_left <= self.EXPIRY_WARNING_DAYS:
            self.log.warning(f"API key expires in {days_left:.1f} days — rotating")
            await self._do_rotate(session)

    async def handle_response_headers(self, headers: dict):
        """Parse X-Key-Expires-In from API response headers."""
        expires_in = headers.get("X-Key-Expires-In")
        if expires_in:
            self._key_expires_at = time.time() + int(expires_in)

    async def _do_rotate(self, session: aiohttp.ClientSession):
        resp = await session.post(
            f"{self.api_base}/api/keys/rotate",
            headers={"Authorization": f"Bearer {self._key}"}
        )
        if resp.status != 200:
            self.log.error(f"Key rotation failed: {resp.status}")
            return
        data = await resp.json()
        new_key = data["key"]
        old_key = self._key

        # Persist new key to secret store
        await self.secret_store.put_secret("agents/my-agent", {
            "api_key": new_key,
            "api_key_old": old_key,
            "rotated_at": time.time()
        })
        self._key = new_key
        self._key_expires_at = None
        self.log.info(f"API key rotated successfully. New key: ...{new_key[-4:]}")

8. SecretsManager Wrapper Class

In production agents, you need a unified interface that works across development (env vars), staging (HashiCorp Vault), and production (AWS Secrets Manager). The wrapper pattern provides this abstraction.

secrets_manager.pyPython
import os, logging
from enum import Enum
from typing import Any, Optional

class SecretBackend(Enum):
    ENV = "env"
    VAULT = "vault"
    AWS = "aws"

class SecretsManager:
    """
    Unified secrets manager. Backend is selected by SECRETS_BACKEND env var.
    Falls back to env vars in development.
    """

    def __init__(self):
        self.log = logging.getLogger("secrets")
        backend_name = os.environ.get("SECRETS_BACKEND", "env").lower()
        self.backend = SecretBackend(backend_name)
        self._impl = self._init_backend()
        self.log.info(f"Secrets backend: {self.backend.value}")

    def _init_backend(self):
        if self.backend == SecretBackend.ENV:
            return EnvSecretsBackend()
        elif self.backend == SecretBackend.VAULT:
            return VaultSecretsBackend(
                vault_addr=os.environ["VAULT_ADDR"],
                role_id=os.environ["VAULT_ROLE_ID"],
                secret_id=os.environ["VAULT_SECRET_ID"],
            )
        elif self.backend == SecretBackend.AWS:
            return AWSSecretsBackend(
                region=os.environ.get("AWS_REGION", "us-east-1")
            )

    async def get(self, key: str, namespace: str = "default") -> Optional[str]:
        """Get a single secret value by key and namespace."""
        try:
            return await self._impl.get(key, namespace)
        except Exception as e:
            self.log.error(f"Failed to get secret '{key}' from {self.backend.value}: {e}")
            raise

    async def set(self, key: str, value: str, namespace: str = "default"):
        """Store a secret value."""
        await self._impl.set(key, value, namespace)

    async def rotate(self, key: str, namespace: str = "default"):
        """Trigger rotation for a secret."""
        await self._impl.rotate(key, namespace)

class EnvSecretsBackend:
    async def get(self, key: str, namespace: str) -> Optional[str]:
        env_key = f"{namespace.upper()}_{key.upper()}".replace("-", "_")
        val = os.environ.get(env_key) or os.environ.get(key.upper())
        return val

    async def set(self, key: str, value: str, namespace: str):
        # Cannot set env vars persistently; log warning
        logging.getLogger("secrets").warning(
            f"EnvSecretsBackend.set() called for '{key}' — env vars are not persistent"
        )

    async def rotate(self, key: str, namespace: str):
        raise NotImplementedError("Use vault or AWS backend for rotation")

# In your agent:
# secrets = SecretsManager()
# api_key = await secrets.get("API_KEY", namespace="purpleflea")
# private_key = await secrets.get("WALLET_PRIVATE_KEY", namespace="wallet")
Security principle: Apply least-privilege to secrets. A trading agent only needs its own API key and wallet key — not database admin passwords or other agents' keys. Use Vault policies or AWS IAM to enforce this boundary programmatically.

Secure your agent with Purple Flea

Purple Flea provides API keys, wallet infrastructure, escrow, and a full financial stack for autonomous agents. Start with the free faucet — no risk, no commitment.

Get Started Free