Portfolio-Level XVA Risk Computation with Pargraph¶
A bank’s XVA desk computes Credit Valuation Adjustment (CVA) and Funding Valuation Adjustment (FVA) across a multi-asset portfolio spanning interest rates, FX, equities, and credit. The CVA for a netting set is the discounted, default-weighted expected positive exposure: $\mathrm{CVA} = \mathrm{LGD}\sum_i D(t_i)\,\mathrm{EPE}(t_i)\,\Delta PD(t_i)$, where $D(t_i)$ is the discount factor, $\mathrm{EPE}(t_i)$ the expected positive exposure aggregated over the netting set’s desks, and $\Delta PD(t_i)$ the counterparty’s marginal default probability over $[t_{i-1}, t_i]$ (the implementation below additionally weights each increment by survival to $t_{i-1}$).
The pipeline forms a 6-level DAG with ~80 compute nodes:
Level 1 — Calibrate stochastic models for 15 asset sub-classes (IR×5, FX×4, EQ×3, Credit×3)
Level 2 — Monte Carlo path simulation for each calibrated model (15 parallel)
Level 3 — Portfolio pricing per desk/asset class (15 parallel)
Level 4 — XVA (CVA+FVA) per netting set / counterparty (15 parallel)
Level 5 — Sensitivity/Greeks per netting set via bump-and-reprice (15 parallel)
Level 6 — Aggregate final XVA risk report (1 node)
Pargraph schedules the pipeline as a DAG: at each level, all 15 independent nodes execute in parallel across Scaler workers. The critical path is only 6 levels deep regardless of the number of models, desks, or netting sets.
Program Structure¶
Native Python Implementation¶
[ ]:
import os
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["GOTO_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
from dataclasses import dataclass, field
from typing import Dict, List, Tuple
import numpy as np
# ---------------------------------------------------------------------------
# Data classes — small, serializable results passed between DAG nodes
# ---------------------------------------------------------------------------
@dataclass
class ModelParams:
    """Calibrated model parameters. Tiny payload (few floats)."""
    name: str                    # model identifier, e.g. "IR_USD"
    mean_reversion: float        # mean-reversion speed (kappa)
    volatility: float            # diffusion volatility (sigma)
    long_term_mean: float        # long-run level (theta)
    correlation_to_rates: float  # correlation with the reference IR factor
    calibration_error: float     # best mean-squared fit error from calibration
@dataclass
class SimulationResult:
    """Summary statistics from Monte Carlo simulation. Small payload."""
    asset_class: str       # model/desk name the paths were simulated for
    n_paths: int           # total number of simulated paths
    n_timesteps: int       # number of monthly timesteps per path
    mean_path: np.ndarray  # shape (n_timesteps,)
    percentile_97_5: np.ndarray  # mean + 1.96*std per timestep (Gaussian band)
    percentile_2_5: np.ndarray   # mean - 1.96*std per timestep
    terminal_distribution_moments: Tuple[float, float, float, float]  # (mean, std, skew, kurtosis)
@dataclass
class ExposureProfile:
    """Portfolio exposure profile for one desk/asset class. Small payload."""
    asset_class: str  # desk/model name the book was priced against
    n_trades: int     # number of trades priced for this desk
    expected_exposure: np.ndarray  # shape (n_timesteps,)
    expected_positive_exposure: np.ndarray  # mean of max(MTM, 0) over scenarios
    expected_negative_exposure: np.ndarray  # mean of min(MTM, 0) over scenarios
    peak_exposure: float  # max |EPE| across the profile
@dataclass
class XVAResult:
    """XVA metrics for a single netting set."""
    netting_set: str  # netting-set identifier
    cva: float        # credit valuation adjustment (scalar, MC mean)
    fva: float        # funding valuation adjustment (scalar, MC mean)
    term_structure_cva: np.ndarray  # per-timestep CVA contributions
    term_structure_fva: np.ndarray  # per-timestep FVA contributions
@dataclass
class SensitivityResult:
    """Greeks/sensitivities for one netting set (bump-and-reprice on CVA)."""
    netting_set: str     # netting-set identifier
    delta_ir: float      # CVA change for the IR-sized exposure bump
    delta_credit: float  # CVA change for the credit-intensity bump
    delta_fx: float      # CVA change for the FX-sized exposure bump
    delta_equity: float  # CVA change for the equity-sized exposure bump
    gamma_ir: float      # second-order CVA sensitivity to the IR bump
    vega: float          # CVA change for the credit-volatility bump
@dataclass
class XVARiskReport:
    """Final aggregated report. Small payload."""
    total_cva: float  # sum of CVA over all netting sets
    total_fva: float  # sum of FVA over all netting sets
    total_xva: float  # total_cva + total_fva
    netting_set_breakdown: Dict[str, Dict[str, float]]  # per netting set: {"cva": ..., "fva": ...}
    cva_sensitivities: Dict[str, float]  # greeks summed across netting sets
    fva_sensitivities: Dict[str, float]  # zeros as produced upstream (placeholder)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
@dataclass
class ComputeConfig:
    """Controls how heavy each computation step is."""
    n_paths: int                   # Monte Carlo paths per simulation/XVA node
    n_timesteps: int               # monthly timesteps per path
    n_trades: int                  # trades per desk
    n_calibration_iterations: int  # random-search iterations per calibration
    bump_size: float = 0.0001      # relative bump for bump-and-reprice greeks
# Three sizing presets: TEST for smoke runs, MEDIUM for development,
# FULL for production-scale workloads.
TEST_CONFIG = ComputeConfig(
    n_paths=10_000,
    n_timesteps=60,   # 5 years, monthly
    n_trades=100,
    n_calibration_iterations=5_000,
)
MEDIUM_CONFIG = ComputeConfig(
    n_paths=100_000,
    n_timesteps=180,  # 15 years, monthly
    n_trades=300,
    n_calibration_iterations=15_000,
)
FULL_CONFIG = ComputeConfig(
    n_paths=500_000,
    n_timesteps=360,  # 30 years, monthly
    n_trades=2_000,
    n_calibration_iterations=50_000,
)
# ---------------------------------------------------------------------------
# Asset class definitions — 15 models to calibrate
# ---------------------------------------------------------------------------
# (name, seed, param_ranges, correlation_to_rates)
# param_ranges bounds the random calibration search per parameter:
#   "mr" = mean reversion, "vol" = volatility, "mean" = long-term mean.
MODELS = [
    # 5 IR curves (different currencies)
    ("IR_USD", 100, {"mr": (0.01, 0.5), "vol": (0.005, 0.02), "mean": (0.02, 0.05)}, 1.0),
    ("IR_EUR", 101, {"mr": (0.01, 0.5), "vol": (0.003, 0.015), "mean": (0.01, 0.04)}, 0.9),
    ("IR_GBP", 102, {"mr": (0.01, 0.5), "vol": (0.004, 0.018), "mean": (0.015, 0.045)}, 0.85),
    ("IR_JPY", 103, {"mr": (0.01, 0.5), "vol": (0.001, 0.008), "mean": (-0.005, 0.015)}, 0.7),
    ("IR_CHF", 104, {"mr": (0.01, 0.5), "vol": (0.002, 0.012), "mean": (-0.003, 0.02)}, 0.75),
    # 4 FX pairs
    ("FX_EURUSD", 200, {"mr": (0.0, 0.3), "vol": (0.05, 0.15), "mean": (1.0, 1.2)}, 0.3),
    ("FX_GBPUSD", 201, {"mr": (0.0, 0.3), "vol": (0.06, 0.18), "mean": (1.2, 1.4)}, 0.25),
    ("FX_USDJPY", 202, {"mr": (0.0, 0.3), "vol": (0.04, 0.12), "mean": (100, 150)}, 0.2),
    ("FX_USDCHF", 203, {"mr": (0.0, 0.3), "vol": (0.05, 0.14), "mean": (0.85, 1.0)}, 0.22),
    # 3 equity indices
    ("EQ_SPX", 300, {"mr": (0.5, 5.0), "vol": (0.1, 0.5), "mean": (0.02, 0.08)}, -0.2),
    ("EQ_EUROSTOXX", 301, {"mr": (0.5, 5.0), "vol": (0.12, 0.6), "mean": (0.02, 0.10)}, -0.25),
    ("EQ_NIKKEI", 302, {"mr": (0.5, 5.0), "vol": (0.10, 0.45), "mean": (0.01, 0.07)}, -0.15),
    # 3 credit sectors
    ("CR_IG", 400, {"mr": (0.01, 1.0), "vol": (0.005, 0.03), "mean": (0.003, 0.015)}, -0.15),
    ("CR_HY", 401, {"mr": (0.01, 1.0), "vol": (0.01, 0.06), "mean": (0.01, 0.05)}, -0.25),
    ("CR_EM", 402, {"mr": (0.01, 1.0), "vol": (0.008, 0.05), "mean": (0.008, 0.04)}, -0.20),
]
# 15 desks for pricing (one per model)
DESK_NAMES = [model[0] for model in MODELS]
# 15 netting sets for XVA
NETTING_SETS = [
    "NS_USBank1", "NS_USBank2", "NS_USBank3",
    "NS_EUBank1", "NS_EUBank2", "NS_EUBank3",
    "NS_UKBank1", "NS_UKBank2",
    "NS_JPBank1", "NS_JPBank2",
    "NS_Hedge1", "NS_Hedge2", "NS_Hedge3",
    "NS_Corp1", "NS_Corp2",
]
# Each netting set references one credit sector (round-robin over the three)
# and a rotating window of 5 of the 15 desks, plus a deterministic seed.
NETTING_SET_CONFIG = {
    ns: {
        "credit_model": ("CR_IG", "CR_HY", "CR_EM")[idx % 3],
        "desk_indices": [(idx * 3 + offset) % 15 for offset in range(5)],
        "seed": 7000 + idx,
    }
    for idx, ns in enumerate(NETTING_SETS)
}
# ---------------------------------------------------------------------------
# Level 1: Model Calibration (15 parallel nodes)
# ---------------------------------------------------------------------------
def _calibrate_model(name: str, seed: int, param_ranges: dict,
                     corr: float, config: ComputeConfig) -> ModelParams:
    """Calibrate one model by randomized least-squares search.

    Draws candidate (mean-reversion, vol, mean) triples, prices a synthetic
    instrument panel under each, and keeps the triple whose fitted vols best
    match the synthetic market data in mean-squared error.

    Heavy compute: n_calibration_iterations × (n_paths/10 × n_instruments).
    Output: ~6 floats.
    """
    rng = np.random.default_rng(seed)
    n_instruments = 60 + seed % 40  # 60-100 instruments, fixed per model seed
    market_data = 0.01 + 0.005 * rng.standard_normal(n_instruments)
    dt = 1.0 / 12  # monthly step; loop-invariant, hoisted out of the search
    winner = None
    winner_error = float("inf")
    for _ in range(config.n_calibration_iterations):
        # NOTE: the draw order (mr, vol, mean, shocks) fixes the RNG stream.
        cand_mr = rng.uniform(*param_ranges["mr"])
        cand_vol = rng.uniform(*param_ranges["vol"])
        cand_mean = rng.uniform(*param_ranges["mean"])
        shocks = rng.standard_normal((config.n_paths // 10, n_instruments))
        # Stationary OU standard deviation over a single step of size dt.
        cand_step_vol = cand_vol * np.sqrt(
            (1 - np.exp(-2 * cand_mr * dt)) / (2 * max(cand_mr, 1e-6))
        )
        cand_prices = cand_mean + cand_step_vol * shocks
        fitted_vols = np.std(cand_prices, axis=0)
        fit_error = np.mean((fitted_vols - np.abs(market_data)) ** 2)
        if fit_error < winner_error:
            winner_error = fit_error
            winner = (cand_mr, cand_vol, cand_mean)
    return ModelParams(
        name=name,
        mean_reversion=winner[0],
        volatility=winner[1],
        long_term_mean=winner[2],
        correlation_to_rates=corr,
        calibration_error=winner_error,
    )
# Generate one calibration function per model (15 functions).
# Each is a standalone callable that pargraph can dispatch independently.
def _make_calibrator(name, seed, ranges, corr):
    """Factory: return a function(config) -> ModelParams for one model."""
    def calibrate(config: ComputeConfig) -> ModelParams:
        return _calibrate_model(name, seed, ranges, corr, config)
    # Give the closure a distinct, model-specific identity for DAG tooling.
    calibrate.__name__ = calibrate.__qualname__ = f"calibrate_{name.lower()}"
    return calibrate
# One standalone calibrator per model so each can be dispatched independently.
CALIBRATORS = {name: _make_calibrator(name, seed, ranges, corr)
               for name, seed, ranges, corr in MODELS}
# ---------------------------------------------------------------------------
# Level 2: Monte Carlo Simulation (15 parallel nodes)
# ---------------------------------------------------------------------------
def _simulate_paths(params: ModelParams, ir_ref_params: ModelParams,
                    config: ComputeConfig, seed: int) -> SimulationResult:
    """Generic path simulation using calibrated parameters.
    Heavy compute: n_paths × n_timesteps evolution with chunked memory.
    Output: 3 vectors of length n_timesteps + 4 floats.

    Dynamics are selected by the model-name prefix: EQ_* uses a Heston-style
    stochastic-vol model, FX_* a GBM with a seasonal IR-differential drift,
    everything else (IR/Credit) a mean-reverting diffusion.
    """
    from scipy.stats import kurtosis, skew
    rng = np.random.default_rng(seed)
    dt = 1.0 / 12  # monthly time step
    n = config.n_timesteps
    kappa = params.mean_reversion
    sigma = params.volatility
    theta = params.long_term_mean
    rho = params.correlation_to_rates
    # Simulate in chunks of <=10k paths to bound peak memory.
    chunk_size = min(config.n_paths, 10_000)
    n_chunks = max(config.n_paths // chunk_size, 1)
    running_sum = np.zeros(n)     # per-timestep sum over all paths
    running_sum_sq = np.zeros(n)  # per-timestep sum of squares
    terminal_values = []
    is_equity = params.name.startswith("EQ_")
    is_fx = params.name.startswith("FX_")
    for _ in range(n_chunks):
        if is_equity:
            # Heston-style stochastic vol
            log_s = np.zeros((chunk_size, n))
            variance = np.full(chunk_size, theta)
            xi = sigma  # vol of vol
            v_bar = theta
            for t in range(1, n):
                z1 = rng.standard_normal(chunk_size)
                z2 = rng.standard_normal(chunk_size)
                # Vol shock correlated -0.7 with the spot shock z1.
                z_vol = -0.7 * z1 + np.sqrt(1 - 0.49) * z2
                variance = np.maximum(
                    variance + kappa * (v_bar - variance) * dt
                    + xi * np.sqrt(np.maximum(variance, 0) * dt) * z_vol,
                    1e-8,
                )
                # Risk-free drift taken from the reference IR model's level.
                r = ir_ref_params.long_term_mean
                log_s[:, t] = (log_s[:, t - 1]
                               + (r - 0.5 * variance) * dt
                               + np.sqrt(variance * dt) * z1)
            values = np.exp(log_s) * 100
        elif is_fx:
            # GBM with stochastic drift from IR differential
            log_fx = np.zeros((chunk_size, n))
            log_fx[:, 0] = np.log(max(theta, 0.01))
            for t in range(1, n):
                z1 = rng.standard_normal(chunk_size)
                z2 = rng.standard_normal(chunk_size)
                z_corr = rho * z1 + np.sqrt(max(1 - rho**2, 0)) * z2
                # Seasonal (annual-period) stand-in for the IR differential.
                ir_diff = ir_ref_params.long_term_mean * 0.1 * np.sin(2 * np.pi * t / 12)
                log_fx[:, t] = (log_fx[:, t - 1]
                                + (ir_diff - 0.5 * sigma**2) * dt
                                + sigma * np.sqrt(dt) * z_corr)
            values = np.exp(log_fx)
        else:
            # Mean-reverting (IR or Credit)
            rates = np.zeros((chunk_size, n))
            rates[:, 0] = theta
            for t in range(1, n):
                dW = rng.standard_normal(chunk_size) * np.sqrt(dt)
                rates[:, t] = (rates[:, t - 1]
                               + kappa * (theta - rates[:, t - 1]) * dt
                               + sigma * dW)
            values = rates
        # Accumulate streaming moments; keep only terminal values per path.
        running_sum += np.sum(values, axis=0)
        running_sum_sq += np.sum(values**2, axis=0)
        terminal_values.extend(values[:, -1].tolist())
    total = chunk_size * n_chunks
    mean_path = running_sum / total
    # Var = E[X^2] - E[X]^2, clipped at 0 against rounding error.
    std_path = np.sqrt(np.maximum(running_sum_sq / total - mean_path**2, 0))
    terminal = np.array(terminal_values)
    return SimulationResult(
        asset_class=params.name,
        n_paths=config.n_paths,
        n_timesteps=n,
        mean_path=mean_path,
        # Gaussian-approximation confidence band around the mean path.
        percentile_97_5=mean_path + 1.96 * std_path,
        percentile_2_5=mean_path - 1.96 * std_path,
        terminal_distribution_moments=(
            float(np.mean(terminal)),
            float(np.std(terminal)),
            float(skew(terminal)),
            float(kurtosis(terminal)),
        ),
    )
def _make_simulator(model_name: str, seed: int, needs_ir_ref: bool):
    """Factory: return a simulation function for one model.
    IR_USD is the reference curve; other models may depend on it.
    """
    if model_name == "IR_USD":
        # The reference curve serves as its own IR reference.
        def simulate(params: ModelParams, config: ComputeConfig) -> SimulationResult:
            return _simulate_paths(params, params, config, seed)
    elif needs_ir_ref:
        # FX/EQ/non-USD IR: takes the calibrated USD curve as an extra input.
        def simulate(params: ModelParams, ir_usd_params: ModelParams,
                     config: ComputeConfig) -> SimulationResult:
            return _simulate_paths(params, ir_usd_params, config, seed)
    else:
        # Credit models simulate against a fixed dummy reference.
        def simulate(params: ModelParams, config: ComputeConfig) -> SimulationResult:
            return _simulate_paths(params, ModelParams("dummy", 0.1, 0.01, 0.03, 0, 0),
                                   config, seed)
    simulate.__name__ = simulate.__qualname__ = f"simulate_{model_name.lower()}"
    return simulate
# FX, EQ, and non-USD IR models consume the USD reference curve.
SIMULATORS = {
    name: _make_simulator(
        name,
        1000 + seed,
        name.startswith(("FX_", "EQ_")) or (name.startswith("IR_") and name != "IR_USD"),
    )
    for name, seed, _, _ in MODELS
}
# ---------------------------------------------------------------------------
# Level 3: Portfolio Pricing (15 parallel nodes, one per desk)
# ---------------------------------------------------------------------------
def _price_desk_portfolio(sim: SimulationResult, ir_ref_sim: SimulationResult,
                          config: ComputeConfig, seed: int) -> ExposureProfile:
    """Price a desk's portfolio under simulated scenarios.
    Heavy compute: n_trades × n_scenarios present value calculations per timestep.

    Builds a synthetic book (random notionals, strikes around initial spot,
    maturities, ~50% options), then at every timestep averages discounted
    MTM over random scenarios drawn around the simulated mean path.
    """
    rng = np.random.default_rng(seed)
    n = sim.n_timesteps
    notionals = rng.uniform(1e6, 50e6, config.n_trades)
    strikes = sim.mean_path[0] * rng.uniform(0.9, 1.1, config.n_trades)
    maturities = rng.integers(max(12, n // 4), n, config.n_trades)
    is_option = rng.random(config.n_trades) > 0.5
    ee = np.zeros(n)   # expected exposure
    epe = np.zeros(n)  # expected positive exposure
    ene = np.zeros(n)  # expected negative exposure
    for t in range(n):
        alive = maturities > t  # trades not yet matured
        if not np.any(alive):
            continue
        remaining = (maturities[alive] - t) / 12.0  # years to maturity
        spot = sim.mean_path[t]
        # Scenario dispersion taken from the simulated confidence band.
        spread = max((sim.percentile_97_5[t] - sim.percentile_2_5[t]) / 2, 1e-10)
        r = ir_ref_sim.mean_path[min(t, len(ir_ref_sim.mean_path) - 1)]
        n_scenarios = max(config.n_paths // 100, 10)
        scenario_mtms = np.zeros(n_scenarios)
        for s in range(n_scenarios):
            # One scalar RNG draw per scenario — call order fixes the stream.
            scenario_factor = spot + rng.standard_normal() * spread
            # Linear MTM per trade; option payoffs floored at zero.
            mtm = notionals[alive] * remaining * (scenario_factor - strikes[alive])
            opt_mask = is_option[alive]
            mtm[opt_mask] = np.maximum(mtm[opt_mask], 0)
            scenario_mtms[s] = np.sum(mtm * np.exp(-max(r, 0) * remaining))
        ee[t] = np.mean(scenario_mtms)
        epe[t] = np.mean(np.maximum(scenario_mtms, 0))
        ene[t] = np.mean(np.minimum(scenario_mtms, 0))
    return ExposureProfile(
        asset_class=sim.asset_class,
        n_trades=config.n_trades,
        expected_exposure=ee,
        expected_positive_exposure=epe,
        expected_negative_exposure=ene,
        peak_exposure=float(np.max(np.abs(epe))),
    )
def _make_pricer(model_name: str, seed: int):
    """Factory: return a pricing function for one desk."""
    if model_name == "IR_USD":
        # The USD desk discounts off its own simulation.
        def price(sim: SimulationResult, config: ComputeConfig) -> ExposureProfile:
            return _price_desk_portfolio(sim, sim, config, seed)
    else:
        # Other desks additionally consume the USD simulation for discounting.
        def price(sim: SimulationResult, ir_usd_sim: SimulationResult,
                  config: ComputeConfig) -> ExposureProfile:
            return _price_desk_portfolio(sim, ir_usd_sim, config, seed)
    price.__name__ = price.__qualname__ = f"price_{model_name.lower()}"
    return price
# One pricing node per desk, seeded deterministically off the model seed.
PRICERS = {name: _make_pricer(name, 4000 + seed) for name, seed, _, _ in MODELS}
# ---------------------------------------------------------------------------
# Level 4: XVA per Netting Set (15 parallel nodes)
# ---------------------------------------------------------------------------
def _compute_netting_set_xva(
    exposure_profiles: List[ExposureProfile],
    credit_params: ModelParams,
    config: ComputeConfig,
    seed: int,
    ns_name: str,
) -> XVAResult:
    """Compute CVA + FVA for a single netting set.
    Aggregates exposure from the desks this counterparty trades with,
    then runs Monte Carlo on credit intensity and funding cost.

    CVA increment per step: discount × LGD × EPE × default-prob × survival;
    FVA increment per step: discount × funding-spread × EE × dt.
    """
    rng = np.random.default_rng(seed)
    n = len(exposure_profiles[0].expected_positive_exposure)
    dt = 1.0 / 12
    lgd = 0.6               # loss given default
    funding_spread = 0.005  # initial funding-spread level
    # Aggregate EPE/EE across desks in this netting set
    total_epe = np.zeros(n)
    total_ee = np.zeros(n)
    for ep in exposure_profiles:
        total_epe += ep.expected_positive_exposure
        total_ee += ep.expected_exposure
    intensity = credit_params.long_term_mean
    vol = credit_params.volatility
    kappa = credit_params.mean_reversion
    n_mc = config.n_paths
    chunk_size = min(n_mc, 10_000)  # bound memory per chunk
    n_chunks = max(n_mc // chunk_size, 1)
    cva_samples = np.zeros(n_chunks)  # per-chunk mean CVA
    fva_samples = np.zeros(n_chunks)  # per-chunk mean FVA
    cva_ts = np.zeros(n)  # CVA term structure (per timestep)
    fva_ts = np.zeros(n)  # FVA term structure (per timestep)
    for c in range(n_chunks):
        lam = np.full(chunk_size, max(intensity, 1e-6))  # default-intensity paths
        spread = np.full(chunk_size, funding_spread)     # funding-spread paths
        survival = np.ones(chunk_size)
        chunk_cva = np.zeros(chunk_size)
        chunk_fva = np.zeros(chunk_size)
        for t in range(n):
            dW = rng.standard_normal(chunk_size) * np.sqrt(dt)
            dW2 = rng.standard_normal(chunk_size) * np.sqrt(dt)
            # CIR-style intensity dynamics, floored to stay positive.
            lam = np.maximum(
                lam + kappa * (intensity - lam) * dt
                + vol * np.sqrt(np.maximum(lam, 0)) * dW,
                1e-6,
            )
            # Mean-reverting funding spread, floored at zero.
            spread = np.maximum(
                spread + 0.1 * (funding_spread - spread) * dt + 0.001 * dW2,
                0,
            )
            default_prob = 1 - np.exp(-lam * dt)  # conditional default prob over dt
            discount = np.exp(-0.03 * t * dt)     # flat 3% discounting
            inc_cva = discount * lgd * total_epe[t] * default_prob * survival
            inc_fva = discount * spread * total_ee[t] * dt
            chunk_cva += inc_cva
            chunk_fva += inc_fva
            cva_ts[t] += np.sum(inc_cva)
            fva_ts[t] += np.sum(inc_fva)
            survival *= 1 - default_prob
        cva_samples[c] = np.mean(chunk_cva)
        fva_samples[c] = np.mean(chunk_fva)
    # Normalize term structures by the total number of simulated paths.
    denom = n_chunks * chunk_size
    cva_ts /= denom
    fva_ts /= denom
    return XVAResult(
        netting_set=ns_name,
        cva=float(np.mean(cva_samples)),
        fva=float(np.mean(fva_samples)),
        term_structure_cva=cva_ts,
        term_structure_fva=fva_ts,
    )
def _make_xva_calculator(ns_name: str, ns_cfg: dict, n_desks: int):
    """Factory: return a function that computes XVA for one netting set.

    The returned function's positional signature is
    (exp_0, ..., exp_k, credit_params, config) so pargraph can wire one
    DAG edge per exposure input.  ``n_desks`` is retained for interface
    compatibility but is not used here.
    """
    # Fix: the original also bound ns_cfg["desk_indices"] to an unused local.
    seed = ns_cfg["seed"]  # fixed seed -> deterministic XVA per netting set

    def compute_xva(*args) -> XVAResult:
        # Trailing args are (credit_params, config); everything before them
        # is the variable-length list of ExposureProfile inputs.
        config = args[-1]
        credit_params = args[-2]
        exposures = list(args[:-2])
        return _compute_netting_set_xva(exposures, credit_params, config, seed, ns_name)

    compute_xva.__name__ = f"xva_{ns_name.lower()}"
    compute_xva.__qualname__ = compute_xva.__name__
    return compute_xva
# One XVA node per netting set.
XVA_CALCULATORS = {ns: _make_xva_calculator(ns, cfg, len(DESK_NAMES))
                   for ns, cfg in NETTING_SET_CONFIG.items()}
# ---------------------------------------------------------------------------
# Level 5: Sensitivities per Netting Set (15 parallel nodes)
# ---------------------------------------------------------------------------
def _compute_netting_set_sensitivities(
    xva: XVAResult,
    exposure_profiles: List[ExposureProfile],
    credit_params: ModelParams,
    config: ComputeConfig,
    seed: int,
    ns_name: str,
) -> SensitivityResult:
    """Bump-and-reprice sensitivities for one netting set.
    Re-runs XVA with bumped inputs for IR, credit, FX, equity.
    All deltas are finite differences of CVA against the unbumped baseline.
    """
    bump = config.bump_size
    base_cva = xva.cva

    def _scaled_exposures(scale):
        # Rebuild each profile with every exposure quantity scaled by `scale`.
        return [
            ExposureProfile(
                asset_class=ep.asset_class,
                n_trades=ep.n_trades,
                expected_exposure=ep.expected_exposure * scale,
                expected_positive_exposure=ep.expected_positive_exposure * scale,
                expected_negative_exposure=ep.expected_negative_exposure * scale,
                peak_exposure=ep.peak_exposure * scale,
            )
            for ep in exposure_profiles
        ]

    def _bumped_credit(mean_scale=1.0, vol_scale=1.0):
        # Copy of the credit model with intensity level and/or vol scaled.
        return ModelParams(
            name=credit_params.name,
            mean_reversion=credit_params.mean_reversion,
            volatility=credit_params.volatility * vol_scale,
            long_term_mean=credit_params.long_term_mean * mean_scale,
            correlation_to_rates=credit_params.correlation_to_rates,
            calibration_error=credit_params.calibration_error,
        )

    def _reprice(exposures, credit, seed_offset):
        # One full XVA recomputation with perturbed inputs; the per-greek
        # seed offsets (100..600) match the original implementation.
        return _compute_netting_set_xva(
            exposures, credit, config, seed + seed_offset, ns_name
        )

    # Delta IR: bump all exposures.
    xva_up = _reprice(_scaled_exposures(1 + bump * 1000), credit_params, 100)
    delta_ir = (xva_up.cva - base_cva) / bump
    # Delta Credit: bump the default-intensity level.
    xva_cr = _reprice(exposure_profiles, _bumped_credit(mean_scale=1 + bump * 100), 200)
    delta_credit = (xva_cr.cva - base_cva) / bump
    # Delta FX.
    xva_fx = _reprice(_scaled_exposures(1 + bump * 100), credit_params, 300)
    delta_fx = (xva_fx.cva - base_cva) / bump
    # Delta Equity.
    xva_eq = _reprice(_scaled_exposures(1 + bump * 50), credit_params, 400)
    delta_eq = (xva_eq.cva - base_cva) / bump
    # Gamma IR: central second difference from the up/down exposure bumps.
    xva_down = _reprice(_scaled_exposures(1 - bump * 1000), credit_params, 500)
    gamma_ir = (xva_up.cva - 2 * base_cva + xva_down.cva) / (bump**2)
    # Vega: bump the credit-model volatility.
    xva_vega = _reprice(exposure_profiles, _bumped_credit(vol_scale=1 + bump * 100), 600)
    vega = (xva_vega.cva - base_cva) / bump
    return SensitivityResult(
        netting_set=ns_name,
        delta_ir=delta_ir,
        delta_credit=delta_credit,
        delta_fx=delta_fx,
        delta_equity=delta_eq,
        gamma_ir=gamma_ir,
        vega=vega,
    )
def _make_sensitivity_calculator(ns_name: str, ns_cfg: dict):
    """Factory: return a function that computes sensitivities for one netting set.

    The returned function's positional signature is
    (xva_result, exp_0, ..., exp_k, credit_params, config) so pargraph can
    wire one DAG edge per input.
    """
    # Fix: the original also bound ns_cfg["desk_indices"] to an unused local.
    seed = ns_cfg["seed"]  # fixed seed -> deterministic greeks per netting set

    def compute_sens(*args) -> SensitivityResult:
        # Positional layout: xva_result, exp_0..exp_k, credit_params, config.
        xva = args[0]
        config = args[-1]
        credit_params = args[-2]
        exposures = list(args[1:-2])
        return _compute_netting_set_sensitivities(
            xva, exposures, credit_params, config, seed, ns_name
        )

    compute_sens.__name__ = f"sens_{ns_name.lower()}"
    compute_sens.__qualname__ = compute_sens.__name__
    return compute_sens
# One sensitivity node per netting set.
SENSITIVITY_CALCULATORS = {ns: _make_sensitivity_calculator(ns, cfg)
                           for ns, cfg in NETTING_SET_CONFIG.items()}
# ---------------------------------------------------------------------------
# Level 6: Report Aggregation
# ---------------------------------------------------------------------------
def aggregate_xva_report(*args) -> XVARiskReport:
    """Aggregate all netting set XVA results + sensitivities into final report.

    args: xva_1, ..., xva_n, sens_1, ..., sens_n — equal halves in the same
    netting-set order.  The split point is derived from len(args) (the
    original read the module-level NETTING_SETS constant), so the function
    now works for any number of netting sets.

    Raises:
        ValueError: if an odd number of arguments is supplied.

    Note: upstream sensitivities are computed on CVA only, so the FVA
    sensitivity block is reported as an explicit all-zero placeholder.
    """
    if len(args) % 2:
        raise ValueError("expected an equal number of XVA and sensitivity results")
    n_ns = len(args) // 2
    xva_results = args[:n_ns]
    sens_results = args[n_ns:2 * n_ns]
    total_cva = sum(x.cva for x in xva_results)
    total_fva = sum(x.fva for x in xva_results)
    breakdown = {x.netting_set: {"cva": x.cva, "fva": x.fva} for x in xva_results}
    # Aggregate sensitivities (float literals to keep value types consistent).
    agg_cva_sens = {"delta_ir": 0.0, "delta_credit": 0.0, "delta_fx": 0.0,
                    "delta_equity": 0.0, "gamma_ir": 0.0, "vega": 0.0}
    # Copied BEFORE accumulation on purpose: FVA greeks are not produced
    # upstream, so this stays all-zero while preserving the report schema.
    agg_fva_sens = dict(agg_cva_sens)
    for s in sens_results:
        agg_cva_sens["delta_ir"] += s.delta_ir
        agg_cva_sens["delta_credit"] += s.delta_credit
        agg_cva_sens["delta_fx"] += s.delta_fx
        agg_cva_sens["delta_equity"] += s.delta_equity
        agg_cva_sens["gamma_ir"] += s.gamma_ir
        agg_cva_sens["vega"] += s.vega
    return XVARiskReport(
        total_cva=total_cva,
        total_fva=total_fva,
        total_xva=total_cva + total_fva,
        netting_set_breakdown=breakdown,
        cva_sensitivities=agg_cva_sens,
        fva_sensitivities=agg_fva_sens,
    )
# ---------------------------------------------------------------------------
# Pretty printing
# ---------------------------------------------------------------------------
def print_report(report: XVARiskReport) -> None:
    """Pretty-print the aggregated XVA risk report to stdout."""
    heavy = "=" * 60
    light = "-" * 60
    print("\n" + heavy)
    print(" XVA RISK REPORT")
    print(heavy)
    print(f" CVA: {report.total_cva:>16,.2f}")
    print(f" FVA: {report.total_fva:>16,.2f}")
    print(f" Total XVA: {report.total_xva:>16,.2f}")
    print(light)
    print(" Netting Set Breakdown:")
    for ns, vals in report.netting_set_breakdown.items():
        print(f" {ns:>15s}: CVA={vals['cva']:>12,.2f} FVA={vals['fva']:>12,.2f}")
    print(light)
    print(" CVA Sensitivities:")
    for label, value in report.cva_sensitivities.items():
        print(f" {label:>15s}: {value:>16,.2f}")
    print(" FVA Sensitivities:")
    for label, value in report.fva_sensitivities.items():
        print(f" {label:>15s}: {value:>16,.2f}")
    print(light)
    print(heavy)
def run_sequential(config: ComputeConfig) -> XVARiskReport:
    """Run the full six-level pipeline sequentially and print the report."""
    # Level 1: calibrate every model.
    params = {name: CALIBRATORS[name](config) for name in DESK_NAMES}
    # Level 2: simulate; only FX/EQ/non-USD IR simulators take the USD curve.
    sims = {}
    for name in DESK_NAMES:
        if name == "IR_USD" or name.startswith("CR_"):
            sims[name] = SIMULATORS[name](params[name], config)
        else:
            sims[name] = SIMULATORS[name](params[name], params["IR_USD"], config)
    # Level 3: price each desk (non-USD desks also take the USD simulation).
    exposures = {}
    for name in DESK_NAMES:
        if name == "IR_USD":
            exposures[name] = PRICERS[name](sims[name], config)
        else:
            exposures[name] = PRICERS[name](sims[name], sims["IR_USD"], config)
    # Level 4: XVA per netting set.
    xva_results = {}
    for ns_name, ns_cfg in NETTING_SET_CONFIG.items():
        desk_exps = [exposures[DESK_NAMES[i]] for i in ns_cfg["desk_indices"]]
        credit_p = params[ns_cfg["credit_model"]]
        xva_results[ns_name] = _compute_netting_set_xva(
            desk_exps, credit_p, config, ns_cfg["seed"], ns_name
        )
    # Level 5: bump-and-reprice greeks per netting set.
    sens_results = {}
    for ns_name, ns_cfg in NETTING_SET_CONFIG.items():
        desk_exps = [exposures[DESK_NAMES[i]] for i in ns_cfg["desk_indices"]]
        credit_p = params[ns_cfg["credit_model"]]
        sens_results[ns_name] = _compute_netting_set_sensitivities(
            xva_results[ns_name], desk_exps, credit_p, config, ns_cfg["seed"], ns_name
        )
    # Level 6: aggregate (XVA results first, then sensitivities) and print.
    report = aggregate_xva_report(
        *[xva_results[ns] for ns in NETTING_SETS],
        *[sens_results[ns] for ns in NETTING_SETS],
    )
    print_report(report)
    return report
# Cell entry point: execute the full pipeline at the development-sized preset.
run_sequential(MEDIUM_CONFIG)
Pargraph Implementation¶
[ ]:
import os
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["GOTO_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
from dataclasses import dataclass, field
from typing import Dict, List, Tuple
import numpy as np
from pargraph import GraphEngine
from scaler import Client, SchedulerClusterCombo
# ---------------------------------------------------------------------------
# Data classes — small, serializable results passed between DAG nodes
# ---------------------------------------------------------------------------
@dataclass
class ModelParams:
    """Calibrated model parameters. Tiny payload (few floats)."""
    name: str                    # model identifier, e.g. "IR_USD"
    mean_reversion: float        # mean-reversion speed (kappa)
    volatility: float            # diffusion volatility (sigma)
    long_term_mean: float        # long-run level (theta)
    correlation_to_rates: float  # correlation with the reference IR factor
    calibration_error: float     # best mean-squared fit error from calibration
@dataclass
class SimulationResult:
    """Summary statistics from Monte Carlo simulation. Small payload."""
    asset_class: str       # model/desk name the paths were simulated for
    n_paths: int           # total number of simulated paths
    n_timesteps: int       # number of monthly timesteps per path
    mean_path: np.ndarray  # shape (n_timesteps,)
    percentile_97_5: np.ndarray  # mean + 1.96*std per timestep (Gaussian band)
    percentile_2_5: np.ndarray   # mean - 1.96*std per timestep
    terminal_distribution_moments: Tuple[float, float, float, float]  # (mean, std, skew, kurtosis)
@dataclass
class ExposureProfile:
    """Portfolio exposure profile for one desk/asset class. Small payload."""
    asset_class: str  # desk/model name the book was priced against
    n_trades: int     # number of trades priced for this desk
    expected_exposure: np.ndarray  # shape (n_timesteps,)
    expected_positive_exposure: np.ndarray  # mean of max(MTM, 0) over scenarios
    expected_negative_exposure: np.ndarray  # mean of min(MTM, 0) over scenarios
    peak_exposure: float  # max |EPE| across the profile
@dataclass
class XVAResult:
    """XVA metrics for a single netting set."""
    netting_set: str  # netting-set identifier
    cva: float        # credit valuation adjustment (scalar, MC mean)
    fva: float        # funding valuation adjustment (scalar, MC mean)
    term_structure_cva: np.ndarray  # per-timestep CVA contributions
    term_structure_fva: np.ndarray  # per-timestep FVA contributions
@dataclass
class SensitivityResult:
    """Greeks/sensitivities for one netting set (bump-and-reprice on CVA)."""
    netting_set: str     # netting-set identifier
    delta_ir: float      # CVA change for the IR-sized exposure bump
    delta_credit: float  # CVA change for the credit-intensity bump
    delta_fx: float      # CVA change for the FX-sized exposure bump
    delta_equity: float  # CVA change for the equity-sized exposure bump
    gamma_ir: float      # second-order CVA sensitivity to the IR bump
    vega: float          # CVA change for the credit-volatility bump
@dataclass
class XVARiskReport:
    """Final aggregated report. Small payload."""
    total_cva: float  # sum of CVA over all netting sets
    total_fva: float  # sum of FVA over all netting sets
    total_xva: float  # total_cva + total_fva
    netting_set_breakdown: Dict[str, Dict[str, float]]  # per netting set: {"cva": ..., "fva": ...}
    cva_sensitivities: Dict[str, float]  # greeks summed across netting sets
    fva_sensitivities: Dict[str, float]  # zeros as produced upstream (placeholder)
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
@dataclass
class ComputeConfig:
    """Controls how heavy each computation step is."""
    n_paths: int                   # Monte Carlo paths per simulation/XVA node
    n_timesteps: int               # monthly timesteps per path
    n_trades: int                  # trades per desk
    n_calibration_iterations: int  # random-search iterations per calibration
    bump_size: float = 0.0001      # relative bump for bump-and-reprice greeks
# Three sizing presets: TEST for smoke runs, MEDIUM for development,
# FULL for production-scale workloads.
TEST_CONFIG = ComputeConfig(
    n_paths=10_000,
    n_timesteps=60,   # 5 years, monthly
    n_trades=100,
    n_calibration_iterations=5_000,
)
MEDIUM_CONFIG = ComputeConfig(
    n_paths=100_000,
    n_timesteps=180,  # 15 years, monthly
    n_trades=300,
    n_calibration_iterations=15_000,
)
FULL_CONFIG = ComputeConfig(
    n_paths=500_000,
    n_timesteps=360,  # 30 years, monthly
    n_trades=2_000,
    n_calibration_iterations=50_000,
)
# ---------------------------------------------------------------------------
# Asset class definitions — 15 models to calibrate
# ---------------------------------------------------------------------------
# (name, seed, param_ranges, correlation_to_rates)
# param_ranges bounds the random calibration search per parameter:
#   "mr" = mean reversion, "vol" = volatility, "mean" = long-term mean.
MODELS = [
    # 5 IR curves (different currencies)
    ("IR_USD", 100, {"mr": (0.01, 0.5), "vol": (0.005, 0.02), "mean": (0.02, 0.05)}, 1.0),
    ("IR_EUR", 101, {"mr": (0.01, 0.5), "vol": (0.003, 0.015), "mean": (0.01, 0.04)}, 0.9),
    ("IR_GBP", 102, {"mr": (0.01, 0.5), "vol": (0.004, 0.018), "mean": (0.015, 0.045)}, 0.85),
    ("IR_JPY", 103, {"mr": (0.01, 0.5), "vol": (0.001, 0.008), "mean": (-0.005, 0.015)}, 0.7),
    ("IR_CHF", 104, {"mr": (0.01, 0.5), "vol": (0.002, 0.012), "mean": (-0.003, 0.02)}, 0.75),
    # 4 FX pairs
    ("FX_EURUSD", 200, {"mr": (0.0, 0.3), "vol": (0.05, 0.15), "mean": (1.0, 1.2)}, 0.3),
    ("FX_GBPUSD", 201, {"mr": (0.0, 0.3), "vol": (0.06, 0.18), "mean": (1.2, 1.4)}, 0.25),
    ("FX_USDJPY", 202, {"mr": (0.0, 0.3), "vol": (0.04, 0.12), "mean": (100, 150)}, 0.2),
    ("FX_USDCHF", 203, {"mr": (0.0, 0.3), "vol": (0.05, 0.14), "mean": (0.85, 1.0)}, 0.22),
    # 3 equity indices
    ("EQ_SPX", 300, {"mr": (0.5, 5.0), "vol": (0.1, 0.5), "mean": (0.02, 0.08)}, -0.2),
    ("EQ_EUROSTOXX", 301, {"mr": (0.5, 5.0), "vol": (0.12, 0.6), "mean": (0.02, 0.10)}, -0.25),
    ("EQ_NIKKEI", 302, {"mr": (0.5, 5.0), "vol": (0.10, 0.45), "mean": (0.01, 0.07)}, -0.15),
    # 3 credit sectors
    ("CR_IG", 400, {"mr": (0.01, 1.0), "vol": (0.005, 0.03), "mean": (0.003, 0.015)}, -0.15),
    ("CR_HY", 401, {"mr": (0.01, 1.0), "vol": (0.01, 0.06), "mean": (0.01, 0.05)}, -0.25),
    ("CR_EM", 402, {"mr": (0.01, 1.0), "vol": (0.008, 0.05), "mean": (0.008, 0.04)}, -0.20),
]
# 15 desks for pricing (one per model)
DESK_NAMES = [model[0] for model in MODELS]
# 15 netting sets for XVA
NETTING_SETS = [
    "NS_USBank1", "NS_USBank2", "NS_USBank3",
    "NS_EUBank1", "NS_EUBank2", "NS_EUBank3",
    "NS_UKBank1", "NS_UKBank2",
    "NS_JPBank1", "NS_JPBank2",
    "NS_Hedge1", "NS_Hedge2", "NS_Hedge3",
    "NS_Corp1", "NS_Corp2",
]
# Each netting set references one credit sector (round-robin over the three)
# and a rotating window of 5 of the 15 desks, plus a deterministic seed.
NETTING_SET_CONFIG = {
    ns: {
        "credit_model": ("CR_IG", "CR_HY", "CR_EM")[idx % 3],
        "desk_indices": [(idx * 3 + offset) % 15 for offset in range(5)],
        "seed": 7000 + idx,
    }
    for idx, ns in enumerate(NETTING_SETS)
}
# ---------------------------------------------------------------------------
# Level 1: Model Calibration (15 parallel nodes)
# ---------------------------------------------------------------------------
def _calibrate_model(name: str, seed: int, param_ranges: dict,
                     corr: float, config: ComputeConfig) -> ModelParams:
    """Calibrate one model by randomized least-squares search.

    Draws candidate (mean-reversion, vol, mean) triples, prices a synthetic
    instrument panel under each, and keeps the triple whose fitted vols best
    match the synthetic market data in mean-squared error.

    Heavy compute: n_calibration_iterations × (n_paths/10 × n_instruments).
    Output: ~6 floats.
    """
    rng = np.random.default_rng(seed)
    n_instruments = 60 + seed % 40  # 60-100 instruments, fixed per model seed
    market_data = 0.01 + 0.005 * rng.standard_normal(n_instruments)
    dt = 1.0 / 12  # monthly step; loop-invariant, hoisted out of the search
    winner = None
    winner_error = float("inf")
    for _ in range(config.n_calibration_iterations):
        # NOTE: the draw order (mr, vol, mean, shocks) fixes the RNG stream.
        cand_mr = rng.uniform(*param_ranges["mr"])
        cand_vol = rng.uniform(*param_ranges["vol"])
        cand_mean = rng.uniform(*param_ranges["mean"])
        shocks = rng.standard_normal((config.n_paths // 10, n_instruments))
        # Stationary OU standard deviation over a single step of size dt.
        cand_step_vol = cand_vol * np.sqrt(
            (1 - np.exp(-2 * cand_mr * dt)) / (2 * max(cand_mr, 1e-6))
        )
        cand_prices = cand_mean + cand_step_vol * shocks
        fitted_vols = np.std(cand_prices, axis=0)
        fit_error = np.mean((fitted_vols - np.abs(market_data)) ** 2)
        if fit_error < winner_error:
            winner_error = fit_error
            winner = (cand_mr, cand_vol, cand_mean)
    return ModelParams(
        name=name,
        mean_reversion=winner[0],
        volatility=winner[1],
        long_term_mean=winner[2],
        correlation_to_rates=corr,
        calibration_error=winner_error,
    )
# Generate one calibration function per model (15 functions).
# Each is a standalone callable that pargraph can dispatch independently.
def _make_calibrator(name, seed, ranges, corr):
    """Factory: return a function(config) -> ModelParams for one model."""
    def calibrate(config: ComputeConfig) -> ModelParams:
        return _calibrate_model(name, seed, ranges, corr, config)
    # Give the closure a distinct, model-specific identity for DAG tooling.
    calibrate.__name__ = calibrate.__qualname__ = f"calibrate_{name.lower()}"
    return calibrate
# One standalone calibration callable per model (15 total).
CALIBRATORS = {
    _name: _make_calibrator(_name, _seed, _ranges, _corr)
    for _name, _seed, _ranges, _corr in MODELS
}
# ---------------------------------------------------------------------------
# Level 2: Monte Carlo Simulation (15 parallel nodes)
# ---------------------------------------------------------------------------
def _simulate_paths(params: ModelParams, ir_ref_params: ModelParams,
                    config: ComputeConfig, seed: int) -> SimulationResult:
    """Generic path simulation using calibrated parameters.

    Dispatches on the model-name prefix: ``EQ_*`` uses a Heston-style
    stochastic-volatility scheme, ``FX_*`` a log-normal scheme with a
    seasonal IR-differential drift, and everything else (IR/Credit) a
    mean-reverting Euler scheme. Paths are simulated in chunks of at most
    10,000 so only running moment sums and terminal values stay in memory.

    Heavy compute: n_paths × n_timesteps evolution with chunked memory.
    Output: 3 vectors of length n_timesteps + 4 floats.
    """
    from scipy.stats import kurtosis, skew
    rng = np.random.default_rng(seed)
    dt = 1.0 / 12  # monthly steps
    n = config.n_timesteps
    kappa = params.mean_reversion
    sigma = params.volatility
    theta = params.long_term_mean
    rho = params.correlation_to_rates
    # Chunking bounds peak memory at ~chunk_size × n floats per array.
    # NOTE(review): n_chunks uses floor division, so when n_paths is not a
    # multiple of chunk_size the remainder paths are never simulated even
    # though n_paths is reported in the result — confirm this is intended.
    chunk_size = min(config.n_paths, 10_000)
    n_chunks = max(config.n_paths // chunk_size, 1)
    running_sum = np.zeros(n)
    running_sum_sq = np.zeros(n)
    terminal_values = []
    is_equity = params.name.startswith("EQ_")
    is_fx = params.name.startswith("FX_")
    for _ in range(n_chunks):
        if is_equity:
            # Heston-style stochastic vol
            log_s = np.zeros((chunk_size, n))
            variance = np.full(chunk_size, theta)
            xi = sigma  # vol of vol
            v_bar = theta
            for t in range(1, n):
                z1 = rng.standard_normal(chunk_size)
                z2 = rng.standard_normal(chunk_size)
                # Variance shock correlated with the spot shock at -0.7
                # (sqrt(1 - 0.49) = sqrt(1 - 0.7^2) completes the Cholesky pair).
                z_vol = -0.7 * z1 + np.sqrt(1 - 0.49) * z2
                # Euler step for the variance, floored to keep it positive.
                variance = np.maximum(
                    variance + kappa * (v_bar - variance) * dt
                    + xi * np.sqrt(np.maximum(variance, 0) * dt) * z_vol,
                    1e-8,
                )
                # Drift uses the reference curve's long-term mean as the rate.
                r = ir_ref_params.long_term_mean
                log_s[:, t] = (log_s[:, t - 1]
                               + (r - 0.5 * variance) * dt
                               + np.sqrt(variance * dt) * z1)
            values = np.exp(log_s) * 100  # spot normalized to 100 at t=0
        elif is_fx:
            # GBM with stochastic drift from IR differential
            log_fx = np.zeros((chunk_size, n))
            log_fx[:, 0] = np.log(max(theta, 0.01))  # floor avoids log(<=0)
            for t in range(1, n):
                z1 = rng.standard_normal(chunk_size)
                z2 = rng.standard_normal(chunk_size)
                # Correlate the FX shock with the rates shock via rho.
                z_corr = rho * z1 + np.sqrt(max(1 - rho**2, 0)) * z2
                # Annual-period seasonal rate-differential drift term.
                ir_diff = ir_ref_params.long_term_mean * 0.1 * np.sin(2 * np.pi * t / 12)
                log_fx[:, t] = (log_fx[:, t - 1]
                                + (ir_diff - 0.5 * sigma**2) * dt
                                + sigma * np.sqrt(dt) * z_corr)
            values = np.exp(log_fx)
        else:
            # Mean-reverting (IR or Credit)
            rates = np.zeros((chunk_size, n))
            rates[:, 0] = theta
            for t in range(1, n):
                dW = rng.standard_normal(chunk_size) * np.sqrt(dt)
                # Euler step of dX = kappa*(theta - X)*dt + sigma*dW.
                rates[:, t] = (rates[:, t - 1]
                               + kappa * (theta - rates[:, t - 1]) * dt
                               + sigma * dW)
            values = rates
        # Accumulate per-timestep first/second moments; keep terminal column.
        running_sum += np.sum(values, axis=0)
        running_sum_sq += np.sum(values**2, axis=0)
        terminal_values.extend(values[:, -1].tolist())
    total = chunk_size * n_chunks
    mean_path = running_sum / total
    # Std via E[X^2] - E[X]^2, clipped at 0 against floating-point noise.
    std_path = np.sqrt(np.maximum(running_sum_sq / total - mean_path**2, 0))
    terminal = np.array(terminal_values)
    return SimulationResult(
        asset_class=params.name,
        n_paths=config.n_paths,
        n_timesteps=n,
        mean_path=mean_path,
        # Normal-approximation 95% band (±1.96 std) around the mean path,
        # despite the "percentile" field names.
        percentile_97_5=mean_path + 1.96 * std_path,
        percentile_2_5=mean_path - 1.96 * std_path,
        terminal_distribution_moments=(
            float(np.mean(terminal)),
            float(np.std(terminal)),
            float(skew(terminal)),
            float(kurtosis(terminal)),
        ),
    )
def _make_simulator(model_name: str, seed: int, needs_ir_ref: bool):
    """Factory: return a simulation function for one model.

    IR_USD is the reference curve; other models may depend on it. The
    returned callable's signature encodes its DAG dependencies, so the
    three cases below differ in arity.
    """
    if model_name == "IR_USD":
        # The USD curve serves as its own rate reference.
        def simulate(params: ModelParams, config: ComputeConfig) -> SimulationResult:
            return _simulate_paths(params, params, config, seed)
    elif needs_ir_ref:
        # FX/EQ/non-USD IR: consumes the calibrated USD curve as a second input.
        def simulate(params: ModelParams, ir_usd_params: ModelParams,
                     config: ComputeConfig) -> SimulationResult:
            return _simulate_paths(params, ir_usd_params, config, seed)
    else:
        # Credit models take the mean-reverting branch, which never reads the
        # reference params, so a fixed placeholder is sufficient.
        def simulate(params: ModelParams, config: ComputeConfig) -> SimulationResult:
            placeholder = ModelParams("dummy", 0.1, 0.01, 0.03, 0, 0)
            return _simulate_paths(params, placeholder, config, seed)

    simulate.__qualname__ = simulate.__name__ = f"simulate_{model_name.lower()}"
    return simulate
# One simulation callable per model; FX, EQ, and every IR curve except the
# USD reference additionally depend on the IR_USD calibration.
SIMULATORS = {
    _name: _make_simulator(
        _name,
        1000 + _seed,
        _name.startswith(("FX_", "EQ_"))
        or (_name.startswith("IR_") and _name != "IR_USD"),
    )
    for _name, _seed, _, _ in MODELS
}
# ---------------------------------------------------------------------------
# Level 3: Portfolio Pricing (15 parallel nodes, one per desk)
# ---------------------------------------------------------------------------
def _price_desk_portfolio(sim: SimulationResult, ir_ref_sim: SimulationResult,
                          config: ComputeConfig, seed: int) -> ExposureProfile:
    """Price a desk's portfolio under simulated scenarios.

    Builds a synthetic book of n_trades forwards/options, then at each
    timestep revalues the still-alive trades under scenario shocks drawn
    around the simulated mean path, discounting with the reference curve.

    Heavy compute: n_trades × n_scenarios present value calculations per timestep.
    """
    rng = np.random.default_rng(seed)
    n = sim.n_timesteps
    notionals = rng.uniform(1e6, 50e6, config.n_trades)
    # Strikes scattered ±10% around the initial level of the mean path.
    strikes = sim.mean_path[0] * rng.uniform(0.9, 1.1, config.n_trades)
    # Maturities (in timesteps) between max(12, n/4) and n.
    maturities = rng.integers(max(12, n // 4), n, config.n_trades)
    is_option = rng.random(config.n_trades) > 0.5  # ~half the book is options
    ee = np.zeros(n)
    epe = np.zeros(n)
    ene = np.zeros(n)
    for t in range(n):
        alive = maturities > t  # trades not yet matured at this timestep
        if not np.any(alive):
            continue
        remaining = (maturities[alive] - t) / 12.0  # time to maturity in years
        spot = sim.mean_path[t]
        # Scenario dispersion = half the 95% band width, floored above zero.
        spread = max((sim.percentile_97_5[t] - sim.percentile_2_5[t]) / 2, 1e-10)
        # Reference discount rate at t (index clamped to the ref path length).
        r = ir_ref_sim.mean_path[min(t, len(ir_ref_sim.mean_path) - 1)]
        n_scenarios = max(config.n_paths // 100, 10)
        scenario_mtms = np.zeros(n_scenarios)
        for s in range(n_scenarios):
            scenario_factor = spot + rng.standard_normal() * spread
            # Linear (forward-like) mark-to-market; options floored at 0 below.
            mtm = notionals[alive] * remaining * (scenario_factor - strikes[alive])
            opt_mask = is_option[alive]
            mtm[opt_mask] = np.maximum(mtm[opt_mask], 0)
            # Discount at the non-negative reference rate over remaining life.
            scenario_mtms[s] = np.sum(mtm * np.exp(-max(r, 0) * remaining))
        ee[t] = np.mean(scenario_mtms)
        epe[t] = np.mean(np.maximum(scenario_mtms, 0))  # positive part only
        ene[t] = np.mean(np.minimum(scenario_mtms, 0))  # negative part only
    return ExposureProfile(
        asset_class=sim.asset_class,
        n_trades=config.n_trades,
        expected_exposure=ee,
        expected_positive_exposure=epe,
        expected_negative_exposure=ene,
        peak_exposure=float(np.max(np.abs(epe))),
    )
def _make_pricer(model_name: str, seed: int):
    """Factory: return a pricing function for one desk.

    IR_USD discounts off its own simulation; every other desk additionally
    consumes the USD curve simulation, so the two cases differ in arity.
    """
    if model_name == "IR_USD":
        def price(sim: SimulationResult, config: ComputeConfig) -> ExposureProfile:
            # Self-referencing discount curve.
            return _price_desk_portfolio(sim, sim, config, seed)
    else:
        def price(sim: SimulationResult, ir_usd_sim: SimulationResult,
                  config: ComputeConfig) -> ExposureProfile:
            return _price_desk_portfolio(sim, ir_usd_sim, config, seed)

    price.__qualname__ = price.__name__ = f"price_{model_name.lower()}"
    return price
# One pricing callable per desk (15 total).
PRICERS = {
    _name: _make_pricer(_name, 4000 + _seed)
    for _name, _seed, _, _ in MODELS
}
# ---------------------------------------------------------------------------
# Level 4: XVA per Netting Set (15 parallel nodes)
# ---------------------------------------------------------------------------
def _compute_netting_set_xva(
    exposure_profiles: List[ExposureProfile],
    credit_params: ModelParams,
    config: ComputeConfig,
    seed: int,
    ns_name: str,
) -> XVAResult:
    """Compute CVA + FVA for a single netting set.

    Aggregates exposure from the desks this counterparty trades with,
    then runs chunked Monte Carlo on a square-root-diffusion credit
    intensity and a mean-reverting funding spread, accumulating discounted
    expected-loss (CVA) and funding-cost (FVA) increments per timestep.
    """
    rng = np.random.default_rng(seed)
    n = len(exposure_profiles[0].expected_positive_exposure)
    dt = 1.0 / 12  # monthly steps
    lgd = 0.6  # loss given default
    funding_spread = 0.005  # long-run funding spread level
    # Aggregate EPE/EE across desks in this netting set
    total_epe = np.zeros(n)
    total_ee = np.zeros(n)
    for ep in exposure_profiles:
        total_epe += ep.expected_positive_exposure
        total_ee += ep.expected_exposure
    intensity = credit_params.long_term_mean
    vol = credit_params.volatility
    kappa = credit_params.mean_reversion
    # Chunked MC keeps memory bounded, as in the simulation stage.
    n_mc = config.n_paths
    chunk_size = min(n_mc, 10_000)
    n_chunks = max(n_mc // chunk_size, 1)
    cva_samples = np.zeros(n_chunks)
    fva_samples = np.zeros(n_chunks)
    cva_ts = np.zeros(n)  # per-timestep sums; normalized by path count at the end
    fva_ts = np.zeros(n)
    for c in range(n_chunks):
        lam = np.full(chunk_size, max(intensity, 1e-6))  # hazard-rate paths
        spread = np.full(chunk_size, funding_spread)
        survival = np.ones(chunk_size)  # survival probability to date, per path
        chunk_cva = np.zeros(chunk_size)
        chunk_fva = np.zeros(chunk_size)
        for t in range(n):
            dW = rng.standard_normal(chunk_size) * np.sqrt(dt)
            dW2 = rng.standard_normal(chunk_size) * np.sqrt(dt)
            # Square-root-diffusion intensity step, floored to stay positive.
            lam = np.maximum(
                lam + kappa * (intensity - lam) * dt
                + vol * np.sqrt(np.maximum(lam, 0)) * dW,
                1e-6,
            )
            # Mean-reverting funding spread, floored at zero.
            spread = np.maximum(
                spread + 0.1 * (funding_spread - spread) * dt + 0.001 * dW2,
                0,
            )
            default_prob = 1 - np.exp(-lam * dt)  # default prob over this step
            discount = np.exp(-0.03 * t * dt)  # flat 3% discount factor
            # CVA increment: discounted LGD × exposure × default prob × survival.
            inc_cva = discount * lgd * total_epe[t] * default_prob * survival
            # FVA increment: discounted funding spread on expected exposure.
            inc_fva = discount * spread * total_ee[t] * dt
            chunk_cva += inc_cva
            chunk_fva += inc_fva
            cva_ts[t] += np.sum(inc_cva)
            fva_ts[t] += np.sum(inc_fva)
            survival *= 1 - default_prob
        cva_samples[c] = np.mean(chunk_cva)
        fva_samples[c] = np.mean(chunk_fva)
    # Normalize the term structures by the total number of simulated paths.
    denom = n_chunks * chunk_size
    cva_ts /= denom
    fva_ts /= denom
    return XVAResult(
        netting_set=ns_name,
        cva=float(np.mean(cva_samples)),
        fva=float(np.mean(fva_samples)),
        term_structure_cva=cva_ts,
        term_structure_fva=fva_ts,
    )
def _make_xva_calculator(ns_name: str, ns_cfg: dict, n_desks: int):
    """Factory: return a function that computes XVA for one netting set.

    The returned function's positional signature lists the exact exposure
    profiles + credit params it needs so pargraph can wire the DAG edges:
    (exp_0, exp_1, ..., exp_k, credit_params, config).

    ``n_desks`` is unused here but kept for interface compatibility with
    call sites. (Fix: removed the dead ``desk_indices`` local — the desk
    selection is encoded in the graph edges, not in this closure.)
    """
    seed = ns_cfg["seed"]

    def compute_xva(*args) -> XVAResult:
        # All leading args are exposure profiles; the last two are the
        # credit params and the compute config.
        *exposures, credit_params, config = args
        return _compute_netting_set_xva(exposures, credit_params, config, seed, ns_name)

    compute_xva.__name__ = f"xva_{ns_name.lower()}"
    compute_xva.__qualname__ = compute_xva.__name__
    return compute_xva
# One XVA callable per netting set (15 total).
XVA_CALCULATORS = {
    _ns: _make_xva_calculator(_ns, _cfg, len(DESK_NAMES))
    for _ns, _cfg in NETTING_SET_CONFIG.items()
}
# ---------------------------------------------------------------------------
# Level 5: Sensitivities per Netting Set (15 parallel nodes)
# ---------------------------------------------------------------------------
def _compute_netting_set_sensitivities(
    xva: XVAResult,
    exposure_profiles: List[ExposureProfile],
    credit_params: ModelParams,
    config: ComputeConfig,
    seed: int,
    ns_name: str,
) -> SensitivityResult:
    """Bump-and-reprice sensitivities for one netting set.

    Re-runs the XVA calculation with scaled exposures (IR/FX/equity deltas,
    IR gamma) or bumped credit parameters (credit delta, vega), then forms
    finite-difference ratios against the base CVA. Each bumped run uses a
    distinct seed offset so its Monte Carlo noise is independent.
    """
    bump = config.bump_size
    base_cva = xva.cva

    def scaled_exposures(factor):
        # Fresh copies of every profile with all exposure fields scaled.
        return [
            ExposureProfile(
                asset_class=ep.asset_class,
                n_trades=ep.n_trades,
                expected_exposure=ep.expected_exposure * factor,
                expected_positive_exposure=ep.expected_positive_exposure * factor,
                expected_negative_exposure=ep.expected_negative_exposure * factor,
                peak_exposure=ep.peak_exposure * factor,
            )
            for ep in exposure_profiles
        ]

    def reprice(exposures, credit, seed_offset):
        # One bumped XVA run; only its CVA is needed for the differences.
        return _compute_netting_set_xva(
            exposures, credit, config, seed + seed_offset, ns_name
        ).cva

    def bump_credit(**overrides):
        # Copy of credit_params with the selected fields replaced.
        base = dict(
            name=credit_params.name,
            mean_reversion=credit_params.mean_reversion,
            volatility=credit_params.volatility,
            long_term_mean=credit_params.long_term_mean,
            correlation_to_rates=credit_params.correlation_to_rates,
            calibration_error=credit_params.calibration_error,
        )
        base.update(overrides)
        return ModelParams(**base)

    # First-order deltas via one-sided exposure/parameter bumps.
    cva_ir_up = reprice(scaled_exposures(1 + bump * 1000), credit_params, 100)
    cva_credit = reprice(
        exposure_profiles,
        bump_credit(long_term_mean=credit_params.long_term_mean * (1 + bump * 100)),
        200,
    )
    cva_fx = reprice(scaled_exposures(1 + bump * 100), credit_params, 300)
    cva_eq = reprice(scaled_exposures(1 + bump * 50), credit_params, 400)
    # Second-order IR: central second difference around the base CVA.
    cva_ir_down = reprice(scaled_exposures(1 - bump * 1000), credit_params, 500)
    # Vega via a bumped credit volatility.
    cva_vega = reprice(
        exposure_profiles,
        bump_credit(volatility=credit_params.volatility * (1 + bump * 100)),
        600,
    )
    return SensitivityResult(
        netting_set=ns_name,
        delta_ir=(cva_ir_up - base_cva) / bump,
        delta_credit=(cva_credit - base_cva) / bump,
        delta_fx=(cva_fx - base_cva) / bump,
        delta_equity=(cva_eq - base_cva) / bump,
        gamma_ir=(cva_ir_up - 2 * base_cva + cva_ir_down) / (bump**2),
        vega=(cva_vega - base_cva) / bump,
    )
def _make_sensitivity_calculator(ns_name: str, ns_cfg: dict):
    """Factory: return a function that computes sensitivities for one netting set.

    The returned function takes positional args in the order
    (xva_result, exp_0, ..., exp_k, credit_params, config) so pargraph can
    wire the DAG edges. (Fix: removed the dead ``desk_indices`` local — the
    desk selection is encoded in the graph edges, not in this closure.)
    """
    seed = ns_cfg["seed"]

    def compute_sens(*args) -> SensitivityResult:
        # First arg is the Level-4 XVA result; last two are credit params
        # and the compute config; everything in between is exposure profiles.
        xva, *exposures, credit_params, config = args
        return _compute_netting_set_sensitivities(
            xva, exposures, credit_params, config, seed, ns_name
        )

    compute_sens.__name__ = f"sens_{ns_name.lower()}"
    compute_sens.__qualname__ = compute_sens.__name__
    return compute_sens
# One sensitivity callable per netting set (15 total).
SENSITIVITY_CALCULATORS = {
    _ns: _make_sensitivity_calculator(_ns, _cfg)
    for _ns, _cfg in NETTING_SET_CONFIG.items()
}
# ---------------------------------------------------------------------------
# Level 6: Report Aggregation
# ---------------------------------------------------------------------------
def aggregate_xva_report(*args) -> XVARiskReport:
    """Aggregate all netting set XVA results + sensitivities into final report.

    args layout: xva_1, ..., xva_n, sens_1, ..., sens_n where
    n = len(NETTING_SETS).
    """
    n_ns = len(NETTING_SETS)
    xva_results = args[:n_ns]
    sens_results = args[n_ns:2 * n_ns]
    total_cva = sum(x.cva for x in xva_results)
    total_fva = sum(x.fva for x in xva_results)
    breakdown = {
        x.netting_set: {"cva": x.cva, "fva": x.fva} for x in xva_results
    }
    sens_keys = ("delta_ir", "delta_credit", "delta_fx",
                 "delta_equity", "gamma_ir", "vega")
    # Sum each CVA sensitivity component across all netting sets.
    agg_cva_sens = {
        key: sum(getattr(s, key) for s in sens_results) for key in sens_keys
    }
    # NOTE: no FVA-specific sensitivities are produced upstream
    # (SensitivityResult carries CVA-based deltas only), so the FVA block
    # is reported as all zeros with the same keys as the CVA block.
    agg_fva_sens = {key: 0 for key in sens_keys}
    return XVARiskReport(
        total_cva=total_cva,
        total_fva=total_fva,
        total_xva=total_cva + total_fva,
        netting_set_breakdown=breakdown,
        cva_sensitivities=agg_cva_sens,
        fva_sensitivities=agg_fva_sens,
    )
# ---------------------------------------------------------------------------
# Pretty printing
# ---------------------------------------------------------------------------
def print_report(report: XVARiskReport) -> None:
    """Pretty-print the final XVA risk report to stdout."""
    bar = "=" * 60
    rule = "-" * 60
    print("\n" + bar)
    print(" XVA RISK REPORT")
    print(bar)
    print(f" CVA: {report.total_cva:>16,.2f}")
    print(f" FVA: {report.total_fva:>16,.2f}")
    print(f" Total XVA: {report.total_xva:>16,.2f}")
    print(rule)
    print(" Netting Set Breakdown:")
    for ns, vals in report.netting_set_breakdown.items():
        print(f" {ns:>15s}: CVA={vals['cva']:>12,.2f} FVA={vals['fva']:>12,.2f}")
    print(rule)
    # The two sensitivity sections share one format; only title/data differ.
    for title, sens in ((" CVA Sensitivities:", report.cva_sensitivities),
                        (" FVA Sensitivities:", report.fva_sensitivities)):
        print(title)
        for k, v in sens.items():
            print(f" {k:>15s}: {v:>16,.2f}")
    print(rule)
    print(bar)
def build_xva_graph(config: ComputeConfig) -> dict:
    """Assemble the 6-level pargraph task dict for the XVA pipeline.

    Keys are node names; values are either plain data ("config") or
    (callable, dep_key, ...) tuples whose string entries name other nodes.
    """
    def ns_deps(ns_cfg):
        # Dependency keys shared by a netting set's XVA and sensitivity nodes:
        # its desks' exposures, its credit model's params, and the config.
        desk_keys = [f"exp_{DESK_NAMES[i]}" for i in ns_cfg["desk_indices"]]
        return desk_keys + [f"params_{ns_cfg['credit_model']}", "config"]

    graph = {"config": config}
    # Level 1: one calibration node per model, each depending only on config.
    for name in DESK_NAMES:
        graph[f"params_{name}"] = (CALIBRATORS[name], "config")
    # Level 2: simulations; FX/EQ/non-USD IR also consume the USD curve params.
    for name in DESK_NAMES:
        if name == "IR_USD" or name.startswith("CR_"):
            graph[f"sim_{name}"] = (SIMULATORS[name], f"params_{name}", "config")
        else:
            graph[f"sim_{name}"] = (
                SIMULATORS[name], f"params_{name}", "params_IR_USD", "config"
            )
    # Level 3: pricing; every desk except IR_USD discounts off the USD sim.
    for name in DESK_NAMES:
        if name == "IR_USD":
            graph[f"exp_{name}"] = (PRICERS[name], f"sim_{name}", "config")
        else:
            graph[f"exp_{name}"] = (
                PRICERS[name], f"sim_{name}", "sim_IR_USD", "config"
            )
    # Level 4: XVA per netting set.
    for ns_name, ns_cfg in NETTING_SET_CONFIG.items():
        graph[f"xva_{ns_name}"] = (XVA_CALCULATORS[ns_name], *ns_deps(ns_cfg))
    # Level 5: sensitivities, which also consume the Level-4 XVA node.
    for ns_name, ns_cfg in NETTING_SET_CONFIG.items():
        graph[f"sens_{ns_name}"] = (
            SENSITIVITY_CALCULATORS[ns_name], f"xva_{ns_name}", *ns_deps(ns_cfg)
        )
    # Level 6: single aggregation node fans in every XVA + sensitivity node.
    graph["xva_report"] = (
        aggregate_xva_report,
        *[f"xva_{ns}" for ns in NETTING_SETS],
        *[f"sens_{ns}" for ns in NETTING_SETS],
    )
    return graph
def run_graph(config: ComputeConfig, n_workers: int = 4) -> XVARiskReport:
    """Run the XVA pipeline as a pargraph DAG with Scaler workers.

    Spins up a local scheduler + worker cluster, executes the graph's
    "xva_report" node, prints the final report, and returns it. The cluster
    is always shut down, even on failure.

    Fix: the client is now disconnected in a ``finally`` block so the
    connection is not leaked when ``engine.get`` raises (previously the
    disconnect was skipped on any exception during graph execution).
    """
    graph = build_xva_graph(config)
    cluster = SchedulerClusterCombo(
        n_workers=n_workers,
        logging_level="WARNING",
    )
    try:
        client = Client(address=cluster.get_address())
        try:
            engine = GraphEngine(backend=client)
            report = engine.get(graph, keys="xva_report")
        finally:
            # Release the client connection whether or not execution succeeded.
            client.disconnect()
        print_report(report)
        return report
    finally:
        cluster.shutdown()
run_graph(MEDIUM_CONFIG, n_workers=16)
Diff: Native vs Pargraph¶
[1]:
import difflib
import json
import re
from IPython.display import HTML, display
# Load this notebook's own JSON and pull the two code cells to compare:
# cell 3 holds the native (sequential) implementation, cell 5 the pargraph one.
with open("XVA.ipynb") as _f:
    _nb = json.load(_f)
native_code = "".join(_nb["cells"][3]["source"])
parallel_code = "".join(_nb["cells"][5]["source"])
# Side-by-side HTML diff, wrapped at 90 columns, 10 lines of context.
differ = difflib.HtmlDiff(wrapcolumn=90)
table = differ.make_table(
    native_code.splitlines(),
    parallel_code.splitlines(),
    fromdesc="Native Python",
    todesc="With Pargraph",
    context=True,
    numlines=10,
)
# Strip unwanted columns: navigation links, line numbers, and nowrap attribute
table = re.sub(r'<td class="diff_next"[^>]*>.*?</td>', "", table)
table = re.sub(r'<th class="diff_next"[^>]*>.*?</th>', "", table)
table = re.sub(r'<td class="diff_header"[^>]*>.*?</td>', "", table)
table = table.replace('colspan="2"', "")
table = table.replace(' nowrap="nowrap"', "")
# HtmlDiff emits six colgroups (3 per side); after removing the next/header
# columns only two remain, so collapse the colgroup run to match.
table = re.sub(
    r"(\s*<colgroup></colgroup>){6}",
    "\n <colgroup></colgroup> <colgroup></colgroup>",
    table,
)
# Inline CSS overriding difflib's defaults for a compact, wrapped layout.
css = """<style>
table.diff { font-family: monospace; font-size: 10px; border-collapse: collapse; width: 100%; }
table.diff td { padding: 2px 8px; white-space: pre-wrap; word-break: break-all; text-align: left !important; }
table.diff th { padding: 6px 8px; text-align: center; background-color: #f0f0f0; font-size: 14px; }
td.diff_add { background-color: #e6ffec; }
td.diff_chg { background-color: #fff8c5; }
td.diff_sub { background-color: #ffebe9; }
span.diff_add { background-color: #ccffd8; }
span.diff_chg { background-color: #fff2a8; }
span.diff_sub { background-color: #ffc0c0; }
</style>"""
display(HTML(css + table))
| Native Python | With Pargraph |
|---|---|
| os.environ["OPENBLAS_NUM_THREADS"] = "1" | os.environ["OPENBLAS_NUM_THREADS"] = "1" |
| os.environ["GOTO_NUM_THREADS"] = "1" | os.environ["GOTO_NUM_THREADS"] = "1" |
| os.environ["OMP_NUM_THREADS"] = "1" | os.environ["OMP_NUM_THREADS"] = "1" |
| os.environ["MKL_NUM_THREADS"] = "1" | os.environ["MKL_NUM_THREADS"] = "1" |
| from dataclasses import dataclass, field | from dataclasses import dataclass, field |
| from typing import Dict, List, Tuple | from typing import Dict, List, Tuple |
| import numpy as np | import numpy as np |
| from pargraph import GraphEngine | |
| from scaler import Client, SchedulerClusterCombo | |
| # --------------------------------------------------------------------------- | # --------------------------------------------------------------------------- |
| # Data classes — small, serializable results passed between DAG nodes | # Data classes — small, serializable results passed between DAG nodes |
| # --------------------------------------------------------------------------- | # --------------------------------------------------------------------------- |
| @dataclass | @dataclass |
| class ModelParams: | class ModelParams: |
| """Calibrated model parameters. Tiny payload (few floats).""" | """Calibrated model parameters. Tiny payload (few floats).""" |
| print(f" {k:>15s}: {v:>16,.2f}") | print(f" {k:>15s}: {v:>16,.2f}") |
| print(" FVA Sensitivities:") | print(" FVA Sensitivities:") |
| for k, v in report.fva_sensitivities.items(): | for k, v in report.fva_sensitivities.items(): |
| print(f" {k:>15s}: {v:>16,.2f}") | print(f" {k:>15s}: {v:>16,.2f}") |
| print("-" * 60) | print("-" * 60) |
| print("=" * 60) | print("=" * 60) |
| def run_sequential(config: ComputeConfig) -> XVARiskReport: | def build_xva_graph(config: ComputeConfig) -> dict: |
| """Run the full pipeline sequentially.""" | graph = { |
| params = {} | "config": config, |
| } | |
| # Level 1: Calibrations | |
| for name in DESK_NAMES: | for name in DESK_NAMES: |
| params[name] = CALIBRATORS[name](config) | graph[f"params_{name}"] = (CALIBRATORS[name], "config") |
| sims = {} | # Level 2: Simulations |
| for name in DESK_NAMES: | for name in DESK_NAMES: |
| if name == "IR_USD": | if name == "IR_USD": |
| sims[name] = SIMULATORS[name](params[name], config) | graph[f"sim_{name}"] = (SIMULATORS[name], f"params_{name}", "config") |
| elif name.startswith("CR_"): | elif name.startswith("CR_"): |
| sims[name] = SIMULATORS[name](params[name], config) | graph[f"sim_{name}"] = (SIMULATORS[name], f"params_{name}", "config") |
| else: | else: |
| graph[f"sim_{name}"] = ( | |
| sims[name] = SIMULATORS[name](params[name], params["IR_USD"], config) | SIMULATORS[name], f"params_{name}", "params_IR_USD", "config" |
| ) | |
| exposures = {} | # Level 3: Pricing |
| for name in DESK_NAMES: | for name in DESK_NAMES: |
| if name == "IR_USD": | if name == "IR_USD": |
| exposures[name] = PRICERS[name](sims[name], config) | graph[f"exp_{name}"] = (PRICERS[name], f"sim_{name}", "config") |
| else: | else: |
| exposures[name] = PRICERS[name](sims[name], sims["IR_USD"], config) | graph[f"exp_{name}"] = ( |
| PRICERS[name], f"sim_{name}", "sim_IR_USD", "config" | |
| ) | |
| xva_results = {} | # Level 4: XVA per netting set |
| for ns_name, ns_cfg in NETTING_SET_CONFIG.items(): | for ns_name, ns_cfg in NETTING_SET_CONFIG.items(): |
| desk_exps = [exposures[DESK_NAMES[i]] for i in ns_cfg["desk_indices"]] | deps = [f"exp_{DESK_NAMES[i]}" for i in ns_cfg["desk_indices"]] |
| credit_p = params[ns_cfg["credit_model"]] | deps.append(f"params_{ns_cfg['credit_model']}") |
| xva_results[ns_name] = _compute_netting_set_xva( | deps.append("config") |
| desk_exps, credit_p, config, ns_cfg["seed"], ns_name | graph[f"xva_{ns_name}"] = tuple([XVA_CALCULATORS[ns_name]] + deps) |
| ) | |
| sens_results = {} | # Level 5: Sensitivities per netting set |
| for ns_name, ns_cfg in NETTING_SET_CONFIG.items(): | for ns_name, ns_cfg in NETTING_SET_CONFIG.items(): |
| deps = [f"xva_{ns_name}"] | |
| desk_exps = [exposures[DESK_NAMES[i]] for i in ns_cfg["desk_indices"]] | deps += [f"exp_{DESK_NAMES[i]}" for i in ns_cfg["desk_indices"]] |
| credit_p = params[ns_cfg["credit_model"]] | deps.append(f"params_{ns_cfg['credit_model']}") |
| sens_results[ns_name] = _compute_netting_set_sensitivities( | deps.append("config") |
| xva_results[ns_name], desk_exps, credit_p, config, ns_cfg["seed"], ns_name | graph[f"sens_{ns_name}"] = tuple([SENSITIVITY_CALCULATORS[ns_name]] + deps) |
| ) | |
| report_args = ( | # Level 6: Aggregation |
| [xva_results[ns] for ns in NETTING_SETS] | agg_deps = [f"xva_{ns}" for ns in NETTING_SETS] |
| + [sens_results[ns] for ns in NETTING_SETS] | agg_deps += [f"sens_{ns}" for ns in NETTING_SETS] |
| ) | graph["xva_report"] = tuple([aggregate_xva_report] + agg_deps) |
| report = aggregate_xva_report(*report_args) | |
| print_report(report) | |
| return report | |
| run_sequential(MEDIUM_CONFIG) | return graph |
| def run_graph(config: ComputeConfig, n_workers: int = 4) -> XVARiskReport: | |
| """Run the XVA pipeline as a pargraph DAG with Scaler workers.""" | |
| graph = build_xva_graph(config) | |
| cluster = SchedulerClusterCombo( | |
| n_workers=n_workers, | |
| logging_level="WARNING", | |
| ) | |
| try: | |
| client = Client(address=cluster.get_address()) | |
| engine = GraphEngine(backend=client) | |
| report = engine.get(graph, keys="xva_report") | |
| client.disconnect() | |
| print_report(report) | |
| return report | |
| finally: | |
| cluster.shutdown() | |
| run_graph(MEDIUM_CONFIG, n_workers=16) |
Statistics¶
| Parfun | Pargraph | Sequential Runtime | Parallel Runtime | Workers | Speedup | Efficiency |
|---|---|---|---|---|---|---|
| No | Yes | 3843.8s | 374.1s | 16 | 10.27x | 0.64 |