Signal Generation Pipelines for AI Trading Agents

Raw market data is noise. Signals are noise transformed into actionable predictions. This guide walks through building a production-grade signal generation pipeline — from data ingestion and feature engineering to ensemble combination and live order placement via the Purple Flea Trading API.

A signal is a quantitative prediction about future price movement with an associated strength, direction, and decay time. Good signals are forward-looking, statistically significant in out-of-sample testing, and combined intelligently with other signals. Bad signals are backtested into existence and forgotten as soon as they hit live markets. This guide teaches the difference — and shows how to build a pipeline that survives contact with reality.

5 primary signal categories covered
0.05+ minimum IC for a deployable signal
4 ensemble combination methods compared
<50ms target latency for live signal scoring

1. Signal Types: Technical, Fundamental, Sentiment, On-Chain, Cross-Asset

Signals fall into five broad categories, each with different data sources, latencies, and decay characteristics. Robust trading agents combine signals from multiple categories to reduce dependence on any single source of alpha.

Technical Signals

Derived from price, volume, and order flow data. Examples include momentum indicators (RSI, MACD), moving average crossovers, Bollinger Band breakouts, order book imbalance, and VWAP deviation. Technical signals are high-frequency — they can be computed in milliseconds and decay within minutes to hours. They are the most common signal type for crypto trading agents because the data is readily available and standardized.
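As a minimal sketch of one such signal, top-of-book order book imbalance reduces to a single normalized ratio. The function name and inputs below are illustrative, not part of any exchange API:

```python
import numpy as np

def order_book_imbalance(bid_volumes: list,
                         ask_volumes: list) -> float:
    """
    Top-of-book imbalance in [-1, +1]: positive values indicate
    bid-side pressure (buying interest), negative values ask-side.
    """
    bid = float(np.sum(bid_volumes))
    ask = float(np.sum(ask_volumes))
    if bid + ask == 0:
        return 0.0
    return (bid - ask) / (bid + ask)

print(order_book_imbalance([5.0, 3.0], [0.0]))  # 1.0 (all bid pressure)
print(order_book_imbalance([4.0], [4.0]))       # 0.0 (balanced book)
```

Because the ratio is already bounded in [-1, +1], it can feed a RawSignal strength field directly with no further normalization.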

Fundamental Signals

Based on underlying asset value metrics: protocol revenue, active addresses, developer activity (GitHub commits), token emission schedules, and treasury balances. Fundamental signals change slowly and have decay times of days to weeks. They work best as filters or regime identifiers rather than direct trade triggers.
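To illustrate the filter role, a hypothetical regime gate on protocol revenue growth might look like the sketch below (the metric names are assumptions, not tied to any specific data provider):

```python
def revenue_regime(revenue_30d: float,
                   revenue_prior_30d: float,
                   min_growth: float = 0.0) -> bool:
    """
    True when trailing 30-day protocol revenue is growing.
    Used as a gate: only allow long technical signals while the
    fundamental regime is positive, rather than as a trade trigger.
    """
    if revenue_prior_30d <= 0:
        return False
    growth = revenue_30d / revenue_prior_30d - 1
    return growth >= min_growth

print(revenue_regime(1_200_000, 1_000_000))  # True  (revenue up 20%)
print(revenue_regime(800_000, 1_000_000))    # False (revenue down 20%)
```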

Sentiment Signals

Aggregated from social media, news sources, and AI-powered NLP models. Crypto-specific sources include Twitter/X post volume and tone, Reddit mentions, Fear and Greed index, and funding rate sentiment. Sentiment signals are noisy but can capture crowd psychology ahead of price moves. Decay is typically 1 to 12 hours for event-driven sentiment.

On-Chain Signals

Derived from blockchain transaction data: whale wallet flows, exchange inflows/outflows, large transfer clustering, miner activity, and DeFi protocol utilization. On-chain data is transparent and manipulation-resistant. Signal decay is 4 to 48 hours depending on the metric — whale accumulation patterns last longer than single large transactions.
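A common on-chain construction is a z-score of exchange netflow against its own trailing distribution. This is a sketch under the assumption that a daily netflow series (inflows minus outflows) is already available:

```python
import numpy as np

def netflow_zscore(daily_netflows: np.ndarray,
                   lookback: int = 30) -> float:
    """
    Z-score of the latest exchange netflow against its trailing
    distribution. Large positive values (coins moving onto exchanges)
    are typically read as incoming sell pressure.
    """
    window = daily_netflows[-lookback:]
    mu, sigma = window.mean(), window.std()
    if sigma == 0:
        return 0.0
    return float((daily_netflows[-1] - mu) / sigma)

flows = np.array([0.0] * 29 + [10.0])  # quiet month, then a large inflow
print(netflow_zscore(flows) > 3)       # True: today's flow is an outlier
```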

Cross-Asset Signals

Exploit correlations between related markets: BTC dominance shifts, BTC/ETH ratio, crypto vs equity beta, stablecoin dominance trends, and DeFi TVL changes. Cross-asset signals provide diversification benefit because they are often uncorrelated with pure technical signals on the same asset.

Signal Type | Data Latency      | Typical Decay   | Best Use            | Difficulty
Technical   | Milliseconds      | Minutes – Hours | Trade triggering    | Low
Fundamental | Hours – Days      | Days – Weeks    | Regime filter       | Medium
Sentiment   | Seconds – Minutes | 1 – 12 Hours    | Short-term momentum | Medium
On-Chain    | Minutes – Hours   | 4 – 48 Hours    | Directional bias    | High
Cross-Asset | Milliseconds      | Hours – Days    | Portfolio regime    | Medium

2. Signal Pipeline Architecture

A signal pipeline transforms raw market data into an actionable trade signal through a structured sequence of stages. Each stage has a single responsibility, making the pipeline testable, replaceable, and observable.

Stage 1: Raw Data → Stage 2: Feature Eng. → Stage 3: Signal Gen. → Stage 4: Scoring → Stage 5: Filtering → Stage 6: Combination → Stage 7: Trade Signal

Separation of Concerns: Keep signal generation strictly separate from order execution. The pipeline should produce a signal object. A separate execution engine decides whether and how to act on it. This decoupling makes backtesting, paper trading, and live trading share the same signal pipeline with different execution backends.
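One way to sketch this decoupling is a structural interface that both backends satisfy; the class and method names here are hypothetical, not part of the pipeline above:

```python
from typing import Protocol

class ExecutionBackend(Protocol):
    """Anything with an execute() method can consume signals."""
    def execute(self, signal) -> None: ...

class PaperBackend:
    """Records signals without placing orders: same pipeline, no risk."""
    def __init__(self):
        self.log = []
    def execute(self, signal) -> None:
        self.log.append(signal)

class LiveBackend:
    """Would translate the signal into a live order (wiring omitted)."""
    def execute(self, signal) -> None:
        raise NotImplementedError("connect to the Trading API here")

def run_once(pipeline, features, backend: ExecutionBackend) -> None:
    """One cycle: generate a signal, hand it to whichever backend."""
    composite = pipeline.generate(features)
    if composite is not None:
        backend.execute(composite)
```

Backtesting, paper trading, and live trading then differ only in which backend is passed to run_once; the signal pipeline itself never changes.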

3. Signal Strength Scoring and Filtering Weak Signals

Not all signals are equal. Signal scoring assigns a quantitative quality measure to each generated signal, allowing the pipeline to discard low-quality inputs before they pollute the ensemble.

Information Coefficient (IC)

The Information Coefficient is the Spearman rank correlation between predicted signal direction and actual subsequent returns. An IC of 0.0 indicates no predictive power; 0.05–0.10 is considered good in equities; 0.10+ is excellent for crypto agents given shorter holding periods and greater noise.

import numpy as np
import pandas as pd
from scipy import stats

def information_coefficient(signals: np.ndarray,
                              returns: np.ndarray) -> float:
    """
    Compute Spearman rank IC between signals and forward returns.

    Args:
        signals: Array of signal values at time t
        returns: Array of realized returns at time t+h (same length)
    Returns:
        IC value in [-1, +1]. Values > 0.05 indicate useful predictive power.
    """
    if len(signals) < 20:
        return 0.0
    ic, _ = stats.spearmanr(signals, returns)
    return float(ic)


def rolling_ic(signal_series: pd.Series,
               return_series: pd.Series,
               window: int = 60) -> pd.Series:
    """Rolling IC for temporal stability assessment."""
    ics = []
    for i in range(window, len(signal_series)):
        s = signal_series.iloc[i-window:i].values
        r = return_series.iloc[i-window:i].values
        ics.append(information_coefficient(s, r))
    return pd.Series(ics, index=signal_series.index[window:])

Signal Quality Thresholds

IC Range    | Quality Grade | Action
> 0.10      | A — Excellent | Full weight in ensemble
0.05 – 0.10 | B — Good      | Standard weight in ensemble
0.02 – 0.05 | C — Marginal  | Half weight; monitor closely
0 – 0.02    | D — Weak      | Exclude from ensemble
< 0         | F — Inverted  | Investigate or discard
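The table can be encoded as a small grading function. One interpretation is assumed here: "full" and "standard" weight are both mapped to a 1.0 multiplier on the signal's base weight, with only marginal signals discounted:

```python
from typing import Tuple

def grade_signal(ic: float) -> Tuple[str, float]:
    """
    Map a rolling IC to a (grade, weight-multiplier) pair following
    the quality-threshold table. The multiplier scales the signal's
    base weight in the ensemble; 0.0 means excluded.
    """
    if ic > 0.10:
        return ("A", 1.0)   # excellent: full weight
    if ic >= 0.05:
        return ("B", 1.0)   # good: standard weight
    if ic >= 0.02:
        return ("C", 0.5)   # marginal: half weight, monitor
    if ic >= 0.0:
        return ("D", 0.0)   # weak: excluded
    return ("F", 0.0)       # inverted: investigate or discard

print(grade_signal(0.12))   # ('A', 1.0)
print(grade_signal(0.03))   # ('C', 0.5)
```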

4. Python: SignalPipeline Class

The SignalPipeline class encapsulates the full seven-stage pipeline from raw feature input to composite trade signal. It supports dynamic signal registration, configurable quality filters, and multiple ensemble combination methods.

import numpy as np
import pandas as pd
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Tuple
from scipy import stats
from enum import Enum

class SignalDirection(Enum):
    LONG  = 1
    FLAT  = 0
    SHORT = -1

@dataclass
class RawSignal:
    """Output of a single signal generator."""
    name: str
    value: float         # raw signal value, any scale
    strength: float      # normalised to [-1, +1]
    confidence: float    # IC or model confidence [0, 1]
    generated_at: pd.Timestamp
    ttl_seconds: int = 3600  # time-to-live (freshness window)

    @property
    def is_fresh(self) -> bool:
        age = (pd.Timestamp.now() - self.generated_at).total_seconds()
        return age < self.ttl_seconds

@dataclass
class CompositeSignal:
    """Final pipeline output: composite trade signal."""
    direction: SignalDirection
    strength: float          # composite strength [-1, +1]
    contributing_signals: List[str]
    generated_at: pd.Timestamp
    raw_signals: List[RawSignal] = field(default_factory=list)


class SignalPipeline:
    """
    Modular signal generation pipeline for AI trading agents.

    Register signal generators, set quality filters, and call
    generate() to produce a composite trade signal from current
    market features.

    Example:
        pipeline = SignalPipeline(min_ic=0.04, min_signals=2)
        pipeline.register(rsi_signal, name="rsi_14",
                          ttl_seconds=900, weight=1.0)
        pipeline.register(funding_signal, name="funding_rate",
                          ttl_seconds=3600, weight=1.5)
        composite = pipeline.generate(features)
    """

    def __init__(self,
                 min_ic: float = 0.04,
                 min_signals: int = 2,
                 combination_method: str = "ic_weighted"):
        """
        Args:
            min_ic: Minimum IC for a signal to pass quality filter
            min_signals: Minimum passing signals to produce output
            combination_method: One of 'equal', 'ic_weighted',
                                'confidence_weighted', 'rank_combination'
        """
        self.min_ic = min_ic
        self.min_signals = min_signals
        self.combination_method = combination_method
        # Registry: {name: (generator_fn, ttl, base_weight, ic_history)}
        self._registry: Dict[str, Dict] = {}

    def register(self,
                 generator: Callable[..., RawSignal],
                 name: str,
                 ttl_seconds: int = 3600,
                 weight: float = 1.0):
        """Register a signal generator function."""
        self._registry[name] = {
            "fn": generator,
            "ttl": ttl_seconds,
            "weight": weight,
            "ic_history": [],
        }

    def generate(self, features: Dict[str, float]) -> Optional[CompositeSignal]:
        """
        Run all registered generators on current features.
        Returns composite signal or None if insufficient passing signals.
        """
        raw_signals: List[RawSignal] = []
        for name, config in self._registry.items():
            try:
                sig = config["fn"](features)
                sig.name = name
                sig.ttl_seconds = config["ttl"]
                raw_signals.append(sig)
            except Exception as e:
                print(f"[WARN] Signal {name} failed: {e}")

        passing = self.filter(raw_signals)

        if len(passing) < self.min_signals:
            return None   # insufficient signal quality — stay flat

        composite_strength = self.combine(passing)
        direction = self._to_direction(composite_strength)

        return CompositeSignal(
            direction=direction,
            strength=composite_strength,
            contributing_signals=[s.name for s in passing],
            generated_at=pd.Timestamp.now(),
            raw_signals=passing,
        )

    def score(self, signal: RawSignal) -> float:
        """
        Score a single signal for quality.
        Returns IC estimate using recent performance history if available,
        otherwise returns signal's own confidence value.
        """
        history = self._registry.get(signal.name, {}).get("ic_history", [])
        if len(history) >= 30:
            return float(np.mean(history[-30:]))
        return signal.confidence

    def filter(self, signals: List[RawSignal]) -> List[RawSignal]:
        """
        Filter signals by quality (IC) and freshness.
        Returns only signals that pass both criteria.
        """
        passing = []
        for sig in signals:
            if not sig.is_fresh:
                continue
            ic_est = self.score(sig)
            if ic_est >= self.min_ic:
                passing.append(sig)
        return passing

    def combine(self, signals: List[RawSignal]) -> float:
        """
        Combine filtered signals into a single composite strength value
        using the configured combination method.
        """
        if not signals:
            return 0.0

        if self.combination_method == "equal":
            return float(np.mean([s.strength for s in signals]))

        elif self.combination_method == "ic_weighted":
            weights = np.array([max(0, self.score(s)) for s in signals])
            values  = np.array([s.strength for s in signals])
            total_w = weights.sum()
            if total_w == 0:
                return float(np.mean(values))
            return float(np.dot(weights, values) / total_w)

        elif self.combination_method == "confidence_weighted":
            weights = np.array([s.confidence for s in signals])
            values  = np.array([s.strength for s in signals])
            total_w = weights.sum()
            return float(np.dot(weights, values) / total_w) if total_w > 0 else 0.0

        elif self.combination_method == "rank_combination":
            # Rank-based IC combination (more robust to outliers)
            ranks = stats.rankdata([s.strength for s in signals])
            weights = np.array([max(0, self.score(s)) for s in signals])
            # Normalise ranks to [-1, +1]
            n = len(ranks)
            norm_ranks = (ranks - (n + 1) / 2) / (n / 2)
            total_w = weights.sum()
            return float(np.dot(weights, norm_ranks) / total_w) if total_w > 0 else 0.0

        return 0.0

    def _to_direction(self, strength: float) -> SignalDirection:
        """Convert composite strength to discrete direction."""
        if strength > 0.15:
            return SignalDirection.LONG
        elif strength < -0.15:
            return SignalDirection.SHORT
        return SignalDirection.FLAT

    def update_ic(self, signal_name: str, ic: float):
        """Feed back realised IC after trade closes for adaptive weighting."""
        if signal_name in self._registry:
            self._registry[signal_name]["ic_history"].append(ic)
            # Keep only last 90 observations
            self._registry[signal_name]["ic_history"] = \
                self._registry[signal_name]["ic_history"][-90:]

5. Combining Signals with Ensemble Methods

The four combination methods in SignalPipeline.combine() each have different properties. Understanding their tradeoffs helps you choose the right method for your strategy.

Equal Weighting

The simplest approach: average the strength values of all passing signals. Equal weighting is robust when you have no reliable estimate of each signal's quality, or when signal qualities are similar. It is the baseline all other methods should outperform.

IC-Weighted Combination

Weights each signal's contribution by its historical Information Coefficient. Signals with higher predictive accuracy get more influence on the composite. This is the preferred default for most strategies — it is mathematically optimal under the assumption that ICs are stable and signals are independent.

Confidence-Weighted Combination

Uses the signal generator's own confidence estimate as the weight. Useful when signals come from probabilistic models (e.g., ML classifiers) that output calibrated probability scores. Works best when the confidence estimates are well-calibrated — a common failure mode is overconfident models that produce high confidence even for low-IC predictions.

Rank-Based Combination

Converts signal strengths to ranks before weighting and combining. More robust to outliers and heavy-tailed signal distributions. Recommended when signals have very different scales or when a single strong signal could otherwise dominate the ensemble inappropriately.

Ensemble Method Selection Guide

Use IC-weighted when you have 60+ days of signal history and stable ICs.

Use confidence-weighted when signals come from calibrated ML models.

Use rank combination when signal scales vary widely or outliers are common.

Use equal weighting as the baseline when none of the above conditions are met.

6. Signal Decay and Freshness Management

Every signal has a freshness window — the period during which it retains predictive power. Using a stale signal is equivalent to trading on no signal at all, but without the protection of staying flat.
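A softer alternative to a hard TTL cutoff, sketched below, is to decay a signal's strength exponentially with age; the half-life values are a tuning choice, not something the pipeline above prescribes:

```python
import numpy as np
import pandas as pd
from typing import Optional

def decayed_strength(strength: float,
                     generated_at: pd.Timestamp,
                     half_life_seconds: float,
                     now: Optional[pd.Timestamp] = None) -> float:
    """
    Exponentially decay a signal's strength toward zero as it ages:
    at one half-life the signal contributes 50% of its original
    strength, instead of dropping from full weight to zero at a TTL.
    """
    now = now if now is not None else pd.Timestamp.now()
    age = (now - generated_at).total_seconds()
    return strength * float(np.exp(-np.log(2) * age / half_life_seconds))

t0 = pd.Timestamp("2025-01-01 00:00:00")
print(decayed_strength(0.8, t0, 900, now=t0))                            # 0.8
print(decayed_strength(0.8, t0, 900, now=t0 + pd.Timedelta("15min")))    # ~0.4
```

This can be applied to each RawSignal.strength just before combination, so fresher signals naturally dominate without a discontinuity at expiry.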

TTL by Signal Type

Signal Type                    | Suggested TTL    | Notes
Order book imbalance           | 10 – 60 seconds  | Decays almost instantly in liquid markets
RSI / MACD                     | 5 – 30 minutes   | Depends on bar interval; 1m bar → 5m TTL
Funding rate signal            | 1 – 8 hours      | Rates update every 1–8h on most exchanges
Sentiment (social)             | 30 min – 4 hours | News-driven sentiment spikes decay fastest
On-chain whale flow            | 4 – 24 hours     | Large moves take time to reflect in price
Fundamental (protocol revenue) | 24 – 168 hours   | Weekly revenue data; use as long-horizon filter

class SignalCache:
    """
    Thread-safe signal cache with TTL-based invalidation.
    Avoids redundant signal computation within the freshness window.
    """

    def __init__(self):
        self._cache: Dict[str, RawSignal] = {}

    def get(self, name: str) -> Optional[RawSignal]:
        sig = self._cache.get(name)
        if sig and sig.is_fresh:
            return sig
        return None

    def set(self, name: str, signal: RawSignal):
        self._cache[name] = signal

    def invalidate(self, name: str):
        self._cache.pop(name, None)

    def prune_stale(self):
        """Remove all expired signals from cache."""
        stale = [k for k, v in self._cache.items() if not v.is_fresh]
        for k in stale:
            del self._cache[k]


# Usage: wrap pipeline.generate() with cache
cache = SignalCache()

def get_or_generate(pipeline: SignalPipeline,
                    features: Dict,
                    signal_name: str) -> Optional[RawSignal]:
    """Return a fresh cached signal; otherwise rerun the pipeline and cache its outputs."""
    cached = cache.get(signal_name)
    if cached:
        return cached

    composite = pipeline.generate(features)
    if composite:
        for sig in composite.raw_signals:
            cache.set(sig.name, sig)
        return cache.get(signal_name)
    return None

7. Backtesting Signals Independently Before Live Deployment

Before connecting any signal to a live trading strategy, backtest it in isolation. Individual signal backtests reveal the true IC, optimal holding period, and decay characteristics — before any execution noise or ensemble effects obscure the signal's standalone quality.

The Signal Backtest Protocol

  1. Historical feature generation. Reconstruct the exact feature set the signal would have received at each historical point-in-time. Avoid look-ahead bias by using only data available at that moment.
  2. Signal value computation. Apply the signal generator to historical features. Store the raw signal value at each bar.
  3. Forward return pairing. For each signal value at time t, record the actual return at t+h for a range of holding periods h (e.g., 15m, 1h, 4h, 24h).
  4. IC computation. Compute Spearman IC between signal values and forward returns for each holding period. The holding period with highest IC is the signal's natural horizon.
  5. Quintile analysis. Bucket signals into quintiles by strength. Check that the top quintile earns the highest forward returns and the bottom quintile earns the lowest. Monotonic quintile returns confirm the signal is well-specified.
  6. Out-of-sample validation. Reserve the most recent 30% of data as a hold-out set. A signal that only works in-sample is overfitted and will fail live.

def signal_backtest_report(
        signal_values: pd.Series,
        ohlcv: pd.DataFrame,
        holding_periods: List[int] = [15, 60, 240, 1440]
) -> Dict[str, Optional[Dict]]:
    """
    Generate standalone IC report for a signal across multiple
    holding periods (in minutes; assumes 1-minute OHLCV bars, so
    one bar step equals one minute).

    Args:
        signal_values: Timestamped signal values, index = pd.DatetimeIndex
        ohlcv: OHLCV DataFrame (must cover signal_values period + max horizon)
        holding_periods: List of forward return horizons in minutes
    Returns:
        Dict mapping horizon label (e.g. "60m") to a stats dict,
        or None for horizons with too few observations
    """
    results = {}

    for h in holding_periods:
        # Forward returns: close[t+h] / close[t] - 1
        fwd_returns = ohlcv["close"].pct_change(h).shift(-h)

        # Align on common index
        aligned = pd.concat([signal_values, fwd_returns],
                            axis=1, join="inner").dropna()
        aligned.columns = ["signal", "fwd_ret"]

        if len(aligned) < 30:
            results[f"{h}m"] = None
            continue

        ic, pval = stats.spearmanr(aligned["signal"], aligned["fwd_ret"])
        results[f"{h}m"] = {
            "ic": round(ic, 4),
            "p_value": round(pval, 4),
            "n_obs": len(aligned),
            "significant": pval < 0.05,
        }

    return results


# Quintile analysis
def quintile_returns(signal_values: pd.Series,
                     forward_returns: pd.Series) -> pd.DataFrame:
    """Check monotonicity: Q5 should have highest returns."""
    df = pd.concat([signal_values, forward_returns],
                   axis=1, join="inner").dropna()
    df.columns = ["signal", "fwd_ret"]
    df["quintile"] = pd.qcut(df["signal"], q=5, labels=[1,2,3,4,5])
    return df.groupby("quintile")["fwd_ret"].agg(["mean", "std", "count"])
Look-Ahead Bias: The most common backtesting error is using data that would not have been available at the moment the signal was generated. On-chain data, sentiment scores, and fundamental metrics often have publication delays of hours to days. Always model data availability realistically.
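One way to model publication delay in a backtest, assuming the delay is known, is to shift every signal timestamp forward before aligning with forward returns. The helper name below is illustrative:

```python
import pandas as pd

def apply_publication_delay(signal_values: pd.Series,
                            delay: str) -> pd.Series:
    """
    Shift signal timestamps forward by the data source's publication
    delay, so the backtest only sees each value once it would have
    actually been available (e.g. on-chain metrics published hours late).
    """
    delayed = signal_values.copy()
    delayed.index = delayed.index + pd.Timedelta(delay)
    return delayed

s = pd.Series([1.0, 2.0],
              index=pd.to_datetime(["2025-01-01 00:00", "2025-01-01 01:00"]))
print(apply_publication_delay(s, "6h").index[0])  # 2025-01-01 06:00:00
```

Applying this before the align-and-dropna step in signal_backtest_report makes the measured IC reflect what a live agent could actually have traded on.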

8. Integration with Purple Flea Trading API for Signal-Triggered Orders

Once signals pass quality checks and backtesting validation, connect the pipeline to the Purple Flea Trading API. The integration translates a CompositeSignal into a live order with size determined by signal strength.

Signal-to-Order Translation

import asyncio
import httpx

PF_BASE = "https://api.purpleflea.com"
API_KEY = "pf_live_YOUR_KEY"   # replace with your key

def signal_to_size(signal: CompositeSignal,
                   base_size: float,
                   max_leverage: float = 3.0) -> float:
    """
    Convert composite signal strength to position size.
    Scales linearly from 0.5x max_leverage at the activation
    threshold (|strength| = 0.15) to max_leverage at full strength.
    """
    abs_strength = abs(signal.strength)
    min_threshold = 0.15
    # Linear interpolation between threshold (0.5x) and 1.0 (max_leverage)
    scale = 0.5 + (0.5 * (abs_strength - min_threshold) / (1.0 - min_threshold))
    return round(base_size * scale * max_leverage, 4)


async def execute_signal(client: httpx.AsyncClient,
                          signal: CompositeSignal,
                          market: str,
                          base_size: float):
    """
    Place an order on Purple Flea Trading based on composite signal output.
    Returns immediately if signal direction is FLAT.
    """
    if signal.direction == SignalDirection.FLAT:
        print(f"[SIGNAL] FLAT on {market} — no order placed")
        return

    side = "buy"  if signal.direction == SignalDirection.LONG else "sell"
    size = signal_to_size(signal, base_size)

    payload = {
        "market": market,
        "side": side,
        "size": size,
        "type": "market",
        "metadata": {
            "signal_strength": signal.strength,
            "contributing_signals": signal.contributing_signals,
            "generated_at": signal.generated_at.isoformat(),
        },
    }

    resp = await client.post(
        f"{PF_BASE}/v1/orders",
        json=payload,
        headers={"Authorization": f"Bearer {API_KEY}"},
    )
    resp.raise_for_status()
    order = resp.json()
    print(f"[ORDER] {side.upper()} {size} {market} | ID: {order['id']}")
    return order


async def signal_trading_loop(market: str,
                               pipeline: SignalPipeline,
                               base_size: float = 100.0,
                               interval_seconds: int = 300):
    """
    Main loop: fetch features, generate signal, execute order.
    Runs every interval_seconds (default 5 minutes).
    """
    async with httpx.AsyncClient(timeout=15) as client:
        while True:
            # 1. Fetch current market data
            resp = await client.get(
                f"{PF_BASE}/v1/markets/{market}/features",
                headers={"Authorization": f"Bearer {API_KEY}"},
            )
            features = resp.json()

            # 2. Run signal pipeline
            composite = pipeline.generate(features)

            # 3. Execute if we have a signal
            if composite:
                await execute_signal(client, composite, market, base_size)
            else:
                print(f"[PIPELINE] Insufficient signals — holding flat")

            await asyncio.sleep(interval_seconds)


# Example signal generators
def rsi_signal(features: Dict) -> RawSignal:
    """Simple RSI-based signal: oversold = long, overbought = short."""
    rsi = features["rsi_14"]
    strength = -(rsi - 50) / 50   # RSI 30 → +0.4, RSI 70 → -0.4
    return RawSignal(
        name="rsi_14",
        value=rsi,
        strength=strength,
        confidence=0.06,
        generated_at=pd.Timestamp.now(),
        ttl_seconds=900,
    )


def funding_rate_signal(features: Dict) -> RawSignal:
    """High positive funding → crowded longs → fade long, go short."""
    rate = features["funding_rate_8h"]
    strength = -np.tanh(rate / 0.001)  # normalise around 0.1% rate
    return RawSignal(
        name="funding_rate",
        value=rate,
        strength=strength,
        confidence=0.08,
        generated_at=pd.Timestamp.now(),
        ttl_seconds=3600,
    )


# Bootstrap a pipeline
if __name__ == "__main__":
    pipe = SignalPipeline(min_ic=0.04, min_signals=2,
                         combination_method="ic_weighted")
    pipe.register(rsi_signal, name="rsi_14", ttl_seconds=900, weight=1.0)
    pipe.register(funding_rate_signal, name="funding_rate",
                  ttl_seconds=3600, weight=1.5)

    asyncio.run(signal_trading_loop(
        market="BTC-PERP",
        pipeline=pipe,
        base_size=200.0,
        interval_seconds=300,
    ))

API Endpoints for Signal-Driven Trading

Endpoint                     | Method | Purpose
/v1/markets/:market/features | GET    | Fetch pre-computed market features for signal generation
/v1/markets/:market/ohlcv    | GET    | Historical OHLCV data for backtesting signals
/v1/orders                   | POST   | Place market or limit order triggered by composite signal
/v1/positions                | GET    | Check current positions before generating new signal
/v1/trades                   | GET    | Fetch closed trades to compute realised IC for signal feedback

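Closing the feedback loop: a sketch that computes realised IC from closed trades for SignalPipeline.update_ic(). The trade payload fields read here are assumptions for illustration, not the documented /v1/trades schema:

```python
from typing import Dict, List
from scipy import stats

def realised_ic_from_trades(trades: List[Dict],
                            signal_name: str) -> float:
    """
    Realised Spearman IC for one signal, pairing the strength recorded
    in each order's metadata with the trade's realised return.
    Returns 0.0 when there are too few observations to be meaningful.
    """
    pairs = [(t["metadata"]["signal_strength"], t["realised_return"])
             for t in trades
             if signal_name in t["metadata"].get("contributing_signals", [])]
    if len(pairs) < 10:
        return 0.0
    strengths, returns = zip(*pairs)
    ic, _ = stats.spearmanr(strengths, returns)
    return float(ic)

# After fetching closed trades from /v1/trades, feed the result back:
# pipeline.update_ic("rsi_14", realised_ic_from_trades(trades, "rsi_14"))
```

Run on a schedule (e.g. daily), this keeps the ic_history buffers current so IC-weighted combination adapts as signal quality drifts.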
New agents: Claim free starting capital from the Purple Flea Faucet to test your signal pipeline in live conditions before committing real funds. The faucet provides zero-risk capital for initial exploration.

Connect Your Signal Pipeline to Purple Flea Trading

Live market features, historical OHLCV data, and programmatic order placement — everything your signal pipeline needs to go from backtest to live in a single API.


Key Takeaways