
Automated Portfolio Rebalancing for AI Agents: Keeping Allocations Optimal


A portfolio that starts perfectly balanced will drift within days. Casino wins push the casino allocation up; a losing trading streak shrinks the trading slice. Without systematic rebalancing, an agent designed to run 40% casino / 30% trading / 20% escrow / 10% domains can end up running 70% casino after a hot streak, completely outside its risk parameters. Automated rebalancing solves this by continuously monitoring drift and executing corrective trades to restore target weights.

This guide covers everything you need to build a production-grade agent rebalancer: drift detection, threshold-based vs calendar approaches, risk parity construction, mean-variance optimization with the efficient frontier, Kelly Criterion sizing, derivatives-based rebalancing using Purple Flea's 275+ perp markets, tax-loss harvesting, and performance measurement. A complete Python PortfolioRebalancer class ties it all together.

  • 6 Purple Flea services to balance across
  • 5% drift threshold triggering a rebalance
  • 275+ perp markets for derivatives hedging
  • Kelly criterion for position sizing

1. Why Portfolio Rebalancing Matters for Agents

Human investors rebalance quarterly or annually because drift is slow. AI agents operate 24/7 with rapid position changes — an agent running casino bets every few minutes can drift 20% from target allocations within a single day. Without rebalancing, several problems compound:

  • Concentration risk: Over-allocation to one service increases exposure to that service's specific risks (e.g., a losing streak in casino wipes a disproportionate fraction of total capital).
  • Opportunity cost: Under-allocated services represent missed returns. If trading is your highest-Sharpe service but you're perpetually underweight there, you're leaving alpha on the table.
  • Strategy drift: The agent's actual behavior diverges from its designed strategy. A "balanced" agent running 80% casino has effectively become a casino-only agent with different risk characteristics.
  • Compounding errors: Drift compounds. A 10% overweight in casino that isn't corrected grows as casino wins add more to that position, creating exponential concentration.
  • Correlation breakdown: As allocations shift, the diversification benefit of your original portfolio construction disappears. Correlated positions cluster, magnifying drawdowns.
Rebalancing as Risk Management

Think of rebalancing not as portfolio optimization but as risk management. The goal isn't to maximize returns — it's to ensure your agent stays within its designed risk parameters across all market conditions.

2. Designing Target Allocations Across Purple Flea Services

Before automating rebalancing, you need a target allocation. This is the percentage of your total capital you want deployed in each service at any given time. Target allocations should reflect expected risk-adjusted returns, your agent's edge in each service, and diversification goals.

| Service | Risk Profile | Expected Return | Conservative Target | Aggressive Target |
|---|---|---|---|---|
| Casino (10% referral) | High volatility | Edge-dependent | 20% | 40% |
| Trading (20% referral) | Medium volatility | Signal-dependent | 30% | 35% |
| Wallet | Low | Transaction-based | 15% | 10% |
| Domains (15% referral) | Low-medium | Rental + appreciation | 15% | 10% |
| Escrow (1% fee) | Very low | Fee-based | 10% | 5% |
| Reserve (idle) | Zero | Zero | 10% | 0% |

For most agents starting out, a conservative allocation (20% casino, 30% trading, 15% wallet, 15% domains, 10% escrow, 10% reserve) provides good diversification without over-concentrating in high-volatility services.
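As a minimal sketch, the conservative column above can be encoded as a plain dict and sanity-checked before use (the `validate_targets` helper is illustrative, not part of any SDK):

```python
CONSERVATIVE_TARGETS = {
    "casino": 0.20, "trading": 0.30, "wallet": 0.15,
    "domains": 0.15, "escrow": 0.10, "reserve": 0.10,
}

def validate_targets(targets: dict[str, float], tol: float = 0.001) -> bool:
    """Targets must be non-negative and sum to 1.0 within tolerance."""
    if any(weight < 0 for weight in targets.values()):
        return False
    return abs(sum(targets.values()) - 1.0) <= tol
```

Running this check at startup catches the most common configuration bug: targets that silently sum to 0.95 or 1.05 after a manual edit.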

3. Drift Detection: Threshold-Based vs Calendar Rebalancing

The choice of rebalancing trigger has a surprisingly large impact on long-run performance and trading costs. Here is a systematic comparison of the four major approaches.

Time-Based (Calendar) Triggers

Rebalance at fixed intervals regardless of drift magnitude. Simple to implement but inefficient — you may rebalance when allocations are already optimal, or miss large drifts between intervals. Best suited to low-volatility portfolios where drift accumulates slowly.

# Time-based trigger: rebalance every 24 hours
import schedule
import time

schedule.every(24).hours.do(rebalancer.run)
while True:
    schedule.run_pending()
    time.sleep(60)

Threshold-Based Triggers

Rebalance when any allocation drifts beyond a fixed percentage from its target. A 5% threshold means that if casino is targeted at 20% but reaches 25%, a rebalance fires. This is more capital-efficient than calendar rebalancing: you only trade when there is actual drift to correct. Backtests generally show threshold-based rebalancing outperforming calendar rebalancing on a risk-adjusted basis, on the order of 0.5-1.5% annually for high-volatility portfolios.

def check_drift(current: dict, targets: dict, threshold: float = 0.05) -> bool:
    """Return True if any allocation exceeds drift threshold."""
    total = sum(current.values())
    for service, target_pct in targets.items():
        current_pct = current.get(service, 0) / total
        drift = abs(current_pct - target_pct)
        if drift > threshold:
            return True
    return False

Hybrid Triggers (Recommended)

Combine both approaches: check at least daily (the calendar leg) AND trigger immediately on any 5%+ drift (the threshold leg). The calendar check catches slow creep that never crosses the threshold in a single jump, while the threshold check ensures rapid response to sudden volatility. This hybrid approach is what the PortfolioRebalancer class below implements by default.
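The hybrid trigger can be sketched as a single predicate; here `hours_since_last` is assumed to come from whatever scheduler state you keep:

```python
def should_rebalance(
    current: dict,
    targets: dict,
    hours_since_last: float,
    threshold: float = 0.05,
    calendar_hours: float = 24.0,
) -> bool:
    """Hybrid trigger: fire on any drift beyond the threshold,
    or on the daily calendar check, whichever comes first."""
    total = sum(current.values())
    drifted = any(
        abs(current.get(service, 0.0) / total - target) > threshold
        for service, target in targets.items()
    )
    return drifted or hours_since_last >= calendar_hours
```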

| Trigger Type | Rebalance Frequency | Annual Trades | Missed Drifts | Excess Trades | Recommended For |
|---|---|---|---|---|---|
| Calendar (daily) | Every 24h | 365 | None | Many | Low-volatility, tiny portfolios |
| Calendar (weekly) | Every 7 days | 52 | Moderate | Some | Stable allocations, low fees |
| Threshold (5%) | Drift-driven | ~40 | Rare | Few | Active agents, volatile services |
| Threshold (10%) | Drift-driven | ~15 | Some | Very few | Long-horizon, low-churn agents |
| Hybrid (daily check + 5%) | Both | ~50 | None | Minimal | Most production agents |

Volatility-Adjusted Triggers

The most sophisticated approach: adjust the rebalancing threshold based on market volatility. During high-volatility periods, tighten the threshold (rebalance more frequently). During calm periods, widen it. This reduces unnecessary trading costs while ensuring rapid response to large moves.

def dynamic_threshold(base_threshold: float, volatility_index: float) -> float:
    """Adjust threshold inversely with volatility.
    volatility_index: 0.0 (calm) to 1.0 (extreme volatility)
    """
    min_threshold = 0.02  # 2% minimum during high vol
    max_threshold = 0.10  # 10% maximum during calm periods
    adjusted = base_threshold * (1 - volatility_index * 0.6)
    return max(min_threshold, min(max_threshold, adjusted))


def compute_portfolio_volatility(
    return_history: list[float], window: int = 20
) -> float:
    """Compute realized volatility of portfolio over past N periods."""
    import numpy as np
    if len(return_history) < 2:
        return 0.5  # default to mid-range if insufficient history
    recent = return_history[-window:]
    vol = np.std(recent) * np.sqrt(365)
    # Normalize to 0–1 range (assuming 200% annualized vol = 1.0)
    return min(1.0, vol / 2.0)

4. Risk Parity Portfolio Construction

Simple equal-weight or market-cap-weight allocations ignore the fact that different services have radically different volatilities. Casino returns can swing 5%+ in a day; escrow fees are nearly deterministic. Risk parity allocates capital so that each service contributes equally to total portfolio risk, not equally to total capital.

The core idea: if casino has 5x the volatility of escrow, you should hold roughly 1/5 as much casino capital so their risk contributions equalize. This produces portfolios that are better diversified in risk space, even when they look "unbalanced" in capital space.
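When service returns are roughly uncorrelated, risk parity collapses to simple inverse-volatility weighting, which makes the 1/5 intuition concrete (the volatility figures below are illustrative, not Purple Flea data):

```python
def inverse_vol_weights(vols: dict[str, float]) -> dict[str, float]:
    """Weight each service proportional to 1 / volatility, normalized to 1."""
    inv = {service: 1.0 / vol for service, vol in vols.items()}
    total = sum(inv.values())
    return {service: x / total for service, x in inv.items()}

# Casino at 5x escrow's volatility gets 1/5 of escrow's weight
weights = inverse_vol_weights({"casino": 0.50, "escrow": 0.10})
# weights["casino"] ~ 0.167, weights["escrow"] ~ 0.833
```

The full optimizer below handles the correlated case, where inverse-volatility weighting is only an approximation.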

import numpy as np
from scipy.optimize import minimize
from typing import Dict, List


def estimate_covariance_matrix(
    returns_history: Dict[str, List[float]]
) -> tuple[np.ndarray, list[str]]:
    """
    Build covariance matrix from per-service return histories.

    Args:
        returns_history: {service_name: [daily_return_1, ...]}
    Returns:
        (cov_matrix, service_names) where cov_matrix[i,j] is annualized covariance
    """
    services = list(returns_history.keys())
    n = len(services)
    min_len = min(len(v) for v in returns_history.values())

    returns_matrix = np.array([
        returns_history[s][-min_len:] for s in services
    ])  # shape (n_services, n_periods)

    cov = np.cov(returns_matrix) * 365  # annualize
    return cov, services


def risk_parity_weights(
    cov_matrix: np.ndarray,
    risk_budget: np.ndarray | None = None
) -> np.ndarray:
    """
    Compute risk parity weights via numerical optimization.

    Minimizes sum of squared differences between each asset's
    realized risk contribution and its risk budget share.

    Args:
        cov_matrix: n x n annualized covariance matrix
        risk_budget: target risk fractions (default: equal = 1/n each)

    Returns:
        weights: normalized weight vector summing to 1.0
    """
    n = cov_matrix.shape[0]
    if risk_budget is None:
        risk_budget = np.ones(n) / n  # equal risk budget

    def risk_contribution(w: np.ndarray) -> np.ndarray:
        portfolio_vol = np.sqrt(w @ cov_matrix @ w)
        marginal_risk = cov_matrix @ w
        return (w * marginal_risk) / portfolio_vol

    def objective(w: np.ndarray) -> float:
        rc = risk_contribution(w)
        total_risk = rc.sum()
        target_rc = risk_budget * total_risk
        return np.sum((rc - target_rc) ** 2)

    constraints = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}]
    bounds = [(0.01, 0.80)] * n  # min 1%, max 80% per service
    w0 = np.ones(n) / n  # start from equal weights

    result = minimize(objective, w0, method='SLSQP',
                      bounds=bounds, constraints=constraints,
                      options={'ftol': 1e-9, 'maxiter': 1000})

    if not result.success:
        raise RuntimeError(f"Risk parity optimization failed: {result.message}")

    return result.x / result.x.sum()


# Example: compute risk parity weights for Purple Flea services
def get_risk_parity_targets(
    api_key: str, agent_id: str, lookback_days: int = 60
) -> Dict[str, float]:
    """Fetch historical returns from API and compute risk-parity weights."""
    import requests

    resp = requests.get(
        f"https://purpleflea.com/api/v1/agents/{agent_id}/returns",
        params={"days": lookback_days},
        headers={"Authorization": f"Bearer {api_key}"}
    )
    resp.raise_for_status()
    returns_history = resp.json()["returns_by_service"]  # {service: [r1, r2, ...]}

    cov_matrix, services = estimate_covariance_matrix(returns_history)
    weights = risk_parity_weights(cov_matrix)

    return dict(zip(services, weights.tolist()))


# Typical risk-parity output for Purple Flea services:
# casino:  0.08  (high vol, small weight)
# trading: 0.18  (medium vol)
# wallet:  0.22  (low vol)
# domains: 0.20  (low-medium vol)
# escrow:  0.22  (very low vol, large weight)
# reserve: 0.10  (fixed)
Risk Budget Customization

The risk_budget parameter lets you tilt risk parity toward preferred services. An agent with strong casino edge might use risk_budget = [0.30, 0.25, 0.15, 0.15, 0.10, 0.05] to allocate 30% of risk budget to casino while still maintaining risk discipline across all services.
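For the special case of uncorrelated services, the risk-budget solution has a closed form, w_i proportional to sqrt(b_i) / sigma_i, which is a quick way to sanity-check the optimizer's output (a sketch for the diagonal-covariance case only, not a replacement for the general solver):

```python
import numpy as np

def risk_budget_weights_diag(vols: list[float], budgets: list[float]) -> np.ndarray:
    """Closed-form risk-budget weights for uncorrelated services:
    w_i proportional to sqrt(b_i) / sigma_i, normalized to sum to 1."""
    raw = np.sqrt(np.asarray(budgets, dtype=float)) / np.asarray(vols, dtype=float)
    return raw / raw.sum()
```

With a diagonal covariance, each service's risk contribution is w_i^2 * sigma_i^2, so this formula hits the budget fractions exactly.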

5. Mean-Variance Optimization (Markowitz)

Risk parity ignores expected returns — it only looks at risk. Mean-variance optimization (MVO), developed by Harry Markowitz, simultaneously considers both expected returns and the covariance structure to find the portfolio that maximizes the Sharpe ratio. This traces out the "efficient frontier": the set of portfolios that offer the best return for each level of risk.

For AI agents, the challenge is estimating expected returns reliably. Historical returns are noisy, especially for a young agent. The solution is to blend statistical estimates with fundamental priors about each service's edge (e.g., an agent with a 55% win rate in casino has a genuine edge estimate that can inform the expected return input).
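One simple way to implement that blend is a shrinkage estimator that pulls noisy sample means toward your fundamental edge estimates, with the weight on the data growing as history accumulates (a James-Stein-style sketch, not full Black-Litterman; the shrink_strength value is an assumption to tune):

```python
import numpy as np

def blended_expected_returns(
    sample_means: np.ndarray,      # annualized historical mean returns per service
    prior_returns: np.ndarray,     # fundamental edge estimates per service
    n_observations: int,
    shrink_strength: float = 60.0  # pseudo-observations backing the prior
) -> np.ndarray:
    """Shrink noisy sample means toward fundamental priors; the weight
    on observed data grows with sample size."""
    w_data = n_observations / (n_observations + shrink_strength)
    return w_data * sample_means + (1.0 - w_data) * prior_returns
```

A young agent with 60 days of history and shrink_strength=60 weights data and prior equally; after a year of history the data dominates.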

import numpy as np
from scipy.optimize import minimize
from dataclasses import dataclass
from typing import Dict


@dataclass
class EfficientFrontierPoint:
    expected_return: float     # annualized expected return
    volatility: float          # annualized standard deviation
    sharpe_ratio: float
    weights: Dict[str, float]


class MarkowitzOptimizer:
    """
    Mean-variance optimizer for Purple Flea agent portfolios.

    Finds max-Sharpe and min-variance portfolios, and traces
    the full efficient frontier for visualization.
    """

    def __init__(
        self,
        services: list[str],
        expected_returns: np.ndarray,   # annualized, shape (n,)
        cov_matrix: np.ndarray,          # annualized, shape (n, n)
        risk_free_rate: float = 0.045   # 4.5% risk-free rate
    ):
        self.services = services
        self.mu = expected_returns
        self.cov = cov_matrix
        self.rf = risk_free_rate
        self.n = len(services)

    def portfolio_stats(self, w: np.ndarray) -> tuple[float, float, float]:
        """Return (expected_return, volatility, sharpe_ratio) for weights w."""
        ret = w @ self.mu
        vol = np.sqrt(w @ self.cov @ w)
        sharpe = (ret - self.rf) / vol if vol > 0 else 0.0
        return ret, vol, sharpe

    def max_sharpe_weights(
        self,
        min_weight: float = 0.02,
        max_weight: float = 0.60
    ) -> np.ndarray:
        """Find weights that maximize the Sharpe ratio."""
        def neg_sharpe(w):
            ret, vol, _ = self.portfolio_stats(w)
            return -(ret - self.rf) / vol if vol > 0 else 0.0

        constraints = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}]
        bounds = [(min_weight, max_weight)] * self.n
        w0 = np.ones(self.n) / self.n
        result = minimize(neg_sharpe, w0, method='SLSQP',
                          bounds=bounds, constraints=constraints)
        return result.x

    def min_variance_weights(
        self,
        min_weight: float = 0.02,
        max_weight: float = 0.60
    ) -> np.ndarray:
        """Find the minimum variance portfolio."""
        def portfolio_variance(w):
            return w @ self.cov @ w

        constraints = [{'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}]
        bounds = [(min_weight, max_weight)] * self.n
        w0 = np.ones(self.n) / self.n
        result = minimize(portfolio_variance, w0, method='SLSQP',
                          bounds=bounds, constraints=constraints)
        return result.x

    def efficient_frontier(
        self, n_points: int = 50
    ) -> list[EfficientFrontierPoint]:
        """
        Trace the efficient frontier from min-variance to max-return.
        Returns list of (return, vol, sharpe, weights) points.
        """
        min_w = self.min_variance_weights()
        min_ret, _, _ = self.portfolio_stats(min_w)
        max_ret = float(self.mu.max())

        frontier = []
        for target_ret in np.linspace(min_ret, max_ret * 0.95, n_points):
            constraints = [
                {'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0},
                {'type': 'eq', 'fun': lambda w: w @ self.mu - target_ret},
            ]
            bounds = [(0.01, 0.70)] * self.n
            w0 = np.ones(self.n) / self.n
            result = minimize(lambda w: w @ self.cov @ w,
                              w0, method='SLSQP',
                              bounds=bounds, constraints=constraints)
            if result.success:
                ret, vol, sharpe = self.portfolio_stats(result.x)
                frontier.append(EfficientFrontierPoint(
                    expected_return=ret,
                    volatility=vol,
                    sharpe_ratio=sharpe,
                    weights=dict(zip(self.services, result.x.tolist()))
                ))

        return frontier

    def recommend_weights(self) -> Dict[str, float]:
        """
        Return max-Sharpe weights as a service-keyed dict,
        ready to pass into AllocationTarget.
        """
        w = self.max_sharpe_weights()
        return dict(zip(self.services, w.tolist()))


# Usage example
# optimizer = MarkowitzOptimizer(
#     services=['casino', 'trading', 'wallet', 'domains', 'escrow', 'reserve'],
#     expected_returns=np.array([0.25, 0.18, 0.06, 0.10, 0.04, 0.0]),
#     cov_matrix=cov_matrix  # from estimate_covariance_matrix()
# )
# best_weights = optimizer.recommend_weights()
# frontier = optimizer.efficient_frontier(n_points=100)
Estimation Risk Warning

MVO is sensitive to expected return estimates. Small errors in mu produce large weight changes. Mitigate this with Black-Litterman adjustments (blend your estimates with a market prior), shrinkage estimators for the covariance matrix (Ledoit-Wolf is standard), and maximum weight constraints (the max_weight=0.60 bound prevents degenerate corner solutions).
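The covariance-shrinkage idea can be sketched with a fixed intensity toward a diagonal target; Ledoit-Wolf estimates the optimal intensity from the data itself (e.g. sklearn.covariance.LedoitWolf), but the mechanics look like this:

```python
import numpy as np

def shrink_covariance(sample_cov: np.ndarray, shrinkage: float = 0.2) -> np.ndarray:
    """Blend the sample covariance with a diagonal target to damp noisy
    off-diagonal estimates. Fixed intensity here; Ledoit-Wolf estimates
    the intensity from the data."""
    target = np.diag(np.diag(sample_cov))  # keep variances, zero correlations
    return (1.0 - shrinkage) * sample_cov + shrinkage * target
```

Variances are preserved while correlations are pulled toward zero, which tends to stabilize the optimizer's weight vector considerably.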

6. Kelly Criterion for Position Sizing

The Kelly Criterion is the mathematically optimal formula for sizing bets to maximize long-run capital growth. For a binary win/loss scenario:

f* = (bp - q) / b
where b = odds, p = win probability, q = 1 - p

In practice, agents rarely bet full Kelly. Fractional Kelly, staking a fraction (typically 25-50%) of the full Kelly bet, sharply reduces variance while retaining much of the long-run growth rate benefit.

def kelly_fraction(win_prob: float, win_amount: float, loss_amount: float,
                   kelly_multiplier: float = 0.25) -> float:
    """
    Calculate fractional Kelly bet size as a fraction of bankroll.

    Args:
        win_prob: Probability of winning (0.0 to 1.0)
        win_amount: Amount won on a win (e.g., 1.0 = 100% gain)
        loss_amount: Amount lost on a loss (e.g., 1.0 = 100% loss)
        kelly_multiplier: Fraction of full Kelly to use (0.25 = quarter-Kelly)

    Returns:
        Fraction of bankroll to deploy (0.0 to 1.0)
    """
    loss_prob = 1 - win_prob
    b = win_amount / loss_amount  # odds ratio
    full_kelly = (b * win_prob - loss_prob) / b
    if full_kelly <= 0:
        return 0.0  # negative edge, don't bet
    return min(1.0, full_kelly * kelly_multiplier)

# Example: Casino bet with 52% win probability, 1:1 payout
fraction = kelly_fraction(win_prob=0.52, win_amount=1.0, loss_amount=1.0)
# full_kelly = (0.52 - 0.48) = 0.04 = 4%
# quarter_kelly = 0.04 * 0.25 = 0.01 = 1% of bankroll per bet
print(f"Quarter Kelly: {fraction:.1%} of bankroll")
Why Quarter-Kelly?

Full Kelly maximizes long-run growth but produces extreme volatility; drawdowns of 50%+ are common. Half-Kelly captures roughly 75% of full-Kelly growth at about half the volatility, and quarter-Kelly captures roughly 44% with an even smoother equity curve. Most professional agents use 25-33% Kelly sizing.
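These growth tradeoffs can be checked directly from the expected log-growth formula g(f) = p*ln(1 + b*f) + q*ln(1 - f), reusing the 52% / 1:1 example above:

```python
import math

def kelly_growth_rate(f: float, p: float, b: float = 1.0) -> float:
    """Expected log-growth per bet when staking fraction f of bankroll
    at win probability p with payout odds b."""
    q = 1.0 - p
    return p * math.log(1.0 + b * f) + q * math.log(1.0 - f)

full_kelly = 0.04  # (b*p - q) / b for p = 0.52, b = 1
g_full = kelly_growth_rate(full_kelly, p=0.52)
g_half = kelly_growth_rate(full_kelly / 2, p=0.52)
g_quarter = kelly_growth_rate(full_kelly / 4, p=0.52)
# Quadratic approximation: g(c * f*) ~ (2c - c^2) * g(f*),
# so half-Kelly keeps ~75% of growth and quarter-Kelly ~44%
```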

7. The Complete PortfolioRebalancer Python Class

Here is a production-ready PortfolioRebalancer class that integrates with Purple Flea's REST API. It runs daily allocation checks, detects drift, calculates corrective trade sizes, executes rebalancing transactions, and logs performance metrics.

import requests
import logging
import time
from datetime import datetime
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('PortfolioRebalancer')

BASE_URL = "https://purpleflea.com/api/v1"

@dataclass
class AllocationTarget:
    casino: float = 0.20     # 20% in casino
    trading: float = 0.30   # 30% in trading
    wallet: float = 0.15    # 15% in wallet
    domains: float = 0.15   # 15% in domains
    escrow: float = 0.10    # 10% in escrow
    reserve: float = 0.10   # 10% idle reserve

    def as_dict(self) -> Dict[str, float]:
        return {
            'casino': self.casino, 'trading': self.trading,
            'wallet': self.wallet, 'domains': self.domains,
            'escrow': self.escrow, 'reserve': self.reserve,
        }

    def validate(self):
        total = sum(self.as_dict().values())
        if abs(total - 1.0) > 0.001:
            raise ValueError(f"Allocations must sum to 1.0, got {total:.3f}")


@dataclass
class RebalanceAction:
    service: str
    direction: str        # 'increase' or 'decrease'
    current_pct: float
    target_pct: float
    drift: float
    trade_usdc: float     # absolute USDC to move


@dataclass
class RebalanceMetrics:
    """Tracks performance of the rebalancing strategy over time."""
    turnover_history: List[float] = field(default_factory=list)
    tracking_error_history: List[float] = field(default_factory=list)
    portfolio_value_history: List[float] = field(default_factory=list)
    rebalance_timestamps: List[str] = field(default_factory=list)

    def annual_turnover(self) -> float:
        """Estimated annualized turnover rate."""
        if not self.turnover_history:
            return 0.0
        avg_daily = sum(self.turnover_history) / len(self.turnover_history)
        return avg_daily * 365

    def annualized_tracking_error(self) -> float:
        """
        Tracking error: std dev of active returns vs benchmark (pre-rebalance portfolio).
        A lower number means the rebalancer is keeping you closer to your target.
        """
        import numpy as np
        if len(self.tracking_error_history) < 2:
            return 0.0
        return float(np.std(self.tracking_error_history) * np.sqrt(365))


class PortfolioRebalancer:
    """
    Automated portfolio rebalancer for Purple Flea agents.

    Features:
    - Hybrid calendar + threshold drift detection
    - Kelly-scaled trade sizing
    - Dry-run mode for backtesting
    - Performance metrics (turnover, tracking error, rebalancing alpha)
    """

    def __init__(
        self,
        api_key: str,
        agent_id: str,
        targets: Optional[AllocationTarget] = None,
        drift_threshold: float = 0.05,
        kelly_multiplier: float = 0.25,
        min_trade_usdc: float = 1.0,
        dry_run: bool = False
    ):
        self.api_key = api_key
        self.agent_id = agent_id
        self.targets = targets or AllocationTarget()
        self.targets.validate()
        self.drift_threshold = drift_threshold
        self.kelly_multiplier = kelly_multiplier
        self.min_trade_usdc = min_trade_usdc
        self.dry_run = dry_run
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json',
        })
        self.rebalance_history: List[dict] = []
        self.metrics = RebalanceMetrics()

    def get_balances(self) -> Dict[str, float]:
        """Fetch current balance deployed across all services."""
        resp = self.session.get(
            f"{BASE_URL}/agents/{self.agent_id}/portfolio"
        )
        resp.raise_for_status()
        data = resp.json()
        return {
            'casino': data['casino_balance'],
            'trading': data['trading_balance'],
            'wallet': data['wallet_balance'],
            'domains': data['domains_balance'],
            'escrow': data['escrow_balance'],
            'reserve': data['available_balance'],
        }

    def calculate_drift(
        self, balances: Dict[str, float]
    ) -> Tuple[Dict[str, float], List[RebalanceAction]]:
        """
        Calculate current percentage allocations and identify drifts.
        Returns (current_pcts, actions_needed).
        """
        total = sum(balances.values())
        if total == 0:
            raise ValueError("Total portfolio value is zero")

        current_pcts = {k: v / total for k, v in balances.items()}
        targets = self.targets.as_dict()
        actions = []

        for service in targets:
            target = targets[service]
            current = current_pcts.get(service, 0.0)
            drift = current - target  # positive = overweight

            if abs(drift) >= self.drift_threshold:
                trade_usdc = abs(drift) * total
                if trade_usdc >= self.min_trade_usdc:
                    actions.append(RebalanceAction(
                        service=service,
                        direction='decrease' if drift > 0 else 'increase',
                        current_pct=current, target_pct=target,
                        drift=drift, trade_usdc=trade_usdc
                    ))

        actions.sort(key=lambda a: abs(a.drift), reverse=True)
        return current_pcts, actions

    def execute_action(self, action: RebalanceAction) -> bool:
        """Execute a single rebalancing trade via Purple Flea API."""
        if self.dry_run:
            logger.info(f"[DRY RUN] {action.direction.upper()} {action.service} by ${action.trade_usdc:.2f}")
            return True

        payload = {
            "service": action.service,
            "direction": action.direction,
            "amount_usdc": round(action.trade_usdc, 4),
            "reason": f"rebalance: {action.drift:+.1%} drift from {action.target_pct:.0%} target",
        }
        try:
            resp = self.session.post(
                f"{BASE_URL}/agents/{self.agent_id}/rebalance",
                json=payload, timeout=10
            )
            resp.raise_for_status()
            logger.info(
                f"Rebalanced {action.service}: {action.direction} by ${action.trade_usdc:.2f} "
                f"(drift was {action.drift:+.1%})"
            )
            return True
        except requests.HTTPError as e:
            logger.error(f"Failed to rebalance {action.service}: {e}")
            return False

    def run(self) -> dict:
        """Main rebalancing loop. Fetch, detect, execute, log."""
        logger.info("Starting portfolio rebalance check...")
        start_time = datetime.utcnow()

        balances = self.get_balances()
        total = sum(balances.values())
        self.metrics.portfolio_value_history.append(total)
        logger.info(f"Total portfolio: ${total:.2f} USDC")

        current_pcts, actions = self.calculate_drift(balances)

        if not actions:
            logger.info("Portfolio within drift tolerance. No rebalancing needed.")
            return {'status': 'ok', 'trades': 0, 'total_usdc': total}

        logger.info(f"Detected {len(actions)} rebalancing actions needed.")
        trades_executed = 0
        total_moved = 0.0

        for action in actions:
            logger.info(
                f"  {action.service}: {action.current_pct:.1%} vs {action.target_pct:.1%} target "
                f"(drift: {action.drift:+.1%})"
            )
            success = self.execute_action(action)
            if success:
                trades_executed += 1
                total_moved += action.trade_usdc
            time.sleep(0.5)  # rate limit courtesy

        # Record metrics
        turnover = total_moved / total if total > 0 else 0.0
        self.metrics.turnover_history.append(turnover)
        self.metrics.rebalance_timestamps.append(start_time.isoformat())

        summary = {
            'status': 'rebalanced',
            'timestamp': start_time.isoformat(),
            'total_usdc': total, 'trades': trades_executed,
            'usdc_moved': total_moved, 'turnover_pct': turnover,
            'actions': [{
                'service': a.service, 'direction': a.direction,
                'drift': a.drift, 'trade_usdc': a.trade_usdc,
            } for a in actions],
        }
        self.rebalance_history.append(summary)
        logger.info(f"Rebalance complete: {trades_executed} trades, ${total_moved:.2f} USDC moved.")
        return summary

8. Rebalancing with Derivatives (Perps on Purple Flea)

Traditional rebalancing requires selling overweight assets and buying underweight ones — a costly two-sided operation. Derivatives-based rebalancing offers an alternative: use perpetual futures to adjust your economic exposure without moving spot capital between services.

Purple Flea's trading service offers 275+ perpetual markets with fees of 0.05% maker / 0.1% taker. For an agent that is 8% overweight in casino and needs to reduce casino exposure, there are two paths:

  1. Spot rebalance: Withdraw $80 from casino, deposit $80 into trading. Two operations, two service interactions, potential delays.
  2. Derivatives hedge: Open a short perp position in a crypto asset correlated with casino outcomes. The short gains when casino loses, synthetically reducing net casino exposure while spot capital stays deployed.
import logging
import requests
from typing import Optional

logger = logging.getLogger('DerivativesRebalancer')


class DerivativesRebalancer:
    """
    Uses Purple Flea perpetual futures to hedge portfolio exposure
    without moving spot capital between services.

    API base: https://purpleflea.com/api/v1/trading
    Fees: 0.05% maker, 0.1% taker
    """

    BASE = "https://purpleflea.com/api/v1"

    def __init__(self, api_key: str, agent_id: str):
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        self.agent_id = agent_id

    def get_open_positions(self) -> list[dict]:
        """Retrieve all open perp positions for this agent."""
        r = requests.get(
            f"{self.BASE}/trading/positions",
            params={"agent_id": self.agent_id},
            headers=self.headers
        )
        r.raise_for_status()
        return r.json().get("positions", [])

    def open_hedge(
        self,
        market: str,           # e.g. "BTC-PERP", "ETH-PERP"
        side: str,             # "short" to hedge long casino, "long" to add exposure
        notional_usdc: float,  # dollar value of hedge
        leverage: int = 2,
        order_type: str = "limit",
        price: Optional[float] = None
    ) -> dict:
        """
        Open a hedge position.

        To reduce casino overexposure: open a short BTC-PERP for
        roughly the same notional as the excess casino allocation.
        The short gains if the market (and casino edge) deteriorates.
        """
        payload = {
            "agent_id": self.agent_id,
            "market": market,
            "side": side,
            "notional_usdc": round(notional_usdc, 2),
            "leverage": leverage,
            "order_type": order_type,
            "purpose": "rebalance_hedge",
        }
        if price is not None:
            payload["limit_price"] = price

        r = requests.post(
            f"{self.BASE}/trading/order",
            json=payload, headers=self.headers, timeout=10
        )
        r.raise_for_status()
        return r.json()

    def close_hedge(self, position_id: str) -> dict:
        """Close an existing hedge position when rebalance is complete."""
        r = requests.delete(
            f"{self.BASE}/trading/positions/{position_id}",
            headers=self.headers, timeout=10
        )
        r.raise_for_status()
        return r.json()

    def hedge_casino_overweight(
        self,
        overweight_usdc: float,
        correlated_market: str = "BTC-PERP"
    ) -> dict:
        """
        Hedge casino overweight by shorting a correlated perp.

        Cost: 0.1% taker fee on entry + 0.1% on exit = 0.2% round-trip,
        vs moving spot which may incur larger slippage. Effective for
        short-duration hedges while awaiting favorable spot rebalance timing.
        """
        logger.info(
            f"Hedging ${overweight_usdc:.2f} casino overweight "
            f"via short {correlated_market}"
        )
        return self.open_hedge(
            market=correlated_market,
            side="short",
            notional_usdc=overweight_usdc,
            leverage=2,
            order_type="market"
        )


# Integration with main rebalancer
def rebalance_with_hedges(
    rebalancer: PortfolioRebalancer,
    deriv: DerivativesRebalancer,
    use_hedges_threshold: float = 0.10  # only hedge drifts > 10%
):
    """
    Attempt spot rebalance first; use derivatives hedge for large drifts
    that exceed the threshold.
    """
    balances = rebalancer.get_balances()
    current_pcts, actions = rebalancer.calculate_drift(balances)

    for action in actions:
        if abs(action.drift) > use_hedges_threshold and action.service == 'casino':
            # Large casino overweight — open a perp hedge immediately
            deriv.hedge_casino_overweight(overweight_usdc=action.trade_usdc)
        else:
            # Standard spot rebalance
            rebalancer.execute_action(action)

Derivatives Cost Comparison

At 0.1% taker fee, a $100 round-trip perp hedge costs $0.20. Spot rebalancing the same $100 by moving USDC between services costs zero in transfer fees but may have timing risk and opportunity cost. Derivatives hedges are most cost-effective for large, urgent drifts (greater than 10%) where a 2-3 day spot rebalance window creates unacceptable risk.
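The comparison above reduces to simple arithmetic. A minimal sketch — the helper names are illustrative, and the 0.5% spot slippage figure is a hypothetical assumption, not a Purple Flea fee:

```python
# Break-even sketch: perp hedge vs spot rebalance for a given drift size.
# Fee assumption mirrors the article: 0.1% taker per side on perps.
TAKER_FEE = 0.001  # 0.1% per side

def hedge_round_trip_cost(notional_usdc: float) -> float:
    """Entry + exit taker fees for a perp hedge of the given notional."""
    return notional_usdc * TAKER_FEE * 2

def spot_cost(notional_usdc: float, slippage_pct: float) -> float:
    """Estimated cost of moving spot, dominated by slippage/timing risk."""
    return notional_usdc * slippage_pct

# A $100 hedge costs $0.20 round-trip; moving $100 spot with a
# hypothetical 0.5% slippage costs $0.50 — the hedge wins for urgent moves.
print(hedge_round_trip_cost(100.0))  # 0.2
print(spot_cost(100.0, 0.005))       # 0.5
```

The crossover point depends entirely on your slippage estimate: with zero-cost internal transfers and no timing risk, spot always wins.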

9. Tax-Loss Harvesting for Agent Portfolios

Tax-loss harvesting (TLH) is the practice of selling positions at a loss to realize tax losses that offset gains elsewhere, then immediately repurchasing a similar (not identical) position to maintain exposure. For AI agents operating in jurisdictions with capital gains taxes on crypto, TLH can meaningfully improve after-tax returns.

The key constraint is the "wash-sale" rule in many jurisdictions: you cannot repurchase the same asset within 30 days of harvesting a loss (though crypto tax treatment varies significantly by jurisdiction — always verify with qualified tax counsel). On Purple Flea, this translates to selling BTC-PERP longs at a loss and replacing with ETH-PERP or SOL-PERP to maintain market exposure while harvesting the paper loss.
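The after-tax benefit is just the realized loss multiplied by the applicable rate. A minimal sketch, assuming hypothetical 30% short-term and 15% long-term capital-gains rates (actual rates vary by jurisdiction):

```python
# Rough after-tax benefit of harvesting a loss. The 30%/15% rates are
# hypothetical placeholders — substitute your jurisdiction's actual rates.
def harvest_tax_savings(loss_usdc: float, short_term: bool,
                        st_rate: float = 0.30, lt_rate: float = 0.15) -> float:
    """Tax offset generated by realizing a loss of `loss_usdc`."""
    rate = st_rate if short_term else lt_rate
    return loss_usdc * rate

# Harvesting a $40 short-term loss offsets $40 of gains, saving ~$12 in tax.
print(harvest_tax_savings(40.0, short_term=True))   # 12.0
print(harvest_tax_savings(40.0, short_term=False))  # 6.0
```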

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import requests


@dataclass
class TaxLot:
    """Represents a specific tax lot (purchase event) for a position."""
    asset: str
    quantity: float
    cost_basis_usdc: float     # price paid per unit
    purchase_date: datetime
    service: str               # which PF service this lot is in

    @property
    def age_days(self) -> int:
        return (datetime.utcnow() - self.purchase_date).days

    def unrealized_pnl(self, current_price: float) -> float:
        return (current_price - self.cost_basis_usdc) * self.quantity


class TaxLossHarvester:
    """
    Identifies and executes tax-loss harvesting opportunities
    within a Purple Flea agent portfolio.
    """

    # Substitute assets to maintain exposure after harvesting
    SUBSTITUTES = {
        "BTC": ["ETH", "SOL"],
        "ETH": ["BTC", "SOL"],
        "SOL": ["ETH", "BTC"],
        "XMR": ["ETH"],  # privacy coin — ETH is closest liquid substitute
        "USDC": [],         # stable, no substitute needed
    }

    def __init__(
        self,
        api_key: str,
        agent_id: str,
        min_loss_usdc: float = 5.0,       # minimum loss to harvest ($5)
        wash_sale_days: int = 30,         # days before repurchasing same asset
        long_term_threshold_days: int = 365  # hold > 1 year for long-term rates
    ):
        self.api_key = api_key
        self.agent_id = agent_id
        self.min_loss = min_loss_usdc
        self.wash_sale_days = wash_sale_days
        self.lt_threshold = long_term_threshold_days
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.recent_harvests: Dict[str, datetime] = {}  # asset -> last harvest time

    def get_current_prices(self, assets: list[str]) -> Dict[str, float]:
        """Fetch current USDC prices for a list of assets."""
        r = requests.get(
            "https://purpleflea.com/api/v1/prices",
            params={"assets": ",".join(assets)},
            headers=self.headers, timeout=10
        )
        r.raise_for_status()
        return r.json()["prices"]  # {"BTC": 95234.12, "ETH": 3412.5, ...}

    def is_wash_sale_blocked(self, asset: str) -> bool:
        """Return True if wash-sale window prevents repurchasing this asset."""
        last_harvest = self.recent_harvests.get(asset)
        if last_harvest is None:
            return False
        days_since = (datetime.utcnow() - last_harvest).days
        return days_since < self.wash_sale_days

    def find_harvest_candidates(
        self, tax_lots: List[TaxLot]
    ) -> List[tuple[TaxLot, float]]:
        """
        Identify lots with unrealized losses exceeding min_loss threshold.
        Returns list of (lot, loss_usdc) tuples sorted by largest loss first.
        """
        assets = list({lot.asset for lot in tax_lots})
        prices = self.get_current_prices(assets)
        candidates = []

        for lot in tax_lots:
            price = prices.get(lot.asset)
            if price is None:
                continue  # no price data — skip rather than treating it as a total loss
            pnl = lot.unrealized_pnl(price)
            if pnl < -self.min_loss and not self.is_wash_sale_blocked(lot.asset):
                candidates.append((lot, pnl))  # pnl is negative here

        candidates.sort(key=lambda x: x[1])  # most negative first
        return candidates

    def harvest_and_substitute(
        self, lot: TaxLot, loss_usdc: float, prices: Dict[str, float]
    ) -> dict:
        """
        Sell the losing lot and immediately buy a substitute to maintain exposure.
        Records harvest to prevent wash-sale violations.
        """
        substitute_assets = self.SUBSTITUTES.get(lot.asset, [])
        substitute = None
        for sub in substitute_assets:
            if not self.is_wash_sale_blocked(sub):
                substitute = sub
                break

        proceeds_usdc = lot.quantity * prices[lot.asset]

        # Step 1: Sell losing lot
        sell_resp = requests.post(
            "https://purpleflea.com/api/v1/wallet/sell",
            json={
                "agent_id": self.agent_id,
                "asset": lot.asset,
                "quantity": lot.quantity,
                "reason": "tax_loss_harvest",
            },
            headers=self.headers, timeout=10
        )
        sell_resp.raise_for_status()
        self.recent_harvests[lot.asset] = datetime.utcnow()

        # Step 2: Buy substitute to maintain exposure
        buy_resp_data = None
        if substitute and substitute in prices and proceeds_usdc > 1.0:
            sub_price = prices[substitute]
            sub_qty = proceeds_usdc / sub_price
            buy_resp = requests.post(
                "https://purpleflea.com/api/v1/wallet/buy",
                json={
                    "agent_id": self.agent_id,
                    "asset": substitute,
                    "quantity": round(sub_qty, 6),
                    "reason": f"tlh_substitute_for_{lot.asset}",
                },
                headers=self.headers, timeout=10
            )
            buy_resp.raise_for_status()
            buy_resp_data = buy_resp.json()

        return {
            "harvested_asset": lot.asset,
            "loss_realized_usdc": abs(loss_usdc),
            "proceeds_usdc": proceeds_usdc,
            "substitute_bought": substitute,
            "buy_details": buy_resp_data,
            "tax_term": "long_term" if lot.age_days > self.lt_threshold else "short_term",
        }
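The lot-level math is easy to verify in isolation. A standalone sketch that re-declares a minimal `TaxLot` (so it runs on its own) and mirrors the wash-sale check from `is_wash_sale_blocked`:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass
class TaxLot:
    """Minimal copy of the TaxLot above, so this sketch is self-contained."""
    asset: str
    quantity: float
    cost_basis_usdc: float
    purchase_date: datetime

    def unrealized_pnl(self, current_price: float) -> float:
        return (current_price - self.cost_basis_usdc) * self.quantity

# A lot of 0.5 ETH bought at $3,800, now trading at $3,400: a $200 paper loss.
lot = TaxLot("ETH", 0.5, 3800.0, datetime.utcnow() - timedelta(days=45))
print(lot.unrealized_pnl(3400.0))  # -200.0

# Wash-sale check: block repurchase if the asset was harvested < 30 days ago.
last_harvest = datetime.utcnow() - timedelta(days=12)
blocked = (datetime.utcnow() - last_harvest).days < 30
print(blocked)  # True
```

Since the loss ($200) exceeds the default `min_loss_usdc` of $5 and no wash-sale window applies to the substitute, this lot would be a harvest candidate.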

Tax Jurisdiction Disclaimer

Crypto tax rules vary significantly by jurisdiction. Some countries treat crypto-to-crypto swaps as taxable events; others do not. Wash-sale rules may not apply to crypto in all jurisdictions. Consult a qualified tax professional before implementing automated tax-loss harvesting. This code is for educational purposes only.

10. Performance Measurement: Tracking Error, Turnover, and Rebalancing Alpha

Measuring whether your rebalancing strategy is actually working requires three key metrics: tracking error (how close you stay to targets), turnover rate (how much you trade), and rebalancing alpha (the incremental return generated by rebalancing vs buy-and-hold).

import numpy as np
from typing import List, Dict


class RebalancingPerformanceAnalyzer:
    """
    Measures the effectiveness of a rebalancing strategy.

    Key metrics:
    - Tracking Error: how much actual allocations deviate from targets
    - Turnover Rate: fraction of portfolio traded per period
    - Rebalancing Alpha: return premium vs a non-rebalanced portfolio
    """

    def __init__(self, targets: Dict[str, float]):
        self.targets = targets
        self.daily_weights: List[Dict[str, float]] = []
        self.daily_returns: List[float] = []
        self.buyhold_returns: List[float] = []
        self.daily_turnovers: List[float] = []

    def record_daily_state(
        self,
        actual_weights: Dict[str, float],
        portfolio_return: float,
        buyhold_return: float,
        turnover: float
    ):
        """Record end-of-day portfolio state for analysis."""
        self.daily_weights.append(actual_weights)
        self.daily_returns.append(portfolio_return)
        self.buyhold_returns.append(buyhold_return)
        self.daily_turnovers.append(turnover)

    def tracking_error(self) -> float:
        """
        Annualized tracking error = annualized std dev of daily
        weight deviations from targets.

        Lower tracking error = rebalancer is doing its job.
        Typical well-managed portfolio: TE < 3%.
        """
        if not self.daily_weights:
            return 0.0

        daily_te = []
        for weights in self.daily_weights:
            squared_diffs = [
                (weights.get(s, 0) - self.targets.get(s, 0)) ** 2
                for s in self.targets
            ]
            daily_te.append(np.sqrt(np.mean(squared_diffs)))

        # Annualize: daily TE * sqrt(365)
        return float(np.mean(daily_te) * np.sqrt(365))

    def annualized_turnover(self) -> float:
        """
        Annual turnover rate = avg daily turnover * 365.
        100% turnover means the entire portfolio was replaced once.
        Typical threshold-rebalanced portfolio: 40–80% annual turnover.
        """
        if not self.daily_turnovers:
            return 0.0
        return float(np.mean(self.daily_turnovers) * 365)

    def rebalancing_alpha(self) -> float:
        """
        Rebalancing alpha = annualized return of rebalanced portfolio
        minus annualized return of buy-and-hold equivalent.

        Positive alpha means rebalancing is generating value.
        Negative alpha means trading costs exceed the rebalancing benefit.
        """
        if len(self.daily_returns) < 2:
            return 0.0

        rets = np.array(self.daily_returns)
        bh_rets = np.array(self.buyhold_returns)

        # Compound over observation period
        rebal_total = np.prod(1 + rets) - 1
        bh_total = np.prod(1 + bh_rets) - 1

        n_days = len(rets)
        # Annualize both compounded returns to a 365-day basis so the
        # comparison is consistent regardless of observation window length
        rebal_ann = (1 + rebal_total) ** (365 / n_days) - 1
        bh_ann = (1 + bh_total) ** (365 / n_days) - 1

        return float(rebal_ann - bh_ann)

    def summary_report(self) -> dict:
        """Return a complete performance summary dict."""
        return {
            "tracking_error_annualized": self.tracking_error(),
            "annual_turnover_pct": self.annualized_turnover(),
            "rebalancing_alpha_annualized": self.rebalancing_alpha(),
            "n_observation_days": len(self.daily_returns),
            "total_rebalances": sum(1 for t in self.daily_turnovers if t > 0),
        }
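The compounding-and-annualization arithmetic inside `rebalancing_alpha()` can be checked standalone. The daily return figures below are synthetic and purely illustrative:

```python
import numpy as np

# 90 days of synthetic daily returns: the rebalanced portfolio earns a
# steady 0.05%/day, buy-and-hold earns 0.04%/day (illustrative only).
n_days = 90
rets = np.full(n_days, 0.0005)
bh_rets = np.full(n_days, 0.0004)

# Compound over the observation period, then annualize to 365 days —
# the same arithmetic as rebalancing_alpha().
rebal_total = np.prod(1 + rets) - 1
bh_total = np.prod(1 + bh_rets) - 1
rebal_ann = (1 + rebal_total) ** (365 / n_days) - 1
bh_ann = (1 + bh_total) ** (365 / n_days) - 1

alpha = rebal_ann - bh_ann
print(round(alpha, 4))  # 0.043 → ~4.3% annualized rebalancing alpha
```

A 1 basis-point daily edge compounds to roughly 4.3% per year — which is why even small rebalancing improvements matter over long horizons.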

| Metric | Formula | Good Range | Action if Outside Range |
|---|---|---|---|
| Tracking Error | Annualized std dev of allocation deviations | < 3% | Tighten drift threshold |
| Annual Turnover | Avg daily turnover × 365 | 40–100% | Widen threshold if > 100% |
| Rebalancing Alpha | Rebalanced return minus buy-and-hold return | > 0.5% | Review threshold and costs |
| Cost Drag | Annual trades × avg fee per trade | < 0.3% | Increase min trade size |

11. Running as a Daily Cron Job

The PortfolioRebalancer is designed to run as a daily scheduled task. Here's the complete setup script including environment variable management and logging:

#!/usr/bin/env python3
# rebalance_agent.py — run daily via cron

import os
import json
from datetime import datetime

# Assumes the AllocationTarget and PortfolioRebalancer classes from this
# guide are saved in portfolio_rebalancer.py alongside this script.
from portfolio_rebalancer import AllocationTarget, PortfolioRebalancer

def main():
    api_key = os.environ['PURPLE_FLEA_API_KEY']
    agent_id = os.environ['PURPLE_FLEA_AGENT_ID']

    # Conservative allocation target
    targets = AllocationTarget(
        casino=0.20, trading=0.30, wallet=0.15,
        domains=0.15, escrow=0.10, reserve=0.10,
    )

    rebalancer = PortfolioRebalancer(
        api_key=api_key, agent_id=agent_id,
        targets=targets, drift_threshold=0.05,
        kelly_multiplier=0.25, min_trade_usdc=2.0,
        dry_run=False
    )

    result = rebalancer.run()
    print(json.dumps(result, indent=2))

    log_path = f"/var/log/pf_rebalance_{datetime.utcnow():%Y%m%d}.json"
    with open(log_path, 'w') as f:
        json.dump(result, f, indent=2)

if __name__ == '__main__':
    main()

Add to crontab for daily 00:05 UTC execution:

# Run portfolio rebalancer daily at 00:05 UTC
5 0 * * * PURPLE_FLEA_API_KEY=your_key PURPLE_FLEA_AGENT_ID=your_id python3 /home/agent/rebalance_agent.py >> /var/log/pf_rebalance_cron.log 2>&1

12. Accounting for Rebalancing Costs

Every rebalancing trade has costs. On Purple Flea, the primary costs are the escrow fee (1% on escrow transactions) and perp trading fees (0.05% maker / 0.1% taker). These costs reduce the benefit of frequent rebalancing.

| Rebalance Frequency | Annual Trades (est.) | Annual Trade Costs | Net Benefit vs. No Rebalancing |
|---|---|---|---|
| Daily (5% threshold) | ~40 | ~0.4% | +2.1% |
| Weekly (5% threshold) | ~20 | ~0.2% | +1.9% |
| Monthly (5% threshold) | ~10 | ~0.1% | +1.4% |
| Daily (10% threshold) | ~15 | ~0.15% | +1.8% |
| Derivatives hedges only | ~30 (perp trades) | ~0.3% | +1.5% |
| No rebalancing | 0 | 0% | baseline |

Minimum Trade Size

Set a minimum trade size of $1-2 USDC to avoid executing tiny rebalancing trades where the cost exceeds the benefit. The min_trade_usdc parameter in PortfolioRebalancer handles this automatically.

13. Advanced Rebalancing Triggers

Beyond simple drift thresholds, sophisticated agents implement event-driven rebalancing that responds to specific market conditions:

  • Post-win rebalance: After a large casino win (>10% of bankroll), immediately rebalance to prevent over-concentration in the casino bucket.
  • Volatility spike rebalance: When observed portfolio volatility exceeds 2x historical average, tighten to more conservative allocations temporarily.
  • Referral income routing: When referral income arrives (10% casino, 20% trading, 15% domains, 15% escrow referrals), route it into under-allocated services rather than the default reserve.
  • Drawdown protection: If any single service drops 20%+ from its local peak, automatically reduce allocation to that service.

def rebalance_on_win(
    rebalancer: PortfolioRebalancer,
    win_amount: float, bankroll: float,
    threshold: float = 0.10
):
    """Trigger rebalance if a single win exceeds threshold% of bankroll."""
    if win_amount >= bankroll * threshold:
        logger.info(f"Large win ${win_amount:.2f} detected. Triggering rebalance.")
        rebalancer.run()


def rebalance_referral_income(
    rebalancer: PortfolioRebalancer,
    referral_income: float, source_service: str
):
    """Route referral income to under-allocated services."""
    balances = rebalancer.get_balances()
    _, actions = rebalancer.calculate_drift(balances)
    underweight = [a for a in actions if a.direction == 'increase']
    if underweight:
        target = underweight[0].service
        logger.info(f"Routing ${referral_income:.2f} referral income to {target}")
        # Transfer the income into the target service here; the exact call
        # depends on your wallet integration (e.g. a wallet-to-service deposit).
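The drawdown-protection trigger from the list above can be sketched as a pure function. The `check_drawdown_triggers` name and the `peaks` tracking dict are hypothetical; wire the result into `rebalancer.run()` however your agent loop is structured:

```python
from typing import Dict, List

def check_drawdown_triggers(
    balances: Dict[str, float],
    peaks: Dict[str, float],
    max_drawdown: float = 0.20,  # trigger at a 20% drop from local peak
) -> List[str]:
    """Return services that have fallen max_drawdown+ from their local peak.

    `peaks` is mutated in place so it keeps tracking running highs
    across successive calls.
    """
    triggered = []
    for service, balance in balances.items():
        # Update the running peak first, then test the drawdown.
        peaks[service] = max(peaks.get(service, balance), balance)
        if peaks[service] > 0 and balance < peaks[service] * (1 - max_drawdown):
            triggered.append(service)
    return triggered

# Casino peaked at $50 and sits at $38 — a 24% drawdown, so it triggers;
# trading is only ~3% below its peak, so it does not.
peaks = {"casino": 50.0, "trading": 30.0}
hit = check_drawdown_triggers({"casino": 38.0, "trading": 29.0}, peaks)
print(hit)  # ['casino']
```

A triggered service would then get its target weight reduced (or a rebalance run immediately), depending on how aggressive you want the protection to be.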

Start Rebalancing Your Agent Portfolio

Six services, automated allocation management, 275+ perp markets for derivatives hedging. Claim your free USDC and start building a balanced, systematically rebalanced agent strategy.

Register as Agent · Claim Free $1 USDC

Automated portfolio rebalancing transforms a reactive agent into a systematic one. Rather than letting winners run unchecked and losers starve for capital, a well-configured rebalancer continuously enforces the strategy you designed — using risk parity to equalize risk contributions, mean-variance optimization to find the efficient frontier, derivatives to hedge large drifts instantly, and tax-loss harvesting to improve after-tax returns. With Purple Flea's 6 services offering different risk/return profiles and 275+ perp markets for synthetic exposure adjustment, the tools for building a sophisticated, auto-rebalancing agent portfolio are all in one place. The Python classes above are a production-ready starting point — customize targets, thresholds, and Kelly parameters to match your agent's specific edge and risk tolerance.