Multi-Signal Alpha Research Platform with Parfun¶
A systematic equity fund’s alpha research pipeline evaluates dozens of predictive signals, estimates covariance structures, and constructs optimal portfolios.
\[\alpha_i = \sum_{k=1}^{K} w_k \cdot z_{i,k} \quad\text{where}\quad w_k \propto \frac{\text{ICIR}_k}{\sqrt{\text{corr}(IC_k, IC_{k'})}}\]
The pipeline has a parallelisable bottleneck in signal computation:
110 signal + IC tasks: 11 base alpha signals × 10 lookback/decay variants, each with 4-horizon IC analysis
Sequential tail: signal weight optimisation, composite alpha, 5 covariance estimations, 5 portfolio constructions, 5 walk-forward evaluations
Parfun parallelises the signal + IC tier by distributing the 110 independent signal specs across Scaler workers with a single @pf.parallel decorator.
Program Structure¶
Native Python Implementation¶
[ ]:
import os
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["GOTO_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
import scipy.linalg
import scipy.optimize
import scipy.stats
# ─── Domain types ─────────────────────────────────────────────────────────────
@dataclass
class UniverseData:
"""
Panel of returns, fundamentals, and sector classifications.
In production: loaded from a data warehouse (Compustat, CRSP, FactSet).
"""
returns: np.ndarray # (n_days, n_stocks) daily excess returns
prices: np.ndarray # (n_days, n_stocks)
market_caps: np.ndarray # (n_days, n_stocks)
book_to_market: np.ndarray # (n_days, n_stocks)
roe: np.ndarray # (n_days, n_stocks) return on equity
asset_growth: np.ndarray # (n_days, n_stocks)
accruals: np.ndarray # (n_days, n_stocks) operating accruals/assets
eps_revisions: np.ndarray # (n_days, n_stocks) FY1 EPS revision %
short_interest: np.ndarray # (n_days, n_stocks) SI / shares outstanding
earnings_surprise: (
np.ndarray
) # (n_days, n_stocks) standardised earnings surprise
sector_dummies: np.ndarray # (n_stocks, n_sectors) one-hot
tickers: List[str]
dates: List[str]
n_days: int
n_stocks: int
n_sectors: int
@dataclass
class SignalSpec:
name: str
description: str
lookback: int # rolling window for construction (trading days)
skip: int # skip period (e.g. 1 for momentum, 0 for quality)
decay_halflife: int # exponential decay half-life for weighting
@dataclass
class NeutralizedSignal:
name: str
values: np.ndarray # (n_days, n_stocks): cross-sectionally z-scored
forward_returns: (
np.ndarray
) # (n_days, n_stocks, n_horizons): [1d,5d,21d,63d]
horizons: List[int]
@dataclass
class ICStats:
signal_name: str
horizon: int
ic_series: np.ndarray # time series of daily rank ICs
mean_ic: float
icir: float # IC / std(IC) * sqrt(252)
t_stat: float
decay_halflife: float # estimated IC decay half-life (days)
quantile_spread: float # Q5 - Q1 annualised return
hit_rate: float # fraction of days with positive IC
@dataclass
class SignalWeights:
"""Bayesian-shrunk optimal signal combination weights."""
weights: Dict[str, float] # signal_name → weight
shrinkage_intensity: float
effective_n_signals: float # diversity measure
@dataclass
class CompositeAlpha:
values: np.ndarray # (n_days, n_stocks): composite alpha forecast
component_weights: Dict[str, float]
information_ratio_estimate: float
@dataclass
class CovarianceEstimate:
method: str
matrix: np.ndarray # (n_stocks, n_stocks)
condition_number: float
effective_rank: float
estimation_error_bound: (
float # Frobenius norm of estimated estimation error
)
@dataclass
class Portfolio:
covariance_method: str
weights_history: np.ndarray # (n_days, n_stocks): daily portfolio weights
turnover_series: np.ndarray # (n_days,)
ex_ante_ir: float
target_volatility: float
@dataclass
class PerformanceStats:
covariance_method: str
annualised_return: float
annualised_vol: float
sharpe_ratio: float
information_ratio: float
max_drawdown: float
avg_turnover: float
hit_rate: float
factor_exposures: Dict[str, float]
@dataclass
class AlphaResearchReport:
signal_rankings: pd.DataFrame
optimal_weights: SignalWeights
portfolio_comparison: pd.DataFrame
recommended_covariance: str
composite_ic: float
composite_icir: float
# ─── Pipeline nodes ───────────────────────────────────────────────────────────
def load_universe_data(
n_stocks: int = 1500,
n_days: int = 1260, # 10 years of daily data
n_sectors: int = 11, # GICS sectors
seed: int = 42,
) -> UniverseData:
"""
Load the equity universe panel.
In production: read from a Parquet data lake or kdb+ tick database.
WHY THIS IS A SEPARATE NODE:
- Real data loading: 3000 stocks × 10yr daily from a data warehouse
is 5–15 minutes over typical enterprise networks
- Loading data separately means all 11 signal computations
can fire immediately once data is loaded, with zero polling overhead
- If data loading fails, the pipeline aborts cleanly without wasted compute
The synthetic data here embeds a realistic factor structure:
- True market factor (beta)
- 5 style factors with known premia
- Sector clustering
- Time-varying volatility (GARCH-like)
"""
rng = np.random.default_rng(seed)
n_factor = 8 # market + 7 style factors
# ── Factor structure ────────────────────────────────────────────────────
# True factor loadings (persistent): (n_stocks, n_factors)
B_true = rng.standard_normal((n_stocks, n_factor)) * 0.4
B_true[:, 0] = 1.0 # market beta ≈ 1
# Factor returns: market has positive drift, style factors have premia
factor_premia = (
np.array([0.05, 0.03, 0.04, 0.02, -0.02, 0.01, 0.03, 0.00]) / 252
)
factor_vol = np.array(
[0.15, 0.04, 0.05, 0.03, 0.04, 0.03, 0.04, 0.03]
) / np.sqrt(252)
F_raw = rng.standard_normal((n_days, n_factor))
factor_returns = factor_premia + F_raw * factor_vol
# Time-varying volatility: GARCH(1,1) proxy
garch_vol = np.ones(n_days)
for t in range(1, n_days):
shock = factor_returns[t - 1, 0] ** 2
garch_vol[t] = np.sqrt(
0.94 * garch_vol[t - 1] ** 2 + 0.05 * shock + 0.01 * 0.15**2 / 252
)
# Idiosyncratic returns
idio_vol = rng.uniform(0.005, 0.025, n_stocks) / np.sqrt(252)
epsilon = rng.standard_normal((n_days, n_stocks)) * idio_vol
returns = factor_returns @ B_true.T + epsilon # (n_days, n_stocks)
# ── Price history ────────────────────────────────────────────────────────
prices = 100 * np.exp(np.cumsum(returns, axis=0))
# ── Fundamental data (quarterly, interpolated daily) ─────────────────────
# Market cap: positively related to size factor (B_true[:, 2])
log_mcap = 20 + 3 * B_true[:, 2] + rng.standard_normal(n_stocks) * 1.5
market_caps = np.outer(np.ones(n_days), np.exp(log_mcap))
market_caps *= 1 + rng.standard_normal((n_days, n_stocks)) * 0.002
# B/M: value factor exposure
book_to_market = np.clip(
0.5 + B_true[:, 1] * 0.3 + rng.standard_normal(n_stocks) * 0.2,
0.05,
5.0,
)
book_to_market = np.outer(np.ones(n_days), book_to_market)
book_to_market *= 1 + rng.standard_normal((n_days, n_stocks)) * 0.01
# ROE: quality factor
roe = np.clip(
0.12 + B_true[:, 3] * 0.05 + rng.standard_normal(n_stocks) * 0.06,
-0.30,
0.60,
)
roe = np.outer(np.ones(n_days), roe)
# Asset growth, accruals, EPS revisions, short interest: synthetic signals
asset_growth = rng.standard_normal((n_days, n_stocks)) * 0.05
accruals = rng.standard_normal((n_days, n_stocks)) * 0.03
eps_revisions = rng.standard_normal((n_days, n_stocks)) * 0.04
short_interest = np.abs(rng.standard_normal((n_days, n_stocks))) * 0.03
earnings_surprise = rng.standard_normal((n_days, n_stocks)) * 0.05
# Inject signal: some factors actually predict returns
# (momentum) prices predict 1M ahead returns
for lag in [21, 63]:
past_return = np.concatenate(
[np.zeros((lag, n_stocks)), returns[:-lag]]
)
returns += 0.01 * np.sign(past_return) # weak momentum signal
# Sector membership: 11 sectors
sector_ids = rng.integers(0, n_sectors, n_stocks)
sector_dummies = np.eye(n_sectors)[sector_ids] # (n_stocks, n_sectors)
tickers = [f"STK_{i:04d}" for i in range(n_stocks)]
dates = (
pd.bdate_range("2015-01-01", periods=n_days)
.strftime("%Y-%m-%d")
.tolist()
)
return UniverseData(
returns=returns,
prices=prices,
market_caps=market_caps,
book_to_market=book_to_market,
roe=roe,
asset_growth=asset_growth,
accruals=accruals,
eps_revisions=eps_revisions,
short_interest=short_interest,
earnings_surprise=earnings_surprise,
sector_dummies=sector_dummies,
tickers=tickers,
dates=dates,
n_days=n_days,
n_stocks=n_stocks,
n_sectors=n_sectors,
)
def compute_and_neutralize_signal(
universe: UniverseData, signal_spec: SignalSpec
) -> NeutralizedSignal:
"""
Construct one raw signal from the universe panel, then cross-sectionally
neutralize vs. sector, log-market-cap (size), and market beta.
WHY THIS IS HEAVY (the most common bottleneck in signal research):
For each of the 2520 days:
1. Compute rolling signal value per stock (e.g. 252-day momentum)
→ rolling pandas operation on 3000-column DataFrame
2. Winsorize at 3σ (cross-sectional)
3. Cross-sectional OLS regression on [sector dummies, log_mcap, beta_estimate]
→ 3000 obs × (11 + 1 + 1) = 13 regressors per day
→ lstsq solve: O(n_stocks × n_regressors²) = 507k flops × 2520 days
4. Take residuals (the neutralized signal)
5. Z-score across stocks
Total: ~3.8 billion flops, dominated by the daily OLS regressions.
Wall time: 3–8 minutes per signal on one core.
11 signals × 5 min = 55 min serial → 5 min on 11-worker cluster.
Also computes forward returns at 4 horizons for downstream IC nodes.
"""
rng = np.random.default_rng(hash(signal_spec.name) % 2**31)
n_days, n_stocks = universe.n_days, universe.n_stocks
R = universe.returns
horizons = [1, 5, 21, 63]
# ── Step 1: Compute raw signal ───────────────────────────────────────────
lb = signal_spec.lookback
sk = signal_spec.skip
# Strip variant suffix (e.g. "MOM_v3" → "MOM") for signal dispatch
_base = signal_spec.name.split("_v")[0]
if _base in ("MOM", "MOM6"):
# Price momentum: cumulative return over [skip+1, lookback] days
raw = np.full((n_days, n_stocks), np.nan)
for t in range(lb + sk, n_days):
raw[t, :] = np.sum(R[t - lb - sk : t - sk, :], axis=0)
elif _base == "STR":
# Short-term reversal: past 5-day return (negative signal)
raw = np.full((n_days, n_stocks), np.nan)
for t in range(5, n_days):
raw[t, :] = -np.sum(R[t - 5 : t, :], axis=0)
elif _base in ("QRE", "LOW"):
# Quality / low-vol: rolling realised volatility (negative)
raw = np.full((n_days, n_stocks), np.nan)
for t in range(lb, n_days):
raw[t, :] = -R[t - lb : t, :].std(
axis=0
) # negative vol = low-vol signal
elif _base == "VAL":
raw = universe.book_to_market.copy()
elif _base == "EPS":
raw = universe.eps_revisions.copy()
elif _base == "SHT":
raw = -universe.short_interest.copy() # negative: high SI is bearish
elif _base == "ERN":
raw = universe.earnings_surprise.copy()
elif _base == "ACC":
raw = -universe.accruals.copy() # negative accruals = higher quality
elif _base == "AGR":
raw = (
-universe.asset_growth.copy()
) # negative growth = conservative investment
elif _base == "CAR":
# Carry: simplified as dividend yield proxy (B/M minus growth)
raw = universe.book_to_market - universe.asset_growth
else:
raw = rng.standard_normal((n_days, n_stocks))
# ── Step 2: Daily cross-sectional neutralization ────────────────────────
# Neutralize vs. sector (11 dummies), log_mcap (1), beta (1)
# Uses pre-estimated betas from a 60-day rolling market regression
log_mcap = np.log(np.maximum(universe.market_caps, 1e6))
# Rolling market beta: 60-day OLS of stock vs. market return
market_ret = universe.returns.mean(
axis=1, keepdims=True
) # equal-weight market
betas = np.ones((n_days, n_stocks))
BETA_WIN = 60
for t in range(BETA_WIN, n_days):
y_block = universe.returns[t - BETA_WIN : t, :] # (60, n_stocks)
x_block = market_ret[t - BETA_WIN : t, :] # (60, 1)
# Vectorised OLS: beta = (X'X)^{-1} X'Y per stock
xx = float((x_block**2).sum())
if xx > 1e-12:
betas[t, :] = (x_block.T @ y_block)[0] / xx
# Daily OLS neutralization: expensive inner loop
neutralized = np.full_like(raw, np.nan)
for t in range(max(lb + sk, BETA_WIN), n_days):
s = raw[t, :]
if np.all(np.isnan(s)):
continue
# Fill NaN with 0 for regression
s_clean = np.where(np.isnan(s), 0.0, s)
# Winsorize at 3σ
mu_s, sd_s = np.nanmean(s), np.nanstd(s)
if sd_s < 1e-12:
continue
s_clean = np.clip(s_clean, mu_s - 3 * sd_s, mu_s + 3 * sd_s)
# Regressors: [sector dummies, log_mcap, beta]
X = np.hstack(
[
universe.sector_dummies, # (n_stocks, 11)
log_mcap[t : t + 1, :].T, # (n_stocks, 1)
betas[t : t + 1, :].T, # (n_stocks, 1)
]
) # (n_stocks, 13)
# OLS residual
try:
result = np.linalg.lstsq(X, s_clean, rcond=None)
resid = s_clean - X @ result[0]
except np.linalg.LinAlgError:
resid = s_clean
# Z-score the residual
r_std = resid.std()
if r_std > 1e-12:
neutralized[t, :] = resid / r_std
# ── Step 3: Compute forward returns at each horizon ─────────────────────
fwd_ret = np.full((n_days, n_stocks, len(horizons)), np.nan)
for hi, h in enumerate(horizons):
for t in range(n_days - h):
fwd_ret[t, :, hi] = universe.returns[t : t + h, :].sum(axis=0)
return NeutralizedSignal(
name=signal_spec.name,
values=neutralized,
forward_returns=fwd_ret,
horizons=horizons,
)
def compute_ic_and_decay(
signal: NeutralizedSignal,
horizon_idx: int,
min_coverage: int = 252, # minimum stocks with non-NaN signal
) -> ICStats:
"""
Compute the Information Coefficient (IC) time series and decay curve for
one (signal, horizon) pair.
IC_t = Spearman rank correlation(signal_{t}, return_{t,t+h})
ICIR = mean(IC) / std(IC) × sqrt(252/h)
IC decay: regress IC_{t+lag} on IC_t across lags 0..120 to estimate the
signal's useful forecast horizon.
WHY THIS IS HEAVY:
- Spearman rank correlation: O(n_stocks × log n_stocks) per day
(sorting 3000 stocks) × 2520 days = ~38M sort operations
- Bootstrap for IC confidence intervals: 500 resamples × 2520 days
- IC decay regression: OLS across 120 lag values, each with 2400+ obs
- Total: ~5–10 minutes per (signal, horizon) pair
- 11 signals × 4 horizons = 44 nodes × 7 min = 308 min serial
→ 7 min on 44-worker cluster
The IC decay half-life is the key deliverable: it tells the PM how
frequently to rebalance and what transaction cost budget is justified.
"""
h = signal.horizons[horizon_idx]
sig_vals = signal.values # (n_days, n_stocks)
fwd_ret = signal.forward_returns[:, :, horizon_idx] # (n_days, n_stocks)
n_days = sig_vals.shape[0]
ic_series = np.full(n_days, np.nan)
for t in range(n_days - h):
s = sig_vals[t, :]
r = fwd_ret[t, :]
valid = (~np.isnan(s)) & (~np.isnan(r))
if valid.sum() < min_coverage:
continue
# Spearman rank IC
s_rank = scipy.stats.rankdata(s[valid])
r_rank = scipy.stats.rankdata(r[valid])
n_v = valid.sum()
cov_sr = np.cov(s_rank, r_rank)[0, 1]
var_s = s_rank.var()
var_r = r_rank.var()
ic_series[t] = cov_sr / np.sqrt(var_s * var_r + 1e-12)
valid_ic = ic_series[~np.isnan(ic_series)]
if len(valid_ic) < 60:
return ICStats(
signal_name=signal.name,
horizon=h,
ic_series=ic_series,
mean_ic=0.0,
icir=0.0,
t_stat=0.0,
decay_halflife=0.0,
quantile_spread=0.0,
hit_rate=0.5,
)
mean_ic = float(np.mean(valid_ic))
std_ic = float(np.std(valid_ic))
icir = mean_ic / (std_ic + 1e-12) * np.sqrt(252 / h)
t_stat = mean_ic / (std_ic / np.sqrt(len(valid_ic)) + 1e-12)
hit_rate = float((valid_ic > 0).mean())
# ── IC decay: fit exponential to autocorrelation of IC series ────────────
max_lag = min(120, len(valid_ic) // 3)
lags = np.arange(1, max_lag + 1)
ic_autocorr = np.array(
[np.corrcoef(valid_ic[:-lag], valid_ic[lag:])[0, 1] for lag in lags]
)
# Fit: autocorr(lag) ≈ exp(-lag / τ) → log-linear OLS for τ
valid_ac = ic_autocorr > 0
if valid_ac.sum() > 5:
log_ac = np.log(ic_autocorr[valid_ac] + 1e-9)
x = lags[valid_ac].astype(float)
tau = -float(x @ x) / float(x @ log_ac + 1e-9)
decay_hl = tau * np.log(2)
else:
decay_hl = float(h)
# ── Factor-quantile return spread ────────────────────────────────────────
# Bin stocks into 5 quintiles by signal, measure Q5-Q1 spread
q_spreads = []
for t in range(n_days - h):
s = sig_vals[t, :]
r = fwd_ret[t, :]
valid = (~np.isnan(s)) & (~np.isnan(r))
if valid.sum() < min_coverage:
continue
s_v, r_v = s[valid], r[valid]
q_cut = np.quantile(s_v, [0.20, 0.40, 0.60, 0.80])
q_bins = np.digitize(s_v, q_cut) # 0..4
q_means = [
r_v[q_bins == q].mean() if (q_bins == q).sum() > 10 else np.nan
for q in range(5)
]
if not any(np.isnan(q_means)):
q_spreads.append(q_means[4] - q_means[0])
quant_spread = float(np.mean(q_spreads) * 252 / h) if q_spreads else 0.0
return ICStats(
signal_name=signal.name,
horizon=h,
ic_series=ic_series,
mean_ic=mean_ic,
icir=icir,
t_stat=t_stat,
decay_halflife=decay_hl,
quantile_spread=quant_spread,
hit_rate=hit_rate,
)
def optimize_signal_weights(
all_ic_stats: List[ICStats], shrinkage_target: str = "equal"
) -> SignalWeights:
"""
Bayesian shrinkage combination of signal weights.
Blocked until all signal + IC computations complete.
IC nodes complete. The wait is mandatory — you cannot compute optimal
combination weights until you know every signal's IC and ICIR.
Method: Ledoit-Wolf shrinkage on the IC covariance matrix, followed by
mean-variance optimization in IC space.
The effective number of independent signals (after accounting for
correlations) determines the optimal portfolio of predictors.
WHY THIS MATTERS:
- Signals are correlated (momentum variants share variance)
- Naively equal-weighting ignores redundancy → over-exposure to crowded factors
- ML fitting in IC space with leave-one-out cross-validation: expensive
"""
# Aggregate to per-signal stats (max horizon h=21 for combination)
signal_names = sorted(set(ic.signal_name for ic in all_ic_stats))
ic_by_signal: Dict[str, ICStats] = {}
for sig in signal_names:
# Pick the horizon with highest ICIR as representative
candidates = [
ic
for ic in all_ic_stats
if ic.signal_name == sig and not np.isnan(ic.icir)
]
if candidates:
ic_by_signal[sig] = max(candidates, key=lambda x: abs(x.icir))
if not ic_by_signal:
n = len(signal_names)
return SignalWeights(
weights={s: 1 / n for s in signal_names},
shrinkage_intensity=1.0,
effective_n_signals=float(n),
)
# Build IC time series matrix for computing pairwise IC correlations
# Use the mean IC per signal as the summary statistic
mean_ics = np.array([ic_by_signal[s].mean_ic for s in signal_names])
icir_vals = np.array([ic_by_signal[s].icir for s in signal_names])
t_stats = np.array([ic_by_signal[s].t_stat for s in signal_names])
n_sigs = len(signal_names)
# Pairwise IC correlation: estimated from IC time series of same-length signals
# Use IC series similarity as a proxy for signal redundancy
ic_series_list = []
min_len = None
for s in signal_names:
ic_s = ic_by_signal[s].ic_series
valid_mask = ~np.isnan(ic_s)
valid_ic = ic_s[valid_mask]
ic_series_list.append(valid_ic)
if min_len is None or len(valid_ic) < min_len:
min_len = len(valid_ic)
min_len = min_len or 252
# Align IC series to same length (use last min_len observations)
ic_matrix = np.array(
[s[-min_len:] for s in ic_series_list]
) # (n_sigs, min_len)
# Sample correlation matrix of IC series
if ic_matrix.shape[1] > 5:
sample_corr = np.corrcoef(ic_matrix)
else:
sample_corr = np.eye(n_sigs)
# Ledoit-Wolf shrinkage toward identity
mu_diag = np.trace(sample_corr) / n_sigs
n_T = ic_matrix.shape[1]
# OAS shrinkage intensity
rho_num = (1 - 2 / n_sigs) * np.trace(sample_corr @ sample_corr) + np.trace(
sample_corr
) ** 2
rho_den = (n_T + 1 - 2 / n_sigs) * (
np.trace(sample_corr @ sample_corr)
- np.trace(sample_corr) ** 2 / n_sigs
)
rho = float(np.clip(rho_num / (rho_den + 1e-10), 0, 1))
shrunk_corr = (1 - rho) * sample_corr + rho * np.eye(n_sigs)
# Mean-variance in IC space: maximize ICIR^2 / variance
# Signal → expected IC (the "return"), IC correlation matrix (the "covariance")
mu_ic = np.abs(icir_vals) # use |ICIR| as expected reward per unit IC vol
sign = np.sign(mean_ics) # preserve signal direction
# Constrained optimization: max mu' w s.t. w' Σ w ≤ 1, sum(w) = 1, w ≥ 0
def neg_ic_sharpe(w):
portfolio_ic = float(mu_ic @ w)
portfolio_var = float(w @ shrunk_corr @ w)
return -(portfolio_ic / np.sqrt(portfolio_var + 1e-10))
def jac_neg(w):
n = len(w)
grad = np.zeros(n)
portfolio_ic = float(mu_ic @ w)
portfolio_var = float(w @ shrunk_corr @ w)
denom = np.sqrt(portfolio_var + 1e-10)
grad = -(mu_ic * denom - portfolio_ic * (shrunk_corr @ w) / denom) / (
portfolio_var + 1e-10
)
return grad
n_sigs_local = n_sigs
bounds_w = [(0.0, 1.0)] * n_sigs_local
constraints = [{"type": "eq", "fun": lambda w: np.sum(w) - 1.0}]
w0 = np.ones(n_sigs_local) / n_sigs_local
# Multiple restarts
best_w, best_val = w0.copy(), np.inf
for _ in range(20):
rng_local = np.random.default_rng()
w_init = rng_local.dirichlet(np.ones(n_sigs_local))
try:
result = scipy.optimize.minimize(
neg_ic_sharpe,
w_init,
jac=jac_neg,
method="SLSQP",
bounds=bounds_w,
constraints=constraints,
options={"maxiter": 300, "ftol": 1e-10},
)
if result.fun < best_val:
best_val = result.fun
best_w = result.x
except Exception:
continue
# Apply sign (long/short signal direction) and normalise
w_signed = best_w * sign
w_signed = w_signed / (np.abs(w_signed).sum() + 1e-12)
# Effective number of signals: entropy-based
w_abs_norm = np.abs(best_w) / (np.abs(best_w).sum() + 1e-12)
eff_n = float(np.exp(-np.sum(w_abs_norm * np.log(w_abs_norm + 1e-12))))
return SignalWeights(
weights={s: float(w_signed[i]) for i, s in enumerate(signal_names)},
shrinkage_intensity=rho,
effective_n_signals=eff_n,
)
def compute_composite_alpha(
all_neutralized: List[NeutralizedSignal], weights: SignalWeights
) -> CompositeAlpha:
"""
Combine neutralized signal values using the optimised weights.
Applies an exponential decay to give more weight to recent observations.
Fast node: pure linear combination.
Waits for: all 11 neutralized signal nodes + optimize_signal_weights.
"""
signal_map = {s.name: s.values for s in all_neutralized}
n_days, n_stocks = list(signal_map.values())[0].shape
composite = np.zeros((n_days, n_stocks))
for name, w in weights.weights.items():
if name in signal_map and abs(w) > 1e-6:
vals = signal_map[name]
# Replace NaN with 0 (neutral) for combination
composite += w * np.where(np.isnan(vals), 0.0, vals)
# Information ratio estimate (upper bound):
# IR ≈ IC × sqrt(N_signals × breadth)
valid_ics = [v for v in weights.weights.values() if abs(v) > 1e-6]
ir_estimate = 0.03 * np.sqrt(len(valid_ics) * 252) # rough Grinold rule
return CompositeAlpha(
values=composite,
component_weights=weights.weights,
information_ratio_estimate=float(ir_estimate),
)
def estimate_covariance(
universe: UniverseData,
method: str,
lookback: int = 504,
n_factors: int = 15,
) -> CovarianceEstimate:
"""
Estimate the return covariance matrix using one of five methods.
All five run sequentially — each is genuinely expensive at n_stocks=3000.
WHY THIS IS HEAVY (the critical bottleneck in large-universe quant PMs):
"sample": O(p² × T) to compute + O(p³) to invert (Cholesky)
p=3000, T=504 → 4.5B + 27B flops ≈ 6 min
"ledoit_wolf": Same as sample + iterative shrinkage estimation
"nl_shrinkage": Ledoit-Wolf (2020) nonlinear: O(p³) eigendecomposition
+ numerical integration per eigenvalue → 8–15 min
"rmt_cleaned": Random Matrix Theory eigenvalue clipping:
O(p³) SVD + Marchenko-Pastur fitting → 8–12 min
"factor_model": PCA factor extraction + OLS specific risk + assembly
O(min(p,T)² × max(p,T)) SVD ≈ 4 min for T<p
The bottleneck is nl_shrinkage.
Serial would be 6+6+12+10+4 = 38 min; parallel = 12 min.
"""
rng = np.random.default_rng(hash(method) % 2**31)
R = universe.returns[-lookback:, :] # (lookback, n_stocks)
T, p = R.shape
R_dm = R - R.mean(axis=0)
if method == "sample":
S = R_dm.T @ R_dm / (T - 1) # (p, p) raw sample covariance
# Add small regularisation for invertibility
S += np.eye(p) * 1e-6
elif method == "ledoit_wolf":
S_raw = R_dm.T @ R_dm / (T - 1)
# OAS shrinkage intensity
mu = np.trace(S_raw) / p
delta = float(np.trace(S_raw @ S_raw))
gamma = float(np.trace(S_raw) ** 2)
rho = ((1 - 2 / p) * delta + gamma) / (
(T + 1 - 2 / p) * (delta - gamma / p)
)
rho = float(np.clip(rho, 0, 1))
S = (1 - rho) * S_raw + rho * mu * np.eye(p)
elif method == "nl_shrinkage":
# Ledoit-Wolf (2020) Oracle Approximating Shrinkage (OAS) for large p
# Exact formula using eigenvalue distribution
S_raw = R_dm.T @ R_dm / (T - 1)
eigvals, eigvecs = np.linalg.eigh(S_raw)
eigvals = np.maximum(eigvals, 1e-8)
# Stein's estimator: d_i^* = d_i / (1 + (p/T) × h(d_i))
# where h is a function of the empirical spectral distribution
c = p / T # concentration ratio
# Marchenko-Pastur Stieltjes transform approximation
d_star = np.zeros(p)
for i, d in enumerate(eigvals):
# Simplified NL shrinkage: Haff (1980) approximation
d_star[i] = d / (1 + c * (1 - d / (eigvals.mean() + 1e-10)))
d_star = np.maximum(d_star, eigvals * 0.1) # floor
S = (eigvecs * d_star) @ eigvecs.T
elif method == "rmt_cleaned":
# Random Matrix Theory: remove eigenvalues below Marchenko-Pastur upper edge
S_raw = R_dm.T @ R_dm / (T - 1)
sigma2 = np.trace(S_raw) / p
c = p / T
# Marchenko-Pastur upper/lower edges
lambda_plus = sigma2 * (1 + np.sqrt(c)) ** 2
lambda_minus = sigma2 * (1 - np.sqrt(c)) ** 2
eigvals, eigvecs = np.linalg.eigh(S_raw)
# Eigenvalues below lambda_plus are noise → shrink to mean noise level
noise_mask = eigvals < lambda_plus
noise_mean = (
float(eigvals[noise_mask].mean()) if noise_mask.any() else sigma2
)
d_clean = np.where(noise_mask, noise_mean, eigvals)
S = (eigvecs * d_clean) @ eigvecs.T
elif method == "factor_model":
# PCA factor model: S = B Σ_F B' + diag(σ²_ε)
# This is the fastest for large p when n_factors << p
# Full SVD for factor extraction (EXPENSIVE at p=3000)
U, sv, Vt = np.linalg.svd(R_dm, full_matrices=False)
F = U[:, :n_factors] * sv[:n_factors] # (T, n_factors)
B = Vt[:n_factors, :].T # (p, n_factors)
Sigma_F = np.cov(F.T) # (n_factors, n_factors)
residuals = R_dm - F @ B.T # (T, p)
sigma2_eps = residuals.var(axis=0) # (p,): specific variance
S = B @ Sigma_F @ B.T + np.diag(sigma2_eps)
else:
S = np.eye(p)
# Diagnostics
try:
eigvals_check = np.linalg.eigvalsh(S)
cond_num = float(eigvals_check[-1] / max(eigvals_check[0], 1e-12))
eff_rank = float(
np.exp(scipy.stats.entropy(eigvals_check / eigvals_check.sum()))
)
except Exception:
cond_num, eff_rank = 1e6, 1.0
# Estimation error bound (Frobenius): O(p / sqrt(T))
est_error = float(p / np.sqrt(T) * np.trace(S) / p)
return CovarianceEstimate(
method=method,
matrix=S,
condition_number=cond_num,
effective_rank=eff_rank,
estimation_error_bound=est_error,
)
def construct_portfolio(
alpha: CompositeAlpha,
cov: CovarianceEstimate,
universe: UniverseData,
target_vol: float = 0.10,
max_position: float = 0.005,
sector_neutral: bool = True,
n_rebalance_days: int = 21,
) -> Portfolio:
"""
Construct an optimal portfolio: daily weights solving a QP with
transaction cost penalty and risk constraints.
WHY THIS IS HEAVY:
- QP on n_stocks=3000 variables × ~200 constraints (sector neutrality,
position limits, net exposure) per rebalancing date
- ~120 rebalancing dates over a 10-year backtest
- Each QP: interior-point method, O(n³) per iteration, 50–200 iterations
→ 3000³ × 100 iterations = 2.7 trillion flops per QP
(in practice, sparsity reduces this, but still 5–20 min per portfolio)
- 5 covariance variants × 5–20 min = 25–100 min serial
→ 5–20 min parallel (5 Scaler workers)
The mean-variance objective with transaction costs:
max α'w - λ w'Σw - tc × Σ|w_t - w_{t-1}|
s.t. Σwᵢ = 0 (dollar-neutral), |wᵢ| ≤ max_pos,
sector exposures = 0 (if sector_neutral),
tracking error ≤ target_vol
"""
rng = np.random.default_rng(hash(cov.method) % 2**31)
n_days, n_stocks = alpha.values.shape
Sigma = cov.matrix
gamma = 2.0 # risk aversion
tc = 0.001 # one-way transaction cost (10bps)
weights_history = np.zeros((n_days, n_stocks))
turnover_series = np.zeros(n_days)
w_prev = np.zeros(n_stocks)
rebalance_dates = range(252, n_days, n_rebalance_days) # start after 1yr
for t in rebalance_dates:
mu = alpha.values[t, :]
# Replace NaN alpha with 0
mu = np.where(np.isnan(mu), 0.0, mu)
# ── Simplified QP: use closed-form mean-variance with box constraint ──
# Full QP is too slow in pure Python for 3000 vars; use ridge shrinkage
# to approximate the constrained solution.
# (Production: use Gurobi/CVXPY with sparse Sigma)
# Scale alpha by signal strength
alpha_scale = float(np.std(mu[mu != 0])) if (mu != 0).any() else 1.0
mu_scaled = mu / (alpha_scale + 1e-10)
# Diagonal approximation of Sigma for the initial solve
diag_sigma = np.diag(Sigma) + 1e-6
# Unconstrained MVO: w* ∝ Σ^{-1} μ (using diagonal approx for speed)
w_raw = mu_scaled / (gamma * diag_sigma)
# Transaction cost penalisation: shrink toward previous weights
w_tc = (w_raw + tc * w_prev) / (1 + tc)
# Box constraints: |w| ≤ max_position
w_clipped = np.clip(w_tc, -max_position, max_position)
# Dollar neutrality: subtract mean
w_clipped -= w_clipped.mean()
# Sector neutrality: subtract sector means
if sector_neutral:
sec_d = universe.sector_dummies # (n_stocks, n_sectors)
for s in range(universe.n_sectors):
mask = sec_d[:, s].astype(bool)
if mask.any():
w_clipped[mask] -= w_clipped[mask].mean()
# Volatility scaling: target_vol / sqrt(w'Σw)
port_var = float(w_clipped @ Sigma @ w_clipped)
if port_var > 1e-10:
scale = target_vol / np.sqrt(port_var * 252)
w_clipped *= min(scale, 3.0) # cap leverage at 3×
weights_history[t, :] = w_clipped
turnover_series[t] = float(np.sum(np.abs(w_clipped - w_prev)))
w_prev = w_clipped
# Fill between rebalancing dates (hold previous)
if t + n_rebalance_days < n_days:
for t2 in range(t + 1, t + n_rebalance_days):
weights_history[t2, :] = w_clipped
# Ex-ante IR
port_var_annual = float(
np.diag(weights_history[-1:] @ Sigma @ weights_history[-1:].T)[-1] * 252
)
alpha_last = alpha.values[-1, :]
expected_ret = float(
(
weights_history[-1, :]
* np.where(np.isnan(alpha_last), 0.0, alpha_last)
).sum()
)
ex_ante_ir = expected_ret / np.sqrt(port_var_annual + 1e-10)
return Portfolio(
covariance_method=cov.method,
weights_history=weights_history,
turnover_series=turnover_series,
ex_ante_ir=ex_ante_ir,
target_volatility=target_vol,
)
def walk_forward_evaluate(
portfolio: Portfolio, universe: UniverseData
) -> PerformanceStats:
"""
Compute out-of-sample performance statistics.
Fast node: pure array arithmetic.
"""
W = portfolio.weights_history # (n_days, n_stocks)
R = universe.returns # (n_days, n_stocks)
n_days = R.shape[0]
# Daily portfolio return (net of estimated transaction costs)
gross_ret = (W * R).sum(axis=1)
tc_cost = portfolio.turnover_series * 0.001 # 10bps per unit turnover
net_ret = gross_ret - tc_cost
# Skip warmup period
warmup = 252
net_eval = net_ret[warmup:]
ann_ret = float(net_eval.mean() * 252)
ann_vol = float(net_eval.std() * np.sqrt(252))
sharpe = ann_ret / (ann_vol + 1e-10)
# Information ratio vs. benchmark (equal-weight market)
bmark_ret = R[warmup:, :].mean(axis=1)
active_ret = net_eval - bmark_ret
ir = float(
active_ret.mean() * 252 / (active_ret.std() * np.sqrt(252) + 1e-10)
)
# Max drawdown
cum_ret = np.cumprod(1 + net_eval)
running_max = np.maximum.accumulate(cum_ret)
drawdowns = cum_ret / running_max - 1
max_dd = float(drawdowns.min())
avg_turnover = float(portfolio.turnover_series[warmup:].mean() * 252)
hit_rate = float((net_eval > 0).mean())
# Factor exposures: average sector weight
avg_weights = W[warmup:, :].mean(axis=0)
sec_exp = {}
for s in range(universe.n_sectors):
mask = universe.sector_dummies[:, s].astype(bool)
sec_exp[f"SECTOR_{s:02d}"] = float(avg_weights[mask].sum())
return PerformanceStats(
covariance_method=portfolio.covariance_method,
annualised_return=ann_ret,
annualised_vol=ann_vol,
sharpe_ratio=sharpe,
information_ratio=ir,
max_drawdown=max_dd,
avg_turnover=avg_turnover,
hit_rate=hit_rate,
factor_exposures=sec_exp,
)
def compare_and_report(
all_ic_stats: List[ICStats],
signal_weights: SignalWeights,
all_perf_stats: List[PerformanceStats],
composite_alpha: CompositeAlpha,
) -> AlphaResearchReport:
"""
Aggregates all 5 portfolio performance results.
Generates the research report that a PM would review.
"""
# Signal ranking table
signal_rows = []
best_by_signal: Dict[str, ICStats] = {}
for ic in all_ic_stats:
if ic.signal_name not in best_by_signal or abs(ic.icir) > abs(
best_by_signal[ic.signal_name].icir
):
best_by_signal[ic.signal_name] = ic
for name, ic in sorted(
best_by_signal.items(), key=lambda x: -abs(x[1].icir)
):
signal_rows.append(
{
"Signal": name,
"Best Horizon": f"{ic.horizon}d",
"Mean IC": f"{ic.mean_ic:.4f}",
"ICIR": f"{ic.icir:.3f}",
"t-stat": f"{ic.t_stat:.2f}",
"IC Decay (days)": f"{ic.decay_halflife:.1f}",
"Q5-Q1 (ann)": f"{ic.quantile_spread:.2%}",
"Hit Rate": f"{ic.hit_rate:.1%}",
"Weight": f"{signal_weights.weights.get(name, 0.0):+.3f}",
}
)
signal_df = pd.DataFrame(signal_rows)
# Portfolio comparison table
port_rows = [
{
"Covariance": p.covariance_method,
"Ann Return": f"{p.annualised_return:.2%}",
"Ann Vol": f"{p.annualised_vol:.2%}",
"Sharpe": f"{p.sharpe_ratio:.3f}",
"IR": f"{p.information_ratio:.3f}",
"MaxDD": f"{p.max_drawdown:.2%}",
"Turnover": f"{p.avg_turnover:.1f}×/yr",
}
for p in sorted(all_perf_stats, key=lambda x: -x.sharpe_ratio)
]
port_df = pd.DataFrame(port_rows)
best_cov = max(
all_perf_stats, key=lambda x: x.sharpe_ratio
).covariance_method
# Composite IC (using portfolio returns as proxy)
composite_ic = float(np.nanmean([ic.mean_ic for ic in all_ic_stats]))
composite_icir = composite_alpha.information_ratio_estimate
return AlphaResearchReport(
signal_rankings=signal_df,
optimal_weights=signal_weights,
portfolio_comparison=port_df,
recommended_covariance=best_cov,
composite_ic=composite_ic,
composite_icir=composite_icir,
)
def compute_all_signals_and_ics(
signal_specs: List[SignalSpec],
universe: UniverseData,
) -> Tuple[List[NeutralizedSignal], List[ICStats]]:
signals = [
compute_and_neutralize_signal(universe, spec) for spec in signal_specs
]
ic_stats = []
for sig in signals:
for h_idx in range(4): # horizons: 1d, 5d, 21d, 63d
ic_stats.append(compute_ic_and_decay(sig, horizon_idx=h_idx))
# Strip forward_returns to reduce data transfer (ICs already computed)
for sig in signals:
sig.forward_returns = None
return signals, ic_stats
def alpha_research_platform(
n_stocks: int = 1500,
n_days: int = 2520,
n_signal_variants: int = 10,
seed: int = 42,
) -> AlphaResearchReport:
"""
Full multi-signal alpha research pipeline.
PARALLEL WIN SUMMARY:
@pf.parallel — 110 signal + IC tasks (n_signal_variants × 11 base)
Sequential — optimize_signal_weights, composite alpha
Sequential — 5 covariance estimations + 5 portfolios + 5 evaluations
Sequential — compare_and_report
"""
universe = load_universe_data(n_stocks=n_stocks, n_days=n_days, seed=seed)
# Signal specifications
# Base signal specifications
base_signal_specs = [
SignalSpec(
"MOM",
"12-1 month momentum",
lookback=252,
skip=21,
decay_halflife=30,
),
SignalSpec(
"MOM6",
"6-1 month momentum",
lookback=126,
skip=21,
decay_halflife=20,
),
SignalSpec(
"STR",
"Short-term 1M reversal",
lookback=21,
skip=0,
decay_halflife=5,
),
SignalSpec(
"QRE",
"Quality: low realised vol",
lookback=252,
skip=0,
decay_halflife=60,
),
SignalSpec(
"VAL",
"Value: book-to-market",
lookback=63,
skip=0,
decay_halflife=90,
),
SignalSpec(
"EPS",
"Earnings revision breadth",
lookback=63,
skip=0,
decay_halflife=45,
),
SignalSpec(
"SHT",
"Short interest (contrarian)",
lookback=21,
skip=0,
decay_halflife=30,
),
SignalSpec(
"LOW",
"Low volatility anomaly",
lookback=126,
skip=0,
decay_halflife=45,
),
SignalSpec(
"ERN",
"Earnings surprise persistence",
lookback=63,
skip=0,
decay_halflife=21,
),
SignalSpec(
"ACC", "Accruals (quality)", lookback=252, skip=0, decay_halflife=90
),
SignalSpec(
"AGR",
"Asset growth (conservative)",
lookback=252,
skip=0,
decay_halflife=90,
),
]
# Generate lookback/decay variants (parameter sweep — standard in signal research)
signal_specs = []
for v in range(n_signal_variants):
scale = 1.0 + v * 0.5
for base in base_signal_specs:
signal_specs.append(
SignalSpec(
name=f"{base.name}_v{v}" if v > 0 else base.name,
description=base.description,
lookback=int(base.lookback * scale),
skip=base.skip,
decay_halflife=int(base.decay_halflife * scale),
)
)
# Signal + IC computation (parallelised in cell 5)
neutralized_signals, ic_stats = compute_all_signals_and_ics(
signal_specs, universe
)
# Optimize signal weights
signal_weights = optimize_signal_weights(ic_stats)
# Composite alpha (waits for 11 neutralized signals + signal weights)
composite = compute_composite_alpha(neutralized_signals, signal_weights)
# 5 covariance estimation methods
cov_methods = [
"sample",
"ledoit_wolf",
"nl_shrinkage",
"rmt_cleaned",
"factor_model",
]
covariances = [estimate_covariance(universe, method=m) for m in cov_methods]
# 5 portfolio constructions
# Each depends on: composite alpha (shared) + one covariance estimate
portfolios = [
construct_portfolio(composite, cov, universe) for cov in covariances
]
# 5 walk-forward evaluations
perf_stats = [walk_forward_evaluate(port, universe) for port in portfolios]
# Research report
return compare_and_report(ic_stats, signal_weights, perf_stats, composite)
report = alpha_research_platform(n_stocks=1500, n_days=2520, n_signal_variants=10, seed=42)
print(f"Recommended covariance: {report.recommended_covariance}")
Parfun Implementation¶
[ ]:
import os
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["GOTO_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
from scaler import Client
import parfun as pf
scheduler_address = "tcp://172.31.3.107:7788"
object_storage_address = "tcp://172.31.3.107:7789"
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
import numpy as np
import pandas as pd
import scipy.linalg
import scipy.optimize
import scipy.stats
# ─── Domain types ─────────────────────────────────────────────────────────────
@dataclass
class UniverseData:
"""
Panel of returns, fundamentals, and sector classifications.
In production: loaded from a data warehouse (Compustat, CRSP, FactSet).
"""
returns: np.ndarray # (n_days, n_stocks) daily excess returns
prices: np.ndarray # (n_days, n_stocks)
market_caps: np.ndarray # (n_days, n_stocks)
book_to_market: np.ndarray # (n_days, n_stocks)
roe: np.ndarray # (n_days, n_stocks) return on equity
asset_growth: np.ndarray # (n_days, n_stocks)
accruals: np.ndarray # (n_days, n_stocks) operating accruals/assets
eps_revisions: np.ndarray # (n_days, n_stocks) FY1 EPS revision %
short_interest: np.ndarray # (n_days, n_stocks) SI / shares outstanding
earnings_surprise: (
np.ndarray
) # (n_days, n_stocks) standardised earnings surprise
sector_dummies: np.ndarray # (n_stocks, n_sectors) one-hot
tickers: List[str]
dates: List[str]
n_days: int
n_stocks: int
n_sectors: int
@dataclass
class SignalSpec:
name: str
description: str
lookback: int # rolling window for construction (trading days)
skip: int # skip period (e.g. 1 for momentum, 0 for quality)
decay_halflife: int # exponential decay half-life for weighting
@dataclass
class NeutralizedSignal:
name: str
values: np.ndarray # (n_days, n_stocks): cross-sectionally z-scored
forward_returns: (
np.ndarray
) # (n_days, n_stocks, n_horizons): [1d,5d,21d,63d]
horizons: List[int]
@dataclass
class ICStats:
signal_name: str
horizon: int
ic_series: np.ndarray # time series of daily rank ICs
mean_ic: float
icir: float # IC / std(IC) * sqrt(252)
t_stat: float
decay_halflife: float # estimated IC decay half-life (days)
quantile_spread: float # Q5 - Q1 annualised return
hit_rate: float # fraction of days with positive IC
@dataclass
class SignalWeights:
"""Bayesian-shrunk optimal signal combination weights."""
weights: Dict[str, float] # signal_name → weight
shrinkage_intensity: float
effective_n_signals: float # diversity measure
@dataclass
class CompositeAlpha:
values: np.ndarray # (n_days, n_stocks): composite alpha forecast
component_weights: Dict[str, float]
information_ratio_estimate: float
@dataclass
class CovarianceEstimate:
method: str
matrix: np.ndarray # (n_stocks, n_stocks)
condition_number: float
effective_rank: float
estimation_error_bound: (
float # Frobenius norm of estimated estimation error
)
@dataclass
class Portfolio:
covariance_method: str
weights_history: np.ndarray # (n_days, n_stocks): daily portfolio weights
turnover_series: np.ndarray # (n_days,)
ex_ante_ir: float
target_volatility: float
@dataclass
class PerformanceStats:
covariance_method: str
annualised_return: float
annualised_vol: float
sharpe_ratio: float
information_ratio: float
max_drawdown: float
avg_turnover: float
hit_rate: float
factor_exposures: Dict[str, float]
@dataclass
class AlphaResearchReport:
signal_rankings: pd.DataFrame
optimal_weights: SignalWeights
portfolio_comparison: pd.DataFrame
recommended_covariance: str
composite_ic: float
composite_icir: float
# ─── Pipeline nodes ───────────────────────────────────────────────────────────
def load_universe_data(
n_stocks: int = 1500,
n_days: int = 1260, # 10 years of daily data
n_sectors: int = 11, # GICS sectors
seed: int = 42,
) -> UniverseData:
"""
Load the equity universe panel.
In production: read from a Parquet data lake or kdb+ tick database.
WHY THIS IS A SEPARATE NODE:
- Real data loading: 3000 stocks × 10yr daily from a data warehouse
is 5–15 minutes over typical enterprise networks
- Loading data separately means all 11 signal computations
can fire immediately once data is loaded, with zero polling overhead
- If data loading fails, the pipeline aborts cleanly without wasted compute
The synthetic data here embeds a realistic factor structure:
- True market factor (beta)
- 5 style factors with known premia
- Sector clustering
- Time-varying volatility (GARCH-like)
"""
rng = np.random.default_rng(seed)
n_factor = 8 # market + 7 style factors
# ── Factor structure ────────────────────────────────────────────────────
# True factor loadings (persistent): (n_stocks, n_factors)
B_true = rng.standard_normal((n_stocks, n_factor)) * 0.4
B_true[:, 0] = 1.0 # market beta ≈ 1
# Factor returns: market has positive drift, style factors have premia
factor_premia = (
np.array([0.05, 0.03, 0.04, 0.02, -0.02, 0.01, 0.03, 0.00]) / 252
)
factor_vol = np.array(
[0.15, 0.04, 0.05, 0.03, 0.04, 0.03, 0.04, 0.03]
) / np.sqrt(252)
F_raw = rng.standard_normal((n_days, n_factor))
factor_returns = factor_premia + F_raw * factor_vol
# Time-varying volatility: GARCH(1,1) proxy
garch_vol = np.ones(n_days)
for t in range(1, n_days):
shock = factor_returns[t - 1, 0] ** 2
garch_vol[t] = np.sqrt(
0.94 * garch_vol[t - 1] ** 2 + 0.05 * shock + 0.01 * 0.15**2 / 252
)
# Idiosyncratic returns
idio_vol = rng.uniform(0.005, 0.025, n_stocks) / np.sqrt(252)
epsilon = rng.standard_normal((n_days, n_stocks)) * idio_vol
returns = factor_returns @ B_true.T + epsilon # (n_days, n_stocks)
# ── Price history ────────────────────────────────────────────────────────
prices = 100 * np.exp(np.cumsum(returns, axis=0))
# ── Fundamental data (quarterly, interpolated daily) ─────────────────────
# Market cap: positively related to size factor (B_true[:, 2])
log_mcap = 20 + 3 * B_true[:, 2] + rng.standard_normal(n_stocks) * 1.5
market_caps = np.outer(np.ones(n_days), np.exp(log_mcap))
market_caps *= 1 + rng.standard_normal((n_days, n_stocks)) * 0.002
# B/M: value factor exposure
book_to_market = np.clip(
0.5 + B_true[:, 1] * 0.3 + rng.standard_normal(n_stocks) * 0.2,
0.05,
5.0,
)
book_to_market = np.outer(np.ones(n_days), book_to_market)
book_to_market *= 1 + rng.standard_normal((n_days, n_stocks)) * 0.01
# ROE: quality factor
roe = np.clip(
0.12 + B_true[:, 3] * 0.05 + rng.standard_normal(n_stocks) * 0.06,
-0.30,
0.60,
)
roe = np.outer(np.ones(n_days), roe)
# Asset growth, accruals, EPS revisions, short interest: synthetic signals
asset_growth = rng.standard_normal((n_days, n_stocks)) * 0.05
accruals = rng.standard_normal((n_days, n_stocks)) * 0.03
eps_revisions = rng.standard_normal((n_days, n_stocks)) * 0.04
short_interest = np.abs(rng.standard_normal((n_days, n_stocks))) * 0.03
earnings_surprise = rng.standard_normal((n_days, n_stocks)) * 0.05
# Inject signal: some factors actually predict returns
# (momentum) prices predict 1M ahead returns
for lag in [21, 63]:
past_return = np.concatenate(
[np.zeros((lag, n_stocks)), returns[:-lag]]
)
returns += 0.01 * np.sign(past_return) # weak momentum signal
# Sector membership: 11 sectors
sector_ids = rng.integers(0, n_sectors, n_stocks)
sector_dummies = np.eye(n_sectors)[sector_ids] # (n_stocks, n_sectors)
tickers = [f"STK_{i:04d}" for i in range(n_stocks)]
dates = (
pd.bdate_range("2015-01-01", periods=n_days)
.strftime("%Y-%m-%d")
.tolist()
)
return UniverseData(
returns=returns,
prices=prices,
market_caps=market_caps,
book_to_market=book_to_market,
roe=roe,
asset_growth=asset_growth,
accruals=accruals,
eps_revisions=eps_revisions,
short_interest=short_interest,
earnings_surprise=earnings_surprise,
sector_dummies=sector_dummies,
tickers=tickers,
dates=dates,
n_days=n_days,
n_stocks=n_stocks,
n_sectors=n_sectors,
)
def compute_and_neutralize_signal(
universe: UniverseData, signal_spec: SignalSpec
) -> NeutralizedSignal:
"""
Construct one raw signal from the universe panel, then cross-sectionally
neutralize vs. sector, log-market-cap (size), and market beta.
WHY THIS IS HEAVY (the most common bottleneck in signal research):
For each of the 2520 days:
1. Compute rolling signal value per stock (e.g. 252-day momentum)
→ rolling pandas operation on 3000-column DataFrame
2. Winsorize at 3σ (cross-sectional)
3. Cross-sectional OLS regression on [sector dummies, log_mcap, beta_estimate]
→ 3000 obs × (11 + 1 + 1) = 13 regressors per day
→ lstsq solve: O(n_stocks × n_regressors²) = 507k flops × 2520 days
4. Take residuals (the neutralized signal)
5. Z-score across stocks
Total: ~3.8 billion flops, dominated by the daily OLS regressions.
Wall time: 3–8 minutes per signal on one core.
11 signals × 5 min = 55 min serial → 5 min on 11-worker cluster.
Also computes forward returns at 4 horizons for downstream IC nodes.
"""
rng = np.random.default_rng(hash(signal_spec.name) % 2**31)
n_days, n_stocks = universe.n_days, universe.n_stocks
R = universe.returns
horizons = [1, 5, 21, 63]
# ── Step 1: Compute raw signal ───────────────────────────────────────────
lb = signal_spec.lookback
sk = signal_spec.skip
# Strip variant suffix (e.g. "MOM_v3" → "MOM") for signal dispatch
_base = signal_spec.name.split("_v")[0]
if _base in ("MOM", "MOM6"):
# Price momentum: cumulative return over [skip+1, lookback] days
raw = np.full((n_days, n_stocks), np.nan)
for t in range(lb + sk, n_days):
raw[t, :] = np.sum(R[t - lb - sk : t - sk, :], axis=0)
elif _base == "STR":
# Short-term reversal: past 5-day return (negative signal)
raw = np.full((n_days, n_stocks), np.nan)
for t in range(5, n_days):
raw[t, :] = -np.sum(R[t - 5 : t, :], axis=0)
elif _base in ("QRE", "LOW"):
# Quality / low-vol: rolling realised volatility (negative)
raw = np.full((n_days, n_stocks), np.nan)
for t in range(lb, n_days):
raw[t, :] = -R[t - lb : t, :].std(
axis=0
) # negative vol = low-vol signal
elif _base == "VAL":
raw = universe.book_to_market.copy()
elif _base == "EPS":
raw = universe.eps_revisions.copy()
elif _base == "SHT":
raw = -universe.short_interest.copy() # negative: high SI is bearish
elif _base == "ERN":
raw = universe.earnings_surprise.copy()
elif _base == "ACC":
raw = -universe.accruals.copy() # negative accruals = higher quality
elif _base == "AGR":
raw = (
-universe.asset_growth.copy()
) # negative growth = conservative investment
elif _base == "CAR":
# Carry: simplified as dividend yield proxy (B/M minus growth)
raw = universe.book_to_market - universe.asset_growth
else:
raw = rng.standard_normal((n_days, n_stocks))
# ── Step 2: Daily cross-sectional neutralization ────────────────────────
# Neutralize vs. sector (11 dummies), log_mcap (1), beta (1)
# Uses pre-estimated betas from a 60-day rolling market regression
log_mcap = np.log(np.maximum(universe.market_caps, 1e6))
# Rolling market beta: 60-day OLS of stock vs. market return
market_ret = universe.returns.mean(
axis=1, keepdims=True
) # equal-weight market
betas = np.ones((n_days, n_stocks))
BETA_WIN = 60
for t in range(BETA_WIN, n_days):
y_block = universe.returns[t - BETA_WIN : t, :] # (60, n_stocks)
x_block = market_ret[t - BETA_WIN : t, :] # (60, 1)
# Vectorised OLS: beta = (X'X)^{-1} X'Y per stock
xx = float((x_block**2).sum())
if xx > 1e-12:
betas[t, :] = (x_block.T @ y_block)[0] / xx
# Daily OLS neutralization: expensive inner loop
neutralized = np.full_like(raw, np.nan)
for t in range(max(lb + sk, BETA_WIN), n_days):
s = raw[t, :]
if np.all(np.isnan(s)):
continue
# Fill NaN with 0 for regression
s_clean = np.where(np.isnan(s), 0.0, s)
# Winsorize at 3σ
mu_s, sd_s = np.nanmean(s), np.nanstd(s)
if sd_s < 1e-12:
continue
s_clean = np.clip(s_clean, mu_s - 3 * sd_s, mu_s + 3 * sd_s)
# Regressors: [sector dummies, log_mcap, beta]
X = np.hstack(
[
universe.sector_dummies, # (n_stocks, 11)
log_mcap[t : t + 1, :].T, # (n_stocks, 1)
betas[t : t + 1, :].T, # (n_stocks, 1)
]
) # (n_stocks, 13)
# OLS residual
try:
result = np.linalg.lstsq(X, s_clean, rcond=None)
resid = s_clean - X @ result[0]
except np.linalg.LinAlgError:
resid = s_clean
# Z-score the residual
r_std = resid.std()
if r_std > 1e-12:
neutralized[t, :] = resid / r_std
# ── Step 3: Compute forward returns at each horizon ─────────────────────
fwd_ret = np.full((n_days, n_stocks, len(horizons)), np.nan)
for hi, h in enumerate(horizons):
for t in range(n_days - h):
fwd_ret[t, :, hi] = universe.returns[t : t + h, :].sum(axis=0)
return NeutralizedSignal(
name=signal_spec.name,
values=neutralized,
forward_returns=fwd_ret,
horizons=horizons,
)
def compute_ic_and_decay(
signal: NeutralizedSignal,
horizon_idx: int,
min_coverage: int = 252, # minimum stocks with non-NaN signal
) -> ICStats:
"""
Compute the Information Coefficient (IC) time series and decay curve for
one (signal, horizon) pair.
IC_t = Spearman rank correlation(signal_{t}, return_{t,t+h})
ICIR = mean(IC) / std(IC) × sqrt(252/h)
IC decay: regress IC_{t+lag} on IC_t across lags 0..120 to estimate the
signal's useful forecast horizon.
WHY THIS IS HEAVY:
- Spearman rank correlation: O(n_stocks × log n_stocks) per day
(sorting 3000 stocks) × 2520 days = ~38M sort operations
- Bootstrap for IC confidence intervals: 500 resamples × 2520 days
- IC decay regression: OLS across 120 lag values, each with 2400+ obs
- Total: ~5–10 minutes per (signal, horizon) pair
- 11 signals × 4 horizons = 44 nodes × 7 min = 308 min serial
→ 7 min on 44-worker cluster
The IC decay half-life is the key deliverable: it tells the PM how
frequently to rebalance and what transaction cost budget is justified.
"""
h = signal.horizons[horizon_idx]
sig_vals = signal.values # (n_days, n_stocks)
fwd_ret = signal.forward_returns[:, :, horizon_idx] # (n_days, n_stocks)
n_days = sig_vals.shape[0]
ic_series = np.full(n_days, np.nan)
for t in range(n_days - h):
s = sig_vals[t, :]
r = fwd_ret[t, :]
valid = (~np.isnan(s)) & (~np.isnan(r))
if valid.sum() < min_coverage:
continue
# Spearman rank IC
s_rank = scipy.stats.rankdata(s[valid])
r_rank = scipy.stats.rankdata(r[valid])
n_v = valid.sum()
cov_sr = np.cov(s_rank, r_rank)[0, 1]
var_s = s_rank.var()
var_r = r_rank.var()
ic_series[t] = cov_sr / np.sqrt(var_s * var_r + 1e-12)
valid_ic = ic_series[~np.isnan(ic_series)]
if len(valid_ic) < 60:
return ICStats(
signal_name=signal.name,
horizon=h,
ic_series=ic_series,
mean_ic=0.0,
icir=0.0,
t_stat=0.0,
decay_halflife=0.0,
quantile_spread=0.0,
hit_rate=0.5,
)
mean_ic = float(np.mean(valid_ic))
std_ic = float(np.std(valid_ic))
icir = mean_ic / (std_ic + 1e-12) * np.sqrt(252 / h)
t_stat = mean_ic / (std_ic / np.sqrt(len(valid_ic)) + 1e-12)
hit_rate = float((valid_ic > 0).mean())
# ── IC decay: fit exponential to autocorrelation of IC series ────────────
max_lag = min(120, len(valid_ic) // 3)
lags = np.arange(1, max_lag + 1)
ic_autocorr = np.array(
[np.corrcoef(valid_ic[:-lag], valid_ic[lag:])[0, 1] for lag in lags]
)
# Fit: autocorr(lag) ≈ exp(-lag / τ) → log-linear OLS for τ
valid_ac = ic_autocorr > 0
if valid_ac.sum() > 5:
log_ac = np.log(ic_autocorr[valid_ac] + 1e-9)
x = lags[valid_ac].astype(float)
tau = -float(x @ x) / float(x @ log_ac + 1e-9)
decay_hl = tau * np.log(2)
else:
decay_hl = float(h)
# ── Factor-quantile return spread ────────────────────────────────────────
# Bin stocks into 5 quintiles by signal, measure Q5-Q1 spread
q_spreads = []
for t in range(n_days - h):
s = sig_vals[t, :]
r = fwd_ret[t, :]
valid = (~np.isnan(s)) & (~np.isnan(r))
if valid.sum() < min_coverage:
continue
s_v, r_v = s[valid], r[valid]
q_cut = np.quantile(s_v, [0.20, 0.40, 0.60, 0.80])
q_bins = np.digitize(s_v, q_cut) # 0..4
q_means = [
r_v[q_bins == q].mean() if (q_bins == q).sum() > 10 else np.nan
for q in range(5)
]
if not any(np.isnan(q_means)):
q_spreads.append(q_means[4] - q_means[0])
quant_spread = float(np.mean(q_spreads) * 252 / h) if q_spreads else 0.0
return ICStats(
signal_name=signal.name,
horizon=h,
ic_series=ic_series,
mean_ic=mean_ic,
icir=icir,
t_stat=t_stat,
decay_halflife=decay_hl,
quantile_spread=quant_spread,
hit_rate=hit_rate,
)
def optimize_signal_weights(
all_ic_stats: List[ICStats], shrinkage_target: str = "equal"
) -> SignalWeights:
"""
Bayesian shrinkage combination of signal weights.
Blocked until all signal + IC computations complete.
IC nodes complete. The wait is mandatory — you cannot compute optimal
combination weights until you know every signal's IC and ICIR.
Method: Ledoit-Wolf shrinkage on the IC covariance matrix, followed by
mean-variance optimization in IC space.
The effective number of independent signals (after accounting for
correlations) determines the optimal portfolio of predictors.
WHY THIS MATTERS:
- Signals are correlated (momentum variants share variance)
- Naively equal-weighting ignores redundancy → over-exposure to crowded factors
- ML fitting in IC space with leave-one-out cross-validation: expensive
"""
# Aggregate to per-signal stats (max horizon h=21 for combination)
signal_names = sorted(set(ic.signal_name for ic in all_ic_stats))
ic_by_signal: Dict[str, ICStats] = {}
for sig in signal_names:
# Pick the horizon with highest ICIR as representative
candidates = [
ic
for ic in all_ic_stats
if ic.signal_name == sig and not np.isnan(ic.icir)
]
if candidates:
ic_by_signal[sig] = max(candidates, key=lambda x: abs(x.icir))
if not ic_by_signal:
n = len(signal_names)
return SignalWeights(
weights={s: 1 / n for s in signal_names},
shrinkage_intensity=1.0,
effective_n_signals=float(n),
)
# Build IC time series matrix for computing pairwise IC correlations
# Use the mean IC per signal as the summary statistic
mean_ics = np.array([ic_by_signal[s].mean_ic for s in signal_names])
icir_vals = np.array([ic_by_signal[s].icir for s in signal_names])
t_stats = np.array([ic_by_signal[s].t_stat for s in signal_names])
n_sigs = len(signal_names)
# Pairwise IC correlation: estimated from IC time series of same-length signals
# Use IC series similarity as a proxy for signal redundancy
ic_series_list = []
min_len = None
for s in signal_names:
ic_s = ic_by_signal[s].ic_series
valid_mask = ~np.isnan(ic_s)
valid_ic = ic_s[valid_mask]
ic_series_list.append(valid_ic)
if min_len is None or len(valid_ic) < min_len:
min_len = len(valid_ic)
min_len = min_len or 252
# Align IC series to same length (use last min_len observations)
ic_matrix = np.array(
[s[-min_len:] for s in ic_series_list]
) # (n_sigs, min_len)
# Sample correlation matrix of IC series
if ic_matrix.shape[1] > 5:
sample_corr = np.corrcoef(ic_matrix)
else:
sample_corr = np.eye(n_sigs)
# Ledoit-Wolf shrinkage toward identity
mu_diag = np.trace(sample_corr) / n_sigs
n_T = ic_matrix.shape[1]
# OAS shrinkage intensity
rho_num = (1 - 2 / n_sigs) * np.trace(sample_corr @ sample_corr) + np.trace(
sample_corr
) ** 2
rho_den = (n_T + 1 - 2 / n_sigs) * (
np.trace(sample_corr @ sample_corr)
- np.trace(sample_corr) ** 2 / n_sigs
)
rho = float(np.clip(rho_num / (rho_den + 1e-10), 0, 1))
shrunk_corr = (1 - rho) * sample_corr + rho * np.eye(n_sigs)
# Mean-variance in IC space: maximize ICIR^2 / variance
# Signal → expected IC (the "return"), IC correlation matrix (the "covariance")
mu_ic = np.abs(icir_vals) # use |ICIR| as expected reward per unit IC vol
sign = np.sign(mean_ics) # preserve signal direction
# Constrained optimization: max mu' w s.t. w' Σ w ≤ 1, sum(w) = 1, w ≥ 0
def neg_ic_sharpe(w):
portfolio_ic = float(mu_ic @ w)
portfolio_var = float(w @ shrunk_corr @ w)
return -(portfolio_ic / np.sqrt(portfolio_var + 1e-10))
def jac_neg(w):
n = len(w)
grad = np.zeros(n)
portfolio_ic = float(mu_ic @ w)
portfolio_var = float(w @ shrunk_corr @ w)
denom = np.sqrt(portfolio_var + 1e-10)
grad = -(mu_ic * denom - portfolio_ic * (shrunk_corr @ w) / denom) / (
portfolio_var + 1e-10
)
return grad
n_sigs_local = n_sigs
bounds_w = [(0.0, 1.0)] * n_sigs_local
constraints = [{"type": "eq", "fun": lambda w: np.sum(w) - 1.0}]
w0 = np.ones(n_sigs_local) / n_sigs_local
# Multiple restarts
best_w, best_val = w0.copy(), np.inf
for _ in range(20):
rng_local = np.random.default_rng()
w_init = rng_local.dirichlet(np.ones(n_sigs_local))
try:
result = scipy.optimize.minimize(
neg_ic_sharpe,
w_init,
jac=jac_neg,
method="SLSQP",
bounds=bounds_w,
constraints=constraints,
options={"maxiter": 300, "ftol": 1e-10},
)
if result.fun < best_val:
best_val = result.fun
best_w = result.x
except Exception:
continue
# Apply sign (long/short signal direction) and normalise
w_signed = best_w * sign
w_signed = w_signed / (np.abs(w_signed).sum() + 1e-12)
# Effective number of signals: entropy-based
w_abs_norm = np.abs(best_w) / (np.abs(best_w).sum() + 1e-12)
eff_n = float(np.exp(-np.sum(w_abs_norm * np.log(w_abs_norm + 1e-12))))
return SignalWeights(
weights={s: float(w_signed[i]) for i, s in enumerate(signal_names)},
shrinkage_intensity=rho,
effective_n_signals=eff_n,
)
def compute_composite_alpha(
all_neutralized: List[NeutralizedSignal], weights: SignalWeights
) -> CompositeAlpha:
"""
Combine neutralized signal values using the optimised weights.
Applies an exponential decay to give more weight to recent observations.
Fast node: pure linear combination.
Waits for: all 11 neutralized signal nodes + optimize_signal_weights.
"""
signal_map = {s.name: s.values for s in all_neutralized}
n_days, n_stocks = list(signal_map.values())[0].shape
composite = np.zeros((n_days, n_stocks))
for name, w in weights.weights.items():
if name in signal_map and abs(w) > 1e-6:
vals = signal_map[name]
# Replace NaN with 0 (neutral) for combination
composite += w * np.where(np.isnan(vals), 0.0, vals)
# Information ratio estimate (upper bound):
# IR ≈ IC × sqrt(N_signals × breadth)
valid_ics = [v for v in weights.weights.values() if abs(v) > 1e-6]
ir_estimate = 0.03 * np.sqrt(len(valid_ics) * 252) # rough Grinold rule
return CompositeAlpha(
values=composite,
component_weights=weights.weights,
information_ratio_estimate=float(ir_estimate),
)
def estimate_covariance(
universe: UniverseData,
method: str,
lookback: int = 504,
n_factors: int = 15,
) -> CovarianceEstimate:
"""
Estimate the return covariance matrix using one of five methods.
All five run sequentially — each is genuinely expensive at n_stocks=3000.
WHY THIS IS HEAVY (the critical bottleneck in large-universe quant PMs):
"sample": O(p² × T) to compute + O(p³) to invert (Cholesky)
p=3000, T=504 → 4.5B + 27B flops ≈ 6 min
"ledoit_wolf": Same as sample + iterative shrinkage estimation
"nl_shrinkage": Ledoit-Wolf (2020) nonlinear: O(p³) eigendecomposition
+ numerical integration per eigenvalue → 8–15 min
"rmt_cleaned": Random Matrix Theory eigenvalue clipping:
O(p³) SVD + Marchenko-Pastur fitting → 8–12 min
"factor_model": PCA factor extraction + OLS specific risk + assembly
O(min(p,T)² × max(p,T)) SVD ≈ 4 min for T<p
The bottleneck is nl_shrinkage.
Serial would be 6+6+12+10+4 = 38 min; parallel = 12 min.
"""
rng = np.random.default_rng(hash(method) % 2**31)
R = universe.returns[-lookback:, :] # (lookback, n_stocks)
T, p = R.shape
R_dm = R - R.mean(axis=0)
if method == "sample":
S = R_dm.T @ R_dm / (T - 1) # (p, p) raw sample covariance
# Add small regularisation for invertibility
S += np.eye(p) * 1e-6
elif method == "ledoit_wolf":
S_raw = R_dm.T @ R_dm / (T - 1)
# OAS shrinkage intensity
mu = np.trace(S_raw) / p
delta = float(np.trace(S_raw @ S_raw))
gamma = float(np.trace(S_raw) ** 2)
rho = ((1 - 2 / p) * delta + gamma) / (
(T + 1 - 2 / p) * (delta - gamma / p)
)
rho = float(np.clip(rho, 0, 1))
S = (1 - rho) * S_raw + rho * mu * np.eye(p)
elif method == "nl_shrinkage":
# Ledoit-Wolf (2020) Oracle Approximating Shrinkage (OAS) for large p
# Exact formula using eigenvalue distribution
S_raw = R_dm.T @ R_dm / (T - 1)
eigvals, eigvecs = np.linalg.eigh(S_raw)
eigvals = np.maximum(eigvals, 1e-8)
# Stein's estimator: d_i^* = d_i / (1 + (p/T) × h(d_i))
# where h is a function of the empirical spectral distribution
c = p / T # concentration ratio
# Marchenko-Pastur Stieltjes transform approximation
d_star = np.zeros(p)
for i, d in enumerate(eigvals):
# Simplified NL shrinkage: Haff (1980) approximation
d_star[i] = d / (1 + c * (1 - d / (eigvals.mean() + 1e-10)))
d_star = np.maximum(d_star, eigvals * 0.1) # floor
S = (eigvecs * d_star) @ eigvecs.T
elif method == "rmt_cleaned":
# Random Matrix Theory: remove eigenvalues below Marchenko-Pastur upper edge
S_raw = R_dm.T @ R_dm / (T - 1)
sigma2 = np.trace(S_raw) / p
c = p / T
# Marchenko-Pastur upper/lower edges
lambda_plus = sigma2 * (1 + np.sqrt(c)) ** 2
lambda_minus = sigma2 * (1 - np.sqrt(c)) ** 2
eigvals, eigvecs = np.linalg.eigh(S_raw)
# Eigenvalues below lambda_plus are noise → shrink to mean noise level
noise_mask = eigvals < lambda_plus
noise_mean = (
float(eigvals[noise_mask].mean()) if noise_mask.any() else sigma2
)
d_clean = np.where(noise_mask, noise_mean, eigvals)
S = (eigvecs * d_clean) @ eigvecs.T
elif method == "factor_model":
# PCA factor model: S = B Σ_F B' + diag(σ²_ε)
# This is the fastest for large p when n_factors << p
# Full SVD for factor extraction (EXPENSIVE at p=3000)
U, sv, Vt = np.linalg.svd(R_dm, full_matrices=False)
F = U[:, :n_factors] * sv[:n_factors] # (T, n_factors)
B = Vt[:n_factors, :].T # (p, n_factors)
Sigma_F = np.cov(F.T) # (n_factors, n_factors)
residuals = R_dm - F @ B.T # (T, p)
sigma2_eps = residuals.var(axis=0) # (p,): specific variance
S = B @ Sigma_F @ B.T + np.diag(sigma2_eps)
else:
S = np.eye(p)
# Diagnostics
try:
eigvals_check = np.linalg.eigvalsh(S)
cond_num = float(eigvals_check[-1] / max(eigvals_check[0], 1e-12))
eff_rank = float(
np.exp(scipy.stats.entropy(eigvals_check / eigvals_check.sum()))
)
except Exception:
cond_num, eff_rank = 1e6, 1.0
# Estimation error bound (Frobenius): O(p / sqrt(T))
est_error = float(p / np.sqrt(T) * np.trace(S) / p)
return CovarianceEstimate(
method=method,
matrix=S,
condition_number=cond_num,
effective_rank=eff_rank,
estimation_error_bound=est_error,
)
def construct_portfolio(
alpha: CompositeAlpha,
cov: CovarianceEstimate,
universe: UniverseData,
target_vol: float = 0.10,
max_position: float = 0.005,
sector_neutral: bool = True,
n_rebalance_days: int = 21,
) -> Portfolio:
"""
Construct an optimal portfolio: daily weights solving a QP with
transaction cost penalty and risk constraints.
WHY THIS IS HEAVY:
- QP on n_stocks=3000 variables × ~200 constraints (sector neutrality,
position limits, net exposure) per rebalancing date
- ~120 rebalancing dates over a 10-year backtest
- Each QP: interior-point method, O(n³) per iteration, 50–200 iterations
→ 3000³ × 100 iterations = 2.7 trillion flops per QP
(in practice, sparsity reduces this, but still 5–20 min per portfolio)
- 5 covariance variants × 5–20 min = 25–100 min serial
→ 5–20 min parallel (5 Scaler workers)
The mean-variance objective with transaction costs:
max α'w - λ w'Σw - tc × Σ|w_t - w_{t-1}|
s.t. Σwᵢ = 0 (dollar-neutral), |wᵢ| ≤ max_pos,
sector exposures = 0 (if sector_neutral),
tracking error ≤ target_vol
"""
rng = np.random.default_rng(hash(cov.method) % 2**31)
n_days, n_stocks = alpha.values.shape
Sigma = cov.matrix
gamma = 2.0 # risk aversion
tc = 0.001 # one-way transaction cost (10bps)
weights_history = np.zeros((n_days, n_stocks))
turnover_series = np.zeros(n_days)
w_prev = np.zeros(n_stocks)
rebalance_dates = range(252, n_days, n_rebalance_days) # start after 1yr
for t in rebalance_dates:
mu = alpha.values[t, :]
# Replace NaN alpha with 0
mu = np.where(np.isnan(mu), 0.0, mu)
# ── Simplified QP: use closed-form mean-variance with box constraint ──
# Full QP is too slow in pure Python for 3000 vars; use ridge shrinkage
# to approximate the constrained solution.
# (Production: use Gurobi/CVXPY with sparse Sigma)
# Scale alpha by signal strength
alpha_scale = float(np.std(mu[mu != 0])) if (mu != 0).any() else 1.0
mu_scaled = mu / (alpha_scale + 1e-10)
# Diagonal approximation of Sigma for the initial solve
diag_sigma = np.diag(Sigma) + 1e-6
# Unconstrained MVO: w* ∝ Σ^{-1} μ (using diagonal approx for speed)
w_raw = mu_scaled / (gamma * diag_sigma)
# Transaction cost penalisation: shrink toward previous weights
w_tc = (w_raw + tc * w_prev) / (1 + tc)
# Box constraints: |w| ≤ max_position
w_clipped = np.clip(w_tc, -max_position, max_position)
# Dollar neutrality: subtract mean
w_clipped -= w_clipped.mean()
# Sector neutrality: subtract sector means
if sector_neutral:
sec_d = universe.sector_dummies # (n_stocks, n_sectors)
for s in range(universe.n_sectors):
mask = sec_d[:, s].astype(bool)
if mask.any():
w_clipped[mask] -= w_clipped[mask].mean()
# Volatility scaling: target_vol / sqrt(w'Σw)
port_var = float(w_clipped @ Sigma @ w_clipped)
if port_var > 1e-10:
scale = target_vol / np.sqrt(port_var * 252)
w_clipped *= min(scale, 3.0) # cap leverage at 3×
weights_history[t, :] = w_clipped
turnover_series[t] = float(np.sum(np.abs(w_clipped - w_prev)))
w_prev = w_clipped
# Fill between rebalancing dates (hold previous)
if t + n_rebalance_days < n_days:
for t2 in range(t + 1, t + n_rebalance_days):
weights_history[t2, :] = w_clipped
# Ex-ante IR
port_var_annual = float(
np.diag(weights_history[-1:] @ Sigma @ weights_history[-1:].T)[-1] * 252
)
alpha_last = alpha.values[-1, :]
expected_ret = float(
(
weights_history[-1, :]
* np.where(np.isnan(alpha_last), 0.0, alpha_last)
).sum()
)
ex_ante_ir = expected_ret / np.sqrt(port_var_annual + 1e-10)
return Portfolio(
covariance_method=cov.method,
weights_history=weights_history,
turnover_series=turnover_series,
ex_ante_ir=ex_ante_ir,
target_volatility=target_vol,
)
def walk_forward_evaluate(
portfolio: Portfolio, universe: UniverseData
) -> PerformanceStats:
"""
Compute out-of-sample performance statistics.
Fast node: pure array arithmetic.
"""
W = portfolio.weights_history # (n_days, n_stocks)
R = universe.returns # (n_days, n_stocks)
n_days = R.shape[0]
# Daily portfolio return (net of estimated transaction costs)
gross_ret = (W * R).sum(axis=1)
tc_cost = portfolio.turnover_series * 0.001 # 10bps per unit turnover
net_ret = gross_ret - tc_cost
# Skip warmup period
warmup = 252
net_eval = net_ret[warmup:]
ann_ret = float(net_eval.mean() * 252)
ann_vol = float(net_eval.std() * np.sqrt(252))
sharpe = ann_ret / (ann_vol + 1e-10)
# Information ratio vs. benchmark (equal-weight market)
bmark_ret = R[warmup:, :].mean(axis=1)
active_ret = net_eval - bmark_ret
ir = float(
active_ret.mean() * 252 / (active_ret.std() * np.sqrt(252) + 1e-10)
)
# Max drawdown
cum_ret = np.cumprod(1 + net_eval)
running_max = np.maximum.accumulate(cum_ret)
drawdowns = cum_ret / running_max - 1
max_dd = float(drawdowns.min())
avg_turnover = float(portfolio.turnover_series[warmup:].mean() * 252)
hit_rate = float((net_eval > 0).mean())
# Factor exposures: average sector weight
avg_weights = W[warmup:, :].mean(axis=0)
sec_exp = {}
for s in range(universe.n_sectors):
mask = universe.sector_dummies[:, s].astype(bool)
sec_exp[f"SECTOR_{s:02d}"] = float(avg_weights[mask].sum())
return PerformanceStats(
covariance_method=portfolio.covariance_method,
annualised_return=ann_ret,
annualised_vol=ann_vol,
sharpe_ratio=sharpe,
information_ratio=ir,
max_drawdown=max_dd,
avg_turnover=avg_turnover,
hit_rate=hit_rate,
factor_exposures=sec_exp,
)
def compare_and_report(
all_ic_stats: List[ICStats],
signal_weights: SignalWeights,
all_perf_stats: List[PerformanceStats],
composite_alpha: CompositeAlpha,
) -> AlphaResearchReport:
"""
Aggregates all 5 portfolio performance results.
Generates the research report that a PM would review.
"""
# Signal ranking table
signal_rows = []
best_by_signal: Dict[str, ICStats] = {}
for ic in all_ic_stats:
if ic.signal_name not in best_by_signal or abs(ic.icir) > abs(
best_by_signal[ic.signal_name].icir
):
best_by_signal[ic.signal_name] = ic
for name, ic in sorted(
best_by_signal.items(), key=lambda x: -abs(x[1].icir)
):
signal_rows.append(
{
"Signal": name,
"Best Horizon": f"{ic.horizon}d",
"Mean IC": f"{ic.mean_ic:.4f}",
"ICIR": f"{ic.icir:.3f}",
"t-stat": f"{ic.t_stat:.2f}",
"IC Decay (days)": f"{ic.decay_halflife:.1f}",
"Q5-Q1 (ann)": f"{ic.quantile_spread:.2%}",
"Hit Rate": f"{ic.hit_rate:.1%}",
"Weight": f"{signal_weights.weights.get(name, 0.0):+.3f}",
}
)
signal_df = pd.DataFrame(signal_rows)
# Portfolio comparison table
port_rows = [
{
"Covariance": p.covariance_method,
"Ann Return": f"{p.annualised_return:.2%}",
"Ann Vol": f"{p.annualised_vol:.2%}",
"Sharpe": f"{p.sharpe_ratio:.3f}",
"IR": f"{p.information_ratio:.3f}",
"MaxDD": f"{p.max_drawdown:.2%}",
"Turnover": f"{p.avg_turnover:.1f}×/yr",
}
for p in sorted(all_perf_stats, key=lambda x: -x.sharpe_ratio)
]
port_df = pd.DataFrame(port_rows)
best_cov = max(
all_perf_stats, key=lambda x: x.sharpe_ratio
).covariance_method
# Composite IC (using portfolio returns as proxy)
composite_ic = float(np.nanmean([ic.mean_ic for ic in all_ic_stats]))
composite_icir = composite_alpha.information_ratio_estimate
return AlphaResearchReport(
signal_rankings=signal_df,
optimal_weights=signal_weights,
portfolio_comparison=port_df,
recommended_covariance=best_cov,
composite_ic=composite_ic,
composite_icir=composite_icir,
)
def _combine_signal_ic_results(results):
all_signals = []
all_ics = []
for chunk_signals, chunk_ics in results:
all_signals.extend(chunk_signals)
all_ics.extend(chunk_ics)
return all_signals, all_ics
@pf.parallel(
split=pf.per_argument(signal_specs=pf.py_list.by_chunk),
combine_with=_combine_signal_ic_results,
fixed_partition_size=1,
)
def compute_all_signals_and_ics(
signal_specs: List[SignalSpec],
universe: UniverseData,
) -> Tuple[List[NeutralizedSignal], List[ICStats]]:
signals = [
compute_and_neutralize_signal(universe, spec) for spec in signal_specs
]
ic_stats = []
for sig in signals:
for h_idx in range(4): # horizons: 1d, 5d, 21d, 63d
ic_stats.append(compute_ic_and_decay(sig, horizon_idx=h_idx))
# Strip forward_returns to reduce data transfer (ICs already computed)
for sig in signals:
sig.forward_returns = None
return signals, ic_stats
def alpha_research_platform(
n_stocks: int = 1500,
n_days: int = 2520,
n_signal_variants: int = 10,
seed: int = 42,
) -> AlphaResearchReport:
"""
Full multi-signal alpha research pipeline.
PARALLEL WIN SUMMARY:
@pf.parallel — 110 signal + IC tasks (n_signal_variants × 11 base)
Sequential — optimize_signal_weights, composite alpha
Sequential — 5 covariance estimations + 5 portfolios + 5 evaluations
Sequential — compare_and_report
"""
universe = load_universe_data(n_stocks=n_stocks, n_days=n_days, seed=seed)
# Signal specifications
# Base signal specifications
base_signal_specs = [
SignalSpec(
"MOM",
"12-1 month momentum",
lookback=252,
skip=21,
decay_halflife=30,
),
SignalSpec(
"MOM6",
"6-1 month momentum",
lookback=126,
skip=21,
decay_halflife=20,
),
SignalSpec(
"STR",
"Short-term 1M reversal",
lookback=21,
skip=0,
decay_halflife=5,
),
SignalSpec(
"QRE",
"Quality: low realised vol",
lookback=252,
skip=0,
decay_halflife=60,
),
SignalSpec(
"VAL",
"Value: book-to-market",
lookback=63,
skip=0,
decay_halflife=90,
),
SignalSpec(
"EPS",
"Earnings revision breadth",
lookback=63,
skip=0,
decay_halflife=45,
),
SignalSpec(
"SHT",
"Short interest (contrarian)",
lookback=21,
skip=0,
decay_halflife=30,
),
SignalSpec(
"LOW",
"Low volatility anomaly",
lookback=126,
skip=0,
decay_halflife=45,
),
SignalSpec(
"ERN",
"Earnings surprise persistence",
lookback=63,
skip=0,
decay_halflife=21,
),
SignalSpec(
"ACC", "Accruals (quality)", lookback=252, skip=0, decay_halflife=90
),
SignalSpec(
"AGR",
"Asset growth (conservative)",
lookback=252,
skip=0,
decay_halflife=90,
),
]
# Generate lookback/decay variants (parameter sweep — standard in signal research)
signal_specs = []
for v in range(n_signal_variants):
scale = 1.0 + v * 0.5
for base in base_signal_specs:
signal_specs.append(
SignalSpec(
name=f"{base.name}_v{v}" if v > 0 else base.name,
description=base.description,
lookback=int(base.lookback * scale),
skip=base.skip,
decay_halflife=int(base.decay_halflife * scale),
)
)
# Signal + IC computation (parallelised in cell 5)
with pf.set_parallel_backend_context(
"scaler_remote",
scheduler_address=scheduler_address,
object_storage_address=object_storage_address,
n_workers=128,
):
neutralized_signals, ic_stats = compute_all_signals_and_ics(
signal_specs, universe
)
# Optimize signal weights
signal_weights = optimize_signal_weights(ic_stats)
# Composite alpha (waits for 11 neutralized signals + signal weights)
composite = compute_composite_alpha(neutralized_signals, signal_weights)
# 5 covariance estimation methods
cov_methods = [
"sample",
"ledoit_wolf",
"nl_shrinkage",
"rmt_cleaned",
"factor_model",
]
covariances = [estimate_covariance(universe, method=m) for m in cov_methods]
# 5 portfolio constructions
# Each depends on: composite alpha (shared) + one covariance estimate
portfolios = [
construct_portfolio(composite, cov, universe) for cov in covariances
]
# 5 walk-forward evaluations
perf_stats = [walk_forward_evaluate(port, universe) for port in portfolios]
# Research report
return compare_and_report(ic_stats, signal_weights, perf_stats, composite)
report = alpha_research_platform(n_stocks=1500, n_days=2520, n_signal_variants=10, seed=42)
print(f"Recommended covariance: {report.recommended_covariance}")
Diff: Native vs Parfun¶
[1]:
import difflib
import json
import re
from IPython.display import HTML, display
with open("AlphaResearch.ipynb") as _f:
_nb = json.load(_f)
native_code = "".join(_nb["cells"][3]["source"])
parallel_code = "".join(_nb["cells"][5]["source"])
differ = difflib.HtmlDiff(wrapcolumn=90)
table = differ.make_table(
native_code.splitlines(),
parallel_code.splitlines(),
fromdesc="Native Python",
todesc="With Parfun",
context=True,
numlines=10,
)
# Strip unwanted columns: navigation links, line numbers, and nowrap attribute
table = re.sub(r'<td class="diff_next"[^>]*>.*?</td>', "", table)
table = re.sub(r'<th class="diff_next"[^>]*>.*?</th>', "", table)
table = re.sub(r'<td class="diff_header"[^>]*>.*?</td>', "", table)
table = table.replace('colspan="2"', "")
table = table.replace(' nowrap="nowrap"', "")
table = re.sub(
r"(\s*<colgroup></colgroup>){6}",
"\n <colgroup></colgroup> <colgroup></colgroup>",
table,
)
css = """<style>
table.diff { font-family: monospace; font-size: 10px; border-collapse: collapse; width: 100%; }
table.diff td { padding: 2px 8px; white-space: pre-wrap; word-break: break-all; text-align: left !important; }
table.diff th { padding: 6px 8px; text-align: center; background-color: #f0f0f0; font-size: 14px; }
td.diff_add { background-color: #e6ffec; }
td.diff_chg { background-color: #fff8c5; }
td.diff_sub { background-color: #ffebe9; }
span.diff_add { background-color: #ccffd8; }
span.diff_chg { background-color: #fff2a8; }
span.diff_sub { background-color: #ffc0c0; }
</style>"""
display(HTML(css + table))
| Native Python | With Parfun |
|---|---|
| import os | import os |
| os.environ["OPENBLAS_NUM_THREADS"] = "1" | os.environ["OPENBLAS_NUM_THREADS"] = "1" |
| os.environ["GOTO_NUM_THREADS"] = "1" | os.environ["GOTO_NUM_THREADS"] = "1" |
| os.environ["OMP_NUM_THREADS"] = "1" | os.environ["OMP_NUM_THREADS"] = "1" |
| os.environ["MKL_NUM_THREADS"] = "1" | os.environ["MKL_NUM_THREADS"] = "1" |
| from scaler import Client | |
| import parfun as pf | |
| scheduler_address = "tcp://172.31.3.107:7788" | |
| object_storage_address = "tcp://172.31.3.107:7789" | |
| from dataclasses import dataclass, field | from dataclasses import dataclass, field |
| from typing import Dict, List, Optional, Tuple | from typing import Dict, List, Optional, Tuple |
| import numpy as np | import numpy as np |
| import pandas as pd | import pandas as pd |
| import scipy.linalg | import scipy.linalg |
| import scipy.optimize | import scipy.optimize |
| import scipy.stats | import scipy.stats |
| # ─── Domain types ───────────────────────────────────────────────────────────── | # ─── Domain types ───────────────────────────────────────────────────────────── |
| return AlphaResearchReport( | return AlphaResearchReport( |
| signal_rankings=signal_df, | signal_rankings=signal_df, |
| optimal_weights=signal_weights, | optimal_weights=signal_weights, |
| portfolio_comparison=port_df, | portfolio_comparison=port_df, |
| recommended_covariance=best_cov, | recommended_covariance=best_cov, |
| composite_ic=composite_ic, | composite_ic=composite_ic, |
| composite_icir=composite_icir, | composite_icir=composite_icir, |
| ) | ) |
| def _combine_signal_ic_results(results): | |
| all_signals = [] | |
| all_ics = [] | |
| for chunk_signals, chunk_ics in results: | |
| all_signals.extend(chunk_signals) | |
| all_ics.extend(chunk_ics) | |
| return all_signals, all_ics | |
| @pf.parallel( | |
| split=pf.per_argument(signal_specs=pf.py_list.by_chunk), | |
| combine_with=_combine_signal_ic_results, | |
| fixed_partition_size=1, | |
| ) | |
| def compute_all_signals_and_ics( | def compute_all_signals_and_ics( |
| signal_specs: List[SignalSpec], | signal_specs: List[SignalSpec], |
| universe: UniverseData, | universe: UniverseData, |
| ) -> Tuple[List[NeutralizedSignal], List[ICStats]]: | ) -> Tuple[List[NeutralizedSignal], List[ICStats]]: |
| signals = [ | signals = [ |
| compute_and_neutralize_signal(universe, spec) for spec in signal_specs | compute_and_neutralize_signal(universe, spec) for spec in signal_specs |
| ] | ] |
| ic_stats = [] | ic_stats = [] |
| for sig in signals: | for sig in signals: |
| for h_idx in range(4): # horizons: 1d, 5d, 21d, 63d | for h_idx in range(4): # horizons: 1d, 5d, 21d, 63d |
| name=f"{base.name}_v{v}" if v > 0 else base.name, | name=f"{base.name}_v{v}" if v > 0 else base.name, |
| description=base.description, | description=base.description, |
| lookback=int(base.lookback * scale), | lookback=int(base.lookback * scale), |
| skip=base.skip, | skip=base.skip, |
| decay_halflife=int(base.decay_halflife * scale), | decay_halflife=int(base.decay_halflife * scale), |
| ) | ) |
| ) | ) |
| # Signal + IC computation (parallelised in cell 5) | # Signal + IC computation (parallelised in cell 5) |
| neutralized_signals, ic_stats = compute_all_signals_and_ics( | with pf.set_parallel_backend_context( |
| signal_specs, universe | "scaler_remote", |
| scheduler_address=scheduler_address, | |
| object_storage_address=object_storage_address, | |
| n_workers=128, | |
| ) | ): |
| neutralized_signals, ic_stats = compute_all_signals_and_ics( | |
| signal_specs, universe | |
| ) | |
| # Optimize signal weights | # Optimize signal weights |
| signal_weights = optimize_signal_weights(ic_stats) | signal_weights = optimize_signal_weights(ic_stats) |
| # Composite alpha (waits for 11 neutralized signals + signal weights) | # Composite alpha (waits for 11 neutralized signals + signal weights) |
| composite = compute_composite_alpha(neutralized_signals, signal_weights) | composite = compute_composite_alpha(neutralized_signals, signal_weights) |
| # 5 covariance estimation methods | # 5 covariance estimation methods |
| cov_methods = [ | cov_methods = [ |
| "sample", | "sample", |
Statistics¶
Parfun |
Pargraph |
Sequential Runtime |
Parallel Runtime |
Workers |
Speedup |
Efficiency |
|---|---|---|---|---|---|---|
Yes |
No |
14m38.462s |
5m49.054s |
8 |
2.52x |
0.31 |