Portfolio-Level XVA Risk Computation with Pargraph

A bank’s XVA desk computes Credit Valuation Adjustment (CVA) and Funding Valuation Adjustment (FVA) across a multi-asset portfolio spanning interest rates, FX, equities, and credit. The CVA for a netting set is:

\[\text{CVA} = \text{LGD} \sum_{t} D(t)\, \mathbb{E}[\max(V_t, 0)]\, \Delta\text{PD}(t)\]

The pipeline forms a 6-level DAG with ~80 compute nodes:

  • Level 1 — Calibrate stochastic models for 15 asset sub-classes (IR×5, FX×4, EQ×3, Credit×3)

  • Level 2 — Monte Carlo path simulation for each calibrated model (15 parallel)

  • Level 3 — Portfolio pricing per desk/asset class (15 parallel)

  • Level 4 — XVA (CVA+FVA) per netting set / counterparty (15 parallel)

  • Level 5 — Sensitivity/Greeks per netting set via bump-and-reprice (15 parallel)

  • Level 6 — Aggregate final XVA risk report (1 node)

Pargraph schedules the pipeline as a DAG: at each level, all 15 independent nodes execute in parallel across Scaler workers. The critical path is only 6 levels deep regardless of the number of models, desks, or netting sets.

Program Structure

Native Python Implementation

[ ]:
import os

os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["GOTO_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"

from dataclasses import dataclass, field
from typing import Dict, List, Tuple

import numpy as np

# ---------------------------------------------------------------------------
# Data classes — small, serializable results passed between DAG nodes
# ---------------------------------------------------------------------------


@dataclass
class ModelParams:
    """Calibrated model parameters. Tiny payload (few floats)."""

    name: str  # model identifier, e.g. "IR_USD"
    mean_reversion: float  # mean-reversion speed (kappa)
    volatility: float  # diffusion volatility (sigma)
    long_term_mean: float  # long-run level (theta)
    correlation_to_rates: float  # correlation to the IR_USD reference factor
    calibration_error: float  # best least-squares error found during calibration


@dataclass
class SimulationResult:
    """Summary statistics from Monte Carlo simulation. Small payload."""

    asset_class: str  # model/desk name this simulation belongs to
    n_paths: int  # total Monte Carlo paths simulated
    n_timesteps: int  # timesteps per path (monthly grid)
    mean_path: np.ndarray  # shape (n_timesteps,)
    percentile_97_5: np.ndarray  # mean + 1.96*std per timestep (normal approximation)
    percentile_2_5: np.ndarray  # mean - 1.96*std per timestep (normal approximation)
    terminal_distribution_moments: Tuple[float, float, float, float]  # (mean, std, skew, kurtosis)


@dataclass
class ExposureProfile:
    """Portfolio exposure profile for one desk/asset class. Small payload."""

    asset_class: str  # desk / asset class name
    n_trades: int  # number of trades priced for this desk
    expected_exposure: np.ndarray  # shape (n_timesteps,)
    expected_positive_exposure: np.ndarray  # EPE: mean of max(MtM, 0) per timestep
    expected_negative_exposure: np.ndarray  # ENE: mean of min(MtM, 0) per timestep
    peak_exposure: float  # max |EPE| over the horizon


@dataclass
class XVAResult:
    """XVA metrics for a single netting set."""

    netting_set: str  # netting set / counterparty identifier
    cva: float  # credit valuation adjustment (scalar)
    fva: float  # funding valuation adjustment (scalar)
    term_structure_cva: np.ndarray  # per-timestep CVA contributions
    term_structure_fva: np.ndarray  # per-timestep FVA contributions


@dataclass
class SensitivityResult:
    """Greeks/sensitivities for one netting set."""

    netting_set: str  # netting set identifier
    delta_ir: float  # CVA delta to a parallel exposure bump (IR proxy)
    delta_credit: float  # CVA delta to the credit intensity level
    delta_fx: float  # CVA delta to an FX-proxy exposure bump
    delta_equity: float  # CVA delta to an equity-proxy exposure bump
    gamma_ir: float  # second-order CVA sensitivity to the IR bump
    vega: float  # CVA sensitivity to credit-spread volatility


@dataclass
class XVARiskReport:
    """Final aggregated report. Small payload."""

    total_cva: float  # sum of CVA over all netting sets
    total_fva: float  # sum of FVA over all netting sets
    total_xva: float  # total_cva + total_fva
    netting_set_breakdown: Dict[str, Dict[str, float]]  # per netting set: {"cva": ..., "fva": ...}
    cva_sensitivities: Dict[str, float]  # aggregated Greeks keyed by greek name
    fva_sensitivities: Dict[str, float]  # same keys as cva_sensitivities


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------


@dataclass
class ComputeConfig:
    """Controls how heavy each computation step is."""

    n_paths: int  # Monte Carlo paths per simulation/XVA node
    n_timesteps: int  # monthly timesteps in the simulation horizon
    n_trades: int  # trades per desk
    n_calibration_iterations: int  # random-search trials per calibration
    bump_size: float = 0.0001  # relative bump for finite-difference Greeks


# Small sizing for quick smoke tests.
TEST_CONFIG = ComputeConfig(
    n_paths=10_000,
    n_timesteps=60,
    n_trades=100,
    n_calibration_iterations=5_000,
)

# Intermediate sizing (used by the sequential driver at the bottom of this cell).
MEDIUM_CONFIG = ComputeConfig(
    n_paths=100_000,
    n_timesteps=180,
    n_trades=300,
    n_calibration_iterations=15_000,
)

# Largest sizing.
FULL_CONFIG = ComputeConfig(
    n_paths=500_000,
    n_timesteps=360,
    n_trades=2_000,
    n_calibration_iterations=50_000,
)


# ---------------------------------------------------------------------------
# Asset class definitions — 15 models to calibrate
# ---------------------------------------------------------------------------

# (name, seed, param_ranges, correlation_to_rates)
# Each entry: (name, seed, param_ranges, correlation_to_rates).
# param_ranges gives uniform search bounds for mean reversion ("mr"),
# volatility ("vol") and long-term mean ("mean") used by the calibrator.
MODELS = [
    # 5 IR curves (different currencies)
    ("IR_USD", 100, {"mr": (0.01, 0.5), "vol": (0.005, 0.02), "mean": (0.02, 0.05)}, 1.0),
    ("IR_EUR", 101, {"mr": (0.01, 0.5), "vol": (0.003, 0.015), "mean": (0.01, 0.04)}, 0.9),
    ("IR_GBP", 102, {"mr": (0.01, 0.5), "vol": (0.004, 0.018), "mean": (0.015, 0.045)}, 0.85),
    ("IR_JPY", 103, {"mr": (0.01, 0.5), "vol": (0.001, 0.008), "mean": (-0.005, 0.015)}, 0.7),
    ("IR_CHF", 104, {"mr": (0.01, 0.5), "vol": (0.002, 0.012), "mean": (-0.003, 0.02)}, 0.75),
    # 4 FX pairs
    ("FX_EURUSD", 200, {"mr": (0.0, 0.3), "vol": (0.05, 0.15), "mean": (1.0, 1.2)}, 0.3),
    ("FX_GBPUSD", 201, {"mr": (0.0, 0.3), "vol": (0.06, 0.18), "mean": (1.2, 1.4)}, 0.25),
    ("FX_USDJPY", 202, {"mr": (0.0, 0.3), "vol": (0.04, 0.12), "mean": (100, 150)}, 0.2),
    ("FX_USDCHF", 203, {"mr": (0.0, 0.3), "vol": (0.05, 0.14), "mean": (0.85, 1.0)}, 0.22),
    # 3 equity indices
    ("EQ_SPX", 300, {"mr": (0.5, 5.0), "vol": (0.1, 0.5), "mean": (0.02, 0.08)}, -0.2),
    ("EQ_EUROSTOXX", 301, {"mr": (0.5, 5.0), "vol": (0.12, 0.6), "mean": (0.02, 0.10)}, -0.25),
    ("EQ_NIKKEI", 302, {"mr": (0.5, 5.0), "vol": (0.10, 0.45), "mean": (0.01, 0.07)}, -0.15),
    # 3 credit sectors
    ("CR_IG", 400, {"mr": (0.01, 1.0), "vol": (0.005, 0.03), "mean": (0.003, 0.015)}, -0.15),
    ("CR_HY", 401, {"mr": (0.01, 1.0), "vol": (0.01, 0.06), "mean": (0.01, 0.05)}, -0.25),
    ("CR_EM", 402, {"mr": (0.01, 1.0), "vol": (0.008, 0.05), "mean": (0.008, 0.04)}, -0.20),
]

# 15 desks for pricing (one per model)
DESK_NAMES = [m[0] for m in MODELS]

# 15 netting sets for XVA
NETTING_SETS = [
    "NS_USBank1", "NS_USBank2", "NS_USBank3",
    "NS_EUBank1", "NS_EUBank2", "NS_EUBank3",
    "NS_UKBank1", "NS_UKBank2",
    "NS_JPBank1", "NS_JPBank2",
    "NS_Hedge1", "NS_Hedge2", "NS_Hedge3",
    "NS_Corp1", "NS_Corp2",
]

# Map netting sets to credit sector and subset of desks they trade with
NETTING_SET_CONFIG = {}
for i, ns in enumerate(NETTING_SETS):
    # Each netting set references a credit model and a subset of desks
    credit_model = ["CR_IG", "CR_HY", "CR_EM"][i % 3]  # round-robin credit sector
    # Each netting set sees 5 of the 15 desks (rotating)
    desk_indices = [(i * 3 + j) % 15 for j in range(5)]
    NETTING_SET_CONFIG[ns] = {
        "credit_model": credit_model,
        "desk_indices": desk_indices,
        "seed": 7000 + i,  # per-netting-set RNG seed for the XVA Monte Carlo
    }


# ---------------------------------------------------------------------------
# Level 1: Model Calibration (15 parallel nodes)
# ---------------------------------------------------------------------------


def _calibrate_model(name: str, seed: int, param_ranges: dict,
                     corr: float, config: ComputeConfig) -> ModelParams:
    """Calibrate one model by randomized least-squares search.

    Draws candidate (mean-reversion, volatility, mean) triples uniformly
    from ``param_ranges``, scores each against synthetic market vols, and
    keeps the best-scoring triple.

    Heavy compute: n_calibration_iterations × (n_paths/10 × n_market_instruments).
    Output: ~6 floats.
    """
    rng = np.random.default_rng(seed)
    n_instruments = 60 + seed % 40  # 60-100 synthetic market instruments

    # Target vols: |0.01 + 0.005 * N(0,1)| per instrument (abs hoisted out
    # of the search loop — the original applied it every iteration).
    target_vols = np.abs(0.01 + 0.005 * rng.standard_normal(n_instruments))

    dt = 1.0 / 12
    n_sim = config.n_paths // 10
    champion = None  # best (mr, vol, mean) triple so far
    champion_err = float("inf")

    for _ in range(config.n_calibration_iterations):
        # NOTE: the draw order (mr, vol, mean, paths) fixes the RNG stream
        # and must not change, or results become irreproducible.
        mr = rng.uniform(*param_ranges["mr"])
        vol = rng.uniform(*param_ranges["vol"])
        mean = rng.uniform(*param_ranges["mean"])

        shocks = rng.standard_normal((n_sim, n_instruments))
        # Stationary OU standard deviation for step dt (mr floored at 1e-6).
        stat_vol = vol * np.sqrt(
            (1 - np.exp(-2 * mr * dt)) / (2 * max(mr, 1e-6))
        )
        sample_vols = np.std(mean + stat_vol * shocks, axis=0)

        err = np.mean((sample_vols - target_vols) ** 2)
        if err < champion_err:
            champion_err = err
            champion = (mr, vol, mean)

    return ModelParams(
        name=name,
        mean_reversion=champion[0],
        volatility=champion[1],
        long_term_mean=champion[2],
        correlation_to_rates=corr,
        calibration_error=champion_err,
    )


# Generate one calibration function per model (15 functions).
# Each is a standalone callable that pargraph can dispatch independently.

def _make_calibrator(name, seed, ranges, corr):
    """Factory: build a single-argument calibration callable for one model.

    The returned function has signature (config) -> ModelParams so the
    scheduler can dispatch it as an independent DAG node.
    """
    def calibrate(config: ComputeConfig) -> ModelParams:
        return _calibrate_model(name, seed, ranges, corr, config)

    # Give the closure a distinct name so scheduler/debug output is readable.
    calibrate.__name__ = calibrate.__qualname__ = f"calibrate_{name.lower()}"
    return calibrate


# One standalone calibration callable per model (15 total).
CALIBRATORS = {
    _name: _make_calibrator(_name, _seed, _ranges, _corr)
    for _name, _seed, _ranges, _corr in MODELS
}


# ---------------------------------------------------------------------------
# Level 2: Monte Carlo Simulation (15 parallel nodes)
# ---------------------------------------------------------------------------


def _simulate_paths(params: ModelParams, ir_ref_params: ModelParams,
                    config: ComputeConfig, seed: int) -> SimulationResult:
    """Generic path simulation using calibrated parameters.

    Dispatches on the model-name prefix: "EQ_" uses a Heston-style
    stochastic-volatility scheme, "FX_" a GBM with a seasonal drift built
    from the IR reference level, and everything else (IR/credit) a
    mean-reverting Euler scheme.  Paths are generated in chunks of at most
    10,000 so only running moments and terminal values are retained.

    Heavy compute: n_paths × n_timesteps evolution with chunked memory.
    Output: 3 vectors of length n_timesteps + 4 floats.
    """
    from scipy.stats import kurtosis, skew

    rng = np.random.default_rng(seed)
    dt = 1.0 / 12  # monthly timestep
    n = config.n_timesteps
    kappa = params.mean_reversion
    sigma = params.volatility
    theta = params.long_term_mean
    rho = params.correlation_to_rates

    # Chunking caps peak memory at ~chunk_size × n floats per array.
    chunk_size = min(config.n_paths, 10_000)
    n_chunks = max(config.n_paths // chunk_size, 1)

    # Running first/second moments per timestep, across all chunks.
    running_sum = np.zeros(n)
    running_sum_sq = np.zeros(n)
    terminal_values = []

    is_equity = params.name.startswith("EQ_")
    is_fx = params.name.startswith("FX_")

    for _ in range(n_chunks):
        if is_equity:
            # Heston-style stochastic vol
            log_s = np.zeros((chunk_size, n))
            variance = np.full(chunk_size, theta)
            xi = sigma  # vol of vol
            v_bar = theta  # long-run variance level
            for t in range(1, n):
                z1 = rng.standard_normal(chunk_size)
                z2 = rng.standard_normal(chunk_size)
                # Spot/vol shocks correlated at -0.7; sqrt(1 - 0.49)
                # completes the Cholesky pair.
                z_vol = -0.7 * z1 + np.sqrt(1 - 0.49) * z2
                # Euler step floored at 1e-8 to keep variance positive.
                variance = np.maximum(
                    variance + kappa * (v_bar - variance) * dt
                    + xi * np.sqrt(np.maximum(variance, 0) * dt) * z_vol,
                    1e-8,
                )
                r = ir_ref_params.long_term_mean  # drift from the IR reference level
                log_s[:, t] = (log_s[:, t - 1]
                               + (r - 0.5 * variance) * dt
                               + np.sqrt(variance * dt) * z1)
            values = np.exp(log_s) * 100  # index level, starts at 100
        elif is_fx:
            # GBM with stochastic drift from IR differential
            log_fx = np.zeros((chunk_size, n))
            log_fx[:, 0] = np.log(max(theta, 0.01))  # floor protects the log
            for t in range(1, n):
                z1 = rng.standard_normal(chunk_size)
                z2 = rng.standard_normal(chunk_size)
                # Correlate the FX shock with the rates factor at rho.
                z_corr = rho * z1 + np.sqrt(max(1 - rho**2, 0)) * z2
                # Annual-period sinusoid as a rate-differential drift proxy.
                ir_diff = ir_ref_params.long_term_mean * 0.1 * np.sin(2 * np.pi * t / 12)
                log_fx[:, t] = (log_fx[:, t - 1]
                                + (ir_diff - 0.5 * sigma**2) * dt
                                + sigma * np.sqrt(dt) * z_corr)
            values = np.exp(log_fx)
        else:
            # Mean-reverting (IR or Credit)
            rates = np.zeros((chunk_size, n))
            rates[:, 0] = theta
            for t in range(1, n):
                dW = rng.standard_normal(chunk_size) * np.sqrt(dt)
                # Euler step of a Vasicek/OU process.
                rates[:, t] = (rates[:, t - 1]
                               + kappa * (theta - rates[:, t - 1]) * dt
                               + sigma * dW)
            values = rates

        # Fold this chunk into the running moments; keep terminal values
        # for the distribution moments below.
        running_sum += np.sum(values, axis=0)
        running_sum_sq += np.sum(values**2, axis=0)
        terminal_values.extend(values[:, -1].tolist())

    total = chunk_size * n_chunks
    mean_path = running_sum / total
    # Var = E[X^2] - E[X]^2, clipped at 0 against floating-point cancellation.
    std_path = np.sqrt(np.maximum(running_sum_sq / total - mean_path**2, 0))

    terminal = np.array(terminal_values)
    return SimulationResult(
        asset_class=params.name,
        n_paths=config.n_paths,
        n_timesteps=n,
        mean_path=mean_path,
        # Gaussian-approximation band, not empirical percentiles.
        percentile_97_5=mean_path + 1.96 * std_path,
        percentile_2_5=mean_path - 1.96 * std_path,
        terminal_distribution_moments=(
            float(np.mean(terminal)),
            float(np.std(terminal)),
            float(skew(terminal)),
            float(kurtosis(terminal)),
        ),
    )


def _make_simulator(model_name: str, seed: int, needs_ir_ref: bool):
    """Factory: build the Monte Carlo node for one model.

    IR_USD is the reference curve: it feeds itself, FX/EQ/non-USD-IR models
    take it as an extra input, and credit models use a dummy reference.
    The returned function's parameter list tells pargraph which upstream
    results to wire in, so the inner signatures must stay as written.
    """
    if model_name == "IR_USD":
        # Reference curve: acts as its own IR reference.
        def simulate(params: ModelParams, config: ComputeConfig) -> SimulationResult:
            return _simulate_paths(params, params, config, seed)
    elif needs_ir_ref:
        # FX / equity / non-USD IR: also consumes the IR_USD calibration.
        def simulate(params: ModelParams, ir_usd_params: ModelParams,
                     config: ComputeConfig) -> SimulationResult:
            return _simulate_paths(params, ir_usd_params, config, seed)
    else:
        # Credit: the IR reference is irrelevant, substitute a placeholder.
        def simulate(params: ModelParams, config: ComputeConfig) -> SimulationResult:
            placeholder = ModelParams("dummy", 0.1, 0.01, 0.03, 0, 0)
            return _simulate_paths(params, placeholder, config, seed)

    simulate.__name__ = simulate.__qualname__ = f"simulate_{model_name.lower()}"
    return simulate


# One simulation callable per model; everything except IR_USD itself and
# the credit models takes the IR_USD calibration as an extra input.
SIMULATORS = {
    _name: _make_simulator(
        _name,
        1000 + _seed,
        _name.startswith(("FX_", "EQ_"))
        or (_name.startswith("IR_") and _name != "IR_USD"),
    )
    for _name, _seed, _, _ in MODELS
}


# ---------------------------------------------------------------------------
# Level 3: Portfolio Pricing (15 parallel nodes, one per desk)
# ---------------------------------------------------------------------------


def _price_desk_portfolio(sim: SimulationResult, ir_ref_sim: SimulationResult,
                          config: ComputeConfig, seed: int) -> ExposureProfile:
    """Price a desk's portfolio under simulated scenarios.

    Builds a synthetic book (random notionals, strikes, maturities; ~half
    options), then at each timestep revalues it under scenarios drawn
    around the simulated mean path and discounts with the IR reference
    curve.  Produces EE / EPE / ENE term structures.

    Heavy compute: n_trades × n_scenarios present value calculations per timestep.
    """
    rng = np.random.default_rng(seed)
    n = sim.n_timesteps

    notionals = rng.uniform(1e6, 50e6, config.n_trades)
    # Strikes within ±10% of the initial simulated level.
    strikes = sim.mean_path[0] * rng.uniform(0.9, 1.1, config.n_trades)
    # Maturities (in months) between max(12, horizon/4) and the horizon.
    maturities = rng.integers(max(12, n // 4), n, config.n_trades)
    is_option = rng.random(config.n_trades) > 0.5  # ~half the book is optional

    ee = np.zeros(n)
    epe = np.zeros(n)
    ene = np.zeros(n)

    for t in range(n):
        alive = maturities > t  # trades not yet matured at time t
        if not np.any(alive):
            continue
        remaining = (maturities[alive] - t) / 12.0  # time to maturity in years
        spot = sim.mean_path[t]
        # Scenario dispersion = half-width of the simulated confidence band.
        spread = max((sim.percentile_97_5[t] - sim.percentile_2_5[t]) / 2, 1e-10)
        r = ir_ref_sim.mean_path[min(t, len(ir_ref_sim.mean_path) - 1)]

        n_scenarios = max(config.n_paths // 100, 10)
        scenario_mtms = np.zeros(n_scenarios)

        for s in range(n_scenarios):
            # One scenario level per draw; revalue the whole alive book.
            scenario_factor = spot + rng.standard_normal() * spread
            mtm = notionals[alive] * remaining * (scenario_factor - strikes[alive])
            opt_mask = is_option[alive]
            mtm[opt_mask] = np.maximum(mtm[opt_mask], 0)  # options floored at zero
            # Discount at the (non-negative) reference rate over the remaining life.
            scenario_mtms[s] = np.sum(mtm * np.exp(-max(r, 0) * remaining))

        ee[t] = np.mean(scenario_mtms)
        epe[t] = np.mean(np.maximum(scenario_mtms, 0))
        ene[t] = np.mean(np.minimum(scenario_mtms, 0))

    return ExposureProfile(
        asset_class=sim.asset_class,
        n_trades=config.n_trades,
        expected_exposure=ee,
        expected_positive_exposure=epe,
        expected_negative_exposure=ene,
        peak_exposure=float(np.max(np.abs(epe))),
    )


def _make_pricer(model_name: str, seed: int):
    """Factory: build the pricing node for one desk.

    IR_USD discounts with its own simulation; every other desk also takes
    the IR_USD simulation as the discounting reference.  The inner
    signatures drive the DAG wiring and must stay as written.
    """
    if model_name == "IR_USD":
        def price(sim: SimulationResult, config: ComputeConfig) -> ExposureProfile:
            return _price_desk_portfolio(sim, sim, config, seed)
    else:
        def price(sim: SimulationResult, ir_usd_sim: SimulationResult,
                  config: ComputeConfig) -> ExposureProfile:
            return _price_desk_portfolio(sim, ir_usd_sim, config, seed)

    price.__name__ = price.__qualname__ = f"price_{model_name.lower()}"
    return price


# One pricing callable per desk (15 total).
PRICERS = {
    _name: _make_pricer(_name, 4000 + _seed) for _name, _seed, _, _ in MODELS
}


# ---------------------------------------------------------------------------
# Level 4: XVA per Netting Set (15 parallel nodes)
# ---------------------------------------------------------------------------


def _compute_netting_set_xva(
    exposure_profiles: List[ExposureProfile],
    credit_params: ModelParams,
    config: ComputeConfig,
    seed: int,
    ns_name: str,
) -> XVAResult:
    """Compute CVA + FVA for a single netting set.

    Aggregates exposure from the desks this counterparty trades with,
    then runs Monte Carlo on credit intensity and funding cost.
    CVA accrues discounted LGD × EPE × incremental default probability ×
    survival; FVA accrues discounted funding spread × EE × dt.
    """
    rng = np.random.default_rng(seed)
    n = len(exposure_profiles[0].expected_positive_exposure)
    dt = 1.0 / 12  # monthly step
    lgd = 0.6  # loss given default
    funding_spread = 0.005  # initial funding spread (50bp)

    # Aggregate EPE/EE across desks in this netting set
    total_epe = np.zeros(n)
    total_ee = np.zeros(n)
    for ep in exposure_profiles:
        total_epe += ep.expected_positive_exposure
        total_ee += ep.expected_exposure

    intensity = credit_params.long_term_mean
    vol = credit_params.volatility
    kappa = credit_params.mean_reversion

    n_mc = config.n_paths
    chunk_size = min(n_mc, 10_000)
    n_chunks = max(n_mc // chunk_size, 1)

    cva_samples = np.zeros(n_chunks)  # per-chunk mean CVA
    fva_samples = np.zeros(n_chunks)  # per-chunk mean FVA
    cva_ts = np.zeros(n)  # per-timestep CVA, accumulated as path sums
    fva_ts = np.zeros(n)  # per-timestep FVA, accumulated as path sums

    for c in range(n_chunks):
        # Per-path state: default intensity, funding spread, survival
        # probability to date, and running CVA/FVA accumulators.
        lam = np.full(chunk_size, max(intensity, 1e-6))
        spread = np.full(chunk_size, funding_spread)
        survival = np.ones(chunk_size)
        chunk_cva = np.zeros(chunk_size)
        chunk_fva = np.zeros(chunk_size)

        for t in range(n):
            dW = rng.standard_normal(chunk_size) * np.sqrt(dt)
            dW2 = rng.standard_normal(chunk_size) * np.sqrt(dt)

            # Square-root-diffusion-style intensity step, floored to stay positive.
            lam = np.maximum(
                lam + kappa * (intensity - lam) * dt
                + vol * np.sqrt(np.maximum(lam, 0)) * dW,
                1e-6,
            )
            # Mean-reverting funding spread, floored at zero.
            spread = np.maximum(
                spread + 0.1 * (funding_spread - spread) * dt + 0.001 * dW2,
                0,
            )

            default_prob = 1 - np.exp(-lam * dt)  # default probability over (t, t+dt]
            discount = np.exp(-0.03 * t * dt)  # flat 3% discounting

            inc_cva = discount * lgd * total_epe[t] * default_prob * survival
            inc_fva = discount * spread * total_ee[t] * dt

            chunk_cva += inc_cva
            chunk_fva += inc_fva
            cva_ts[t] += np.sum(inc_cva)
            fva_ts[t] += np.sum(inc_fva)

            survival *= 1 - default_prob

        cva_samples[c] = np.mean(chunk_cva)
        fva_samples[c] = np.mean(chunk_fva)

    # Term structures were accumulated as sums over paths; normalize to means.
    denom = n_chunks * chunk_size
    cva_ts /= denom
    fva_ts /= denom

    return XVAResult(
        netting_set=ns_name,
        cva=float(np.mean(cva_samples)),
        fva=float(np.mean(fva_samples)),
        term_structure_cva=cva_ts,
        term_structure_fva=fva_ts,
    )


def _make_xva_calculator(ns_name: str, ns_cfg: dict, n_desks: int):
    """Factory: return a function that computes XVA for one netting set.

    The returned node takes (exp_0, ..., exp_k, credit_params, config)
    positionally; listing the exact inputs lets pargraph wire the DAG
    edges.  `n_desks` is accepted for call-site symmetry but unused here.
    """
    seed = ns_cfg["seed"]

    def compute_xva(*args) -> XVAResult:
        # Trailing two positional args are credit_params and config;
        # everything before them is an exposure profile.
        *exposures, credit_params, config = args
        return _compute_netting_set_xva(exposures, credit_params, config,
                                        seed, ns_name)

    compute_xva.__name__ = compute_xva.__qualname__ = f"xva_{ns_name.lower()}"
    return compute_xva


# One XVA callable per netting set (15 total).
XVA_CALCULATORS = {
    _ns: _make_xva_calculator(_ns, _cfg, len(DESK_NAMES))
    for _ns, _cfg in NETTING_SET_CONFIG.items()
}


# ---------------------------------------------------------------------------
# Level 5: Sensitivities per Netting Set (15 parallel nodes)
# ---------------------------------------------------------------------------


def _compute_netting_set_sensitivities(
    xva: XVAResult,
    exposure_profiles: List[ExposureProfile],
    credit_params: ModelParams,
    config: ComputeConfig,
    seed: int,
    ns_name: str,
) -> SensitivityResult:
    """Bump-and-reprice sensitivities for one netting set.

    Re-runs the level-4 XVA Monte Carlo with bumped inputs (IR, credit,
    FX, equity proxies) and differences each bumped CVA against the base.

    Fix: every bumped reprice now reuses the *same* seed as the base run
    (common random numbers).  The previous version used seed+100/+200/...
    per bump, so each finite difference mixed the bump effect with a fresh
    draw of Monte Carlo noise; for bump_size ~1e-4 the noise dominates and
    the central second difference (gamma) was pure noise.  With a shared
    seed the random paths cancel exactly in the differences.
    NOTE(review): this assumes the incoming `xva` was computed with this
    same `seed`, which is how both the sequential driver and the netting
    set configuration wire it — confirm if the wiring changes.
    """
    bump = config.bump_size
    base_cva = xva.cva

    def _bumped_exposures(factor):
        """Scale every exposure profile by `factor` (parallel-shift proxy)."""
        return [
            ExposureProfile(
                asset_class=ep.asset_class,
                n_trades=ep.n_trades,
                expected_exposure=ep.expected_exposure * factor,
                expected_positive_exposure=ep.expected_positive_exposure * factor,
                expected_negative_exposure=ep.expected_negative_exposure * factor,
                peak_exposure=ep.peak_exposure * factor,
            )
            for ep in exposure_profiles
        ]

    def _reprice(exposures, cparams):
        """Re-run the XVA Monte Carlo with common random numbers (base seed)."""
        return _compute_netting_set_xva(exposures, cparams, config, seed, ns_name)

    def _with_credit(**overrides):
        """Copy credit_params with selected fields overridden."""
        fields = dict(
            name=credit_params.name,
            mean_reversion=credit_params.mean_reversion,
            volatility=credit_params.volatility,
            long_term_mean=credit_params.long_term_mean,
            correlation_to_rates=credit_params.correlation_to_rates,
            calibration_error=credit_params.calibration_error,
        )
        fields.update(overrides)
        return ModelParams(**fields)

    # Delta IR: parallel bump of all exposures (1000x the raw bump size).
    xva_up = _reprice(_bumped_exposures(1 + bump * 1000), credit_params)
    delta_ir = (xva_up.cva - base_cva) / bump

    # Delta credit: bump the hazard-rate long-term mean.
    xva_cr = _reprice(
        exposure_profiles,
        _with_credit(long_term_mean=credit_params.long_term_mean * (1 + bump * 100)),
    )
    delta_credit = (xva_cr.cva - base_cva) / bump

    # Delta FX: smaller exposure bump as an FX proxy.
    xva_fx = _reprice(_bumped_exposures(1 + bump * 100), credit_params)
    delta_fx = (xva_fx.cva - base_cva) / bump

    # Delta equity: smallest exposure bump as an equity proxy.
    xva_eq = _reprice(_bumped_exposures(1 + bump * 50), credit_params)
    delta_eq = (xva_eq.cva - base_cva) / bump

    # Gamma IR: central second difference around the IR bump.
    xva_down = _reprice(_bumped_exposures(1 - bump * 1000), credit_params)
    gamma_ir = (xva_up.cva - 2 * base_cva + xva_down.cva) / (bump**2)

    # Vega: bump the credit-spread volatility.
    xva_vega = _reprice(
        exposure_profiles,
        _with_credit(volatility=credit_params.volatility * (1 + bump * 100)),
    )
    vega = (xva_vega.cva - base_cva) / bump

    return SensitivityResult(
        netting_set=ns_name,
        delta_ir=delta_ir,
        delta_credit=delta_credit,
        delta_fx=delta_fx,
        delta_equity=delta_eq,
        gamma_ir=gamma_ir,
        vega=vega,
    )


def _make_sensitivity_calculator(ns_name: str, ns_cfg: dict):
    """Factory: return a function that computes sensitivities for one netting set.

    The returned node takes (xva_result, exp_0..exp_k, credit_params,
    config) positionally so pargraph can wire one DAG edge per input.
    """
    seed = ns_cfg["seed"]

    def compute_sens(*args) -> SensitivityResult:
        # First arg is the level-4 XVA result; the trailing two are
        # credit_params and config; the middle args are exposure profiles.
        xva_result, *rest = args
        *exposures, credit_params, config = rest
        return _compute_netting_set_sensitivities(
            xva_result, exposures, credit_params, config, seed, ns_name
        )

    compute_sens.__name__ = compute_sens.__qualname__ = f"sens_{ns_name.lower()}"
    return compute_sens


# One sensitivity callable per netting set (15 total).
SENSITIVITY_CALCULATORS = {
    _ns: _make_sensitivity_calculator(_ns, _cfg)
    for _ns, _cfg in NETTING_SET_CONFIG.items()
}


# ---------------------------------------------------------------------------
# Level 6: Report Aggregation
# ---------------------------------------------------------------------------


def aggregate_xva_report(*args) -> XVARiskReport:
    """Aggregate all netting set XVA results + sensitivities into final report.

    args layout: xva_1, ..., xva_15 followed by sens_1, ..., sens_15
    (one of each per netting set, in NETTING_SETS order).
    """
    n_ns = len(NETTING_SETS)
    xva_results = args[:n_ns]
    sens_results = args[n_ns:2 * n_ns]

    breakdown = {x.netting_set: {"cva": x.cva, "fva": x.fva} for x in xva_results}
    total_cva = sum(x.cva for x in xva_results)
    total_fva = sum(x.fva for x in xva_results)

    greek_names = ("delta_ir", "delta_credit", "delta_fx",
                   "delta_equity", "gamma_ir", "vega")
    agg_cva_sens = {
        g: sum(getattr(s, g) for s in sens_results) for g in greek_names
    }
    # FVA sensitivities are not computed upstream; reported as zeros.
    agg_fva_sens = {g: 0 for g in greek_names}

    return XVARiskReport(
        total_cva=total_cva,
        total_fva=total_fva,
        total_xva=total_cva + total_fva,
        netting_set_breakdown=breakdown,
        cva_sensitivities=agg_cva_sens,
        fva_sensitivities=agg_fva_sens,
    )

# ---------------------------------------------------------------------------
# Pretty printing
# ---------------------------------------------------------------------------


def print_report(report: XVARiskReport) -> None:
    """Pretty-print the aggregated XVA risk report to stdout."""
    heavy = "=" * 60
    light = "-" * 60

    print("\n" + heavy)
    print("           XVA RISK REPORT")
    print(heavy)
    print(f"  CVA:       {report.total_cva:>16,.2f}")
    print(f"  FVA:       {report.total_fva:>16,.2f}")
    print(f"  Total XVA: {report.total_xva:>16,.2f}")
    print(light)
    print("  Netting Set Breakdown:")
    for ns, vals in report.netting_set_breakdown.items():
        print(f"    {ns:>15s}: CVA={vals['cva']:>12,.2f}  FVA={vals['fva']:>12,.2f}")
    print(light)
    # CVA and FVA sensitivity sections share the same layout.
    for title, greeks in (("CVA", report.cva_sensitivities),
                          ("FVA", report.fva_sensitivities)):
        print(f"  {title} Sensitivities:")
        for greek, value in greeks.items():
            print(f"    {greek:>15s}: {value:>16,.2f}")
    print(light)
    print(heavy)




def run_sequential(config: ComputeConfig) -> XVARiskReport:
    """Run all six pipeline levels in order on the local process."""
    # Level 1: calibrate every model.
    params = {name: CALIBRATORS[name](config) for name in DESK_NAMES}

    # Level 2: simulate paths; FX/EQ/non-USD-IR models also consume IR_USD.
    sims = {}
    for name in DESK_NAMES:
        if name == "IR_USD" or name.startswith("CR_"):
            sims[name] = SIMULATORS[name](params[name], config)
        else:
            sims[name] = SIMULATORS[name](params[name], params["IR_USD"], config)

    # Level 3: price each desk, discounting against the IR_USD simulation.
    exposures = {
        name: (PRICERS[name](sims[name], config)
               if name == "IR_USD"
               else PRICERS[name](sims[name], sims["IR_USD"], config))
        for name in DESK_NAMES
    }

    # Level 4: XVA per netting set over its configured subset of desks.
    xva_results = {}
    for ns_name, ns_cfg in NETTING_SET_CONFIG.items():
        desk_exps = [exposures[DESK_NAMES[i]] for i in ns_cfg["desk_indices"]]
        xva_results[ns_name] = _compute_netting_set_xva(
            desk_exps, params[ns_cfg["credit_model"]], config,
            ns_cfg["seed"], ns_name,
        )

    # Level 5: bump-and-reprice sensitivities per netting set.
    sens_results = {}
    for ns_name, ns_cfg in NETTING_SET_CONFIG.items():
        desk_exps = [exposures[DESK_NAMES[i]] for i in ns_cfg["desk_indices"]]
        sens_results[ns_name] = _compute_netting_set_sensitivities(
            xva_results[ns_name], desk_exps, params[ns_cfg["credit_model"]],
            config, ns_cfg["seed"], ns_name,
        )

    # Level 6: aggregate (XVA results first, then sensitivities) and print.
    report = aggregate_xva_report(
        *[xva_results[ns] for ns in NETTING_SETS],
        *[sens_results[ns] for ns in NETTING_SETS],
    )
    print_report(report)
    return report

run_sequential(MEDIUM_CONFIG)

Pargraph Implementation

[ ]:
import os

os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["GOTO_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"

from dataclasses import dataclass, field
from typing import Dict, List, Tuple

import numpy as np
from pargraph import GraphEngine
from scaler import Client, SchedulerClusterCombo

# ---------------------------------------------------------------------------
# Data classes — small, serializable results passed between DAG nodes
# ---------------------------------------------------------------------------


@dataclass
class ModelParams:
    """Calibrated model parameters. Tiny payload (few floats)."""

    name: str  # model identifier, e.g. "IR_USD"
    mean_reversion: float  # mean-reversion speed (kappa)
    volatility: float  # diffusion volatility (sigma)
    long_term_mean: float  # long-run level (theta)
    correlation_to_rates: float  # correlation to the IR_USD reference factor
    calibration_error: float  # best least-squares error found during calibration


@dataclass
class SimulationResult:
    """Summary statistics from Monte Carlo simulation. Small payload."""

    asset_class: str  # model/desk name this simulation belongs to
    n_paths: int  # total Monte Carlo paths simulated
    n_timesteps: int  # timesteps per path (monthly grid)
    mean_path: np.ndarray  # shape (n_timesteps,)
    percentile_97_5: np.ndarray  # mean + 1.96*std per timestep (normal approximation)
    percentile_2_5: np.ndarray  # mean - 1.96*std per timestep (normal approximation)
    terminal_distribution_moments: Tuple[float, float, float, float]  # (mean, std, skew, kurtosis)


@dataclass
class ExposureProfile:
    """Portfolio exposure profile for one desk/asset class. Small payload."""

    asset_class: str  # desk / asset class name
    n_trades: int  # number of trades priced for this desk
    expected_exposure: np.ndarray  # shape (n_timesteps,)
    expected_positive_exposure: np.ndarray  # EPE: mean of max(MtM, 0) per timestep
    expected_negative_exposure: np.ndarray  # ENE: mean of min(MtM, 0) per timestep
    peak_exposure: float  # max |EPE| over the horizon


@dataclass
class XVAResult:
    """XVA metrics for a single netting set."""

    netting_set: str  # netting set / counterparty identifier
    cva: float  # credit valuation adjustment (scalar)
    fva: float  # funding valuation adjustment (scalar)
    term_structure_cva: np.ndarray  # per-timestep CVA contributions
    term_structure_fva: np.ndarray  # per-timestep FVA contributions


@dataclass
class SensitivityResult:
    """Greeks/sensitivities for one netting set."""

    netting_set: str  # netting set identifier
    delta_ir: float  # CVA delta to a parallel exposure bump (IR proxy)
    delta_credit: float  # CVA delta to the credit intensity level
    delta_fx: float  # CVA delta to an FX-proxy exposure bump
    delta_equity: float  # CVA delta to an equity-proxy exposure bump
    gamma_ir: float  # second-order CVA sensitivity to the IR bump
    vega: float  # CVA sensitivity to credit-spread volatility


@dataclass
class XVARiskReport:
    """Final aggregated report. Small payload."""

    total_cva: float  # sum of CVA over all netting sets
    total_fva: float  # sum of FVA over all netting sets
    total_xva: float  # total_cva + total_fva
    netting_set_breakdown: Dict[str, Dict[str, float]]  # per netting set: {"cva": ..., "fva": ...}
    cva_sensitivities: Dict[str, float]  # aggregated Greeks keyed by greek name
    fva_sensitivities: Dict[str, float]  # same keys as cva_sensitivities


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------


@dataclass
class ComputeConfig:
    """Controls how heavy each computation step is."""

    n_paths: int  # Monte Carlo paths per simulation/XVA node
    n_timesteps: int  # monthly timesteps in the simulation horizon
    n_trades: int  # trades per desk
    n_calibration_iterations: int  # random-search trials per calibration
    bump_size: float = 0.0001  # relative bump for finite-difference Greeks


# Small workload for quick smoke tests.
TEST_CONFIG = ComputeConfig(
    n_paths=10_000,
    n_timesteps=60,   # 5 years at monthly steps
    n_trades=100,
    n_calibration_iterations=5_000,
)

# Mid-sized workload (used by the demo run at the bottom of the file).
MEDIUM_CONFIG = ComputeConfig(
    n_paths=100_000,
    n_timesteps=180,  # 15 years at monthly steps
    n_trades=300,
    n_calibration_iterations=15_000,
)

# Production-scale workload.
FULL_CONFIG = ComputeConfig(
    n_paths=500_000,
    n_timesteps=360,  # 30 years at monthly steps
    n_trades=2_000,
    n_calibration_iterations=50_000,
)


# ---------------------------------------------------------------------------
# Asset class definitions — 15 models to calibrate
# ---------------------------------------------------------------------------

# (name, seed, param_ranges, correlation_to_rates)
# param_ranges gives the uniform search bounds used by _calibrate_model:
# "mr" = mean reversion, "vol" = volatility, "mean" = long-term level.
MODELS = [
    # 5 IR curves (different currencies)
    ("IR_USD", 100, {"mr": (0.01, 0.5), "vol": (0.005, 0.02), "mean": (0.02, 0.05)}, 1.0),
    ("IR_EUR", 101, {"mr": (0.01, 0.5), "vol": (0.003, 0.015), "mean": (0.01, 0.04)}, 0.9),
    ("IR_GBP", 102, {"mr": (0.01, 0.5), "vol": (0.004, 0.018), "mean": (0.015, 0.045)}, 0.85),
    ("IR_JPY", 103, {"mr": (0.01, 0.5), "vol": (0.001, 0.008), "mean": (-0.005, 0.015)}, 0.7),
    ("IR_CHF", 104, {"mr": (0.01, 0.5), "vol": (0.002, 0.012), "mean": (-0.003, 0.02)}, 0.75),
    # 4 FX pairs
    ("FX_EURUSD", 200, {"mr": (0.0, 0.3), "vol": (0.05, 0.15), "mean": (1.0, 1.2)}, 0.3),
    ("FX_GBPUSD", 201, {"mr": (0.0, 0.3), "vol": (0.06, 0.18), "mean": (1.2, 1.4)}, 0.25),
    ("FX_USDJPY", 202, {"mr": (0.0, 0.3), "vol": (0.04, 0.12), "mean": (100, 150)}, 0.2),
    ("FX_USDCHF", 203, {"mr": (0.0, 0.3), "vol": (0.05, 0.14), "mean": (0.85, 1.0)}, 0.22),
    # 3 equity indices
    ("EQ_SPX", 300, {"mr": (0.5, 5.0), "vol": (0.1, 0.5), "mean": (0.02, 0.08)}, -0.2),
    ("EQ_EUROSTOXX", 301, {"mr": (0.5, 5.0), "vol": (0.12, 0.6), "mean": (0.02, 0.10)}, -0.25),
    ("EQ_NIKKEI", 302, {"mr": (0.5, 5.0), "vol": (0.10, 0.45), "mean": (0.01, 0.07)}, -0.15),
    # 3 credit sectors
    ("CR_IG", 400, {"mr": (0.01, 1.0), "vol": (0.005, 0.03), "mean": (0.003, 0.015)}, -0.15),
    ("CR_HY", 401, {"mr": (0.01, 1.0), "vol": (0.01, 0.06), "mean": (0.01, 0.05)}, -0.25),
    ("CR_EM", 402, {"mr": (0.01, 1.0), "vol": (0.008, 0.05), "mean": (0.008, 0.04)}, -0.20),
]

# 15 desks for pricing (one per model)
DESK_NAMES = [m[0] for m in MODELS]

# 15 netting sets for XVA
NETTING_SETS = [
    "NS_USBank1", "NS_USBank2", "NS_USBank3",
    "NS_EUBank1", "NS_EUBank2", "NS_EUBank3",
    "NS_UKBank1", "NS_UKBank2",
    "NS_JPBank1", "NS_JPBank2",
    "NS_Hedge1", "NS_Hedge2", "NS_Hedge3",
    "NS_Corp1", "NS_Corp2",
]

# Map netting sets to credit sector and subset of desks they trade with
NETTING_SET_CONFIG = {}
for i, ns in enumerate(NETTING_SETS):
    # Each netting set references a credit model and a subset of desks
    credit_model = ["CR_IG", "CR_HY", "CR_EM"][i % 3]  # round-robin credit sector
    # Each netting set sees 5 of the 15 desks (rotating)
    desk_indices = [(i * 3 + j) % 15 for j in range(5)]
    NETTING_SET_CONFIG[ns] = {
        "credit_model": credit_model,
        "desk_indices": desk_indices,
        "seed": 7000 + i,  # deterministic per-netting-set seed
    }


# ---------------------------------------------------------------------------
# Level 1: Model Calibration (15 parallel nodes)
# ---------------------------------------------------------------------------


def _calibrate_model(name: str, seed: int, param_ranges: dict,
                     corr: float, config: ComputeConfig) -> ModelParams:
    """Generic calibration: iterative least-squares fitting.

    Random-search over (mean reversion, vol, long-term mean): each trial
    simulates a block of shocks, compares model-implied vols against the
    synthetic market targets, and keeps the parameter triple with the
    smallest mean-squared error.

    Heavy compute: n_calibration_iterations × (n_paths/10 × n_market_instruments).
    Output: ~6 floats.
    """
    rng = np.random.default_rng(seed)
    n_instruments = 60 + seed % 40  # 60-100 instruments

    market_data = 0.01 + 0.005 * rng.standard_normal(n_instruments)
    # Loop-invariant target vector (the fit targets |market_data|).
    target_vols = np.abs(market_data)

    dt = 1.0 / 12
    n_scenario_rows = config.n_paths // 10

    winner = None
    winner_error = float("inf")

    for _ in range(config.n_calibration_iterations):
        # Draw a candidate parameter triple (RNG order matters: mr, vol, mean).
        mr = rng.uniform(*param_ranges["mr"])
        vol = rng.uniform(*param_ranges["vol"])
        mean = rng.uniform(*param_ranges["mean"])

        shocks = rng.standard_normal((n_scenario_rows, n_instruments))
        # Stationary OU volatility implied by (mr, vol) over one step.
        stationary_vol = vol * np.sqrt(
            (1 - np.exp(-2 * mr * dt)) / (2 * max(mr, 1e-6))
        )
        implied_vols = np.std(mean + stationary_vol * shocks, axis=0)

        error = np.mean((implied_vols - target_vols) ** 2)
        if error < winner_error:
            winner_error = error
            winner = (mr, vol, mean)

    return ModelParams(
        name=name,
        mean_reversion=winner[0],
        volatility=winner[1],
        long_term_mean=winner[2],
        correlation_to_rates=corr,
        calibration_error=winner_error,
    )


# Generate one calibration function per model (15 functions).
# Each is a standalone callable that pargraph can dispatch independently.

def _make_calibrator(name, seed, ranges, corr):
    """Factory: return a function(config) -> ModelParams for one model."""
    label = f"calibrate_{name.lower()}"

    def calibrate(config: ComputeConfig) -> ModelParams:
        # Closure captures the model's identity, seed, and search bounds.
        return _calibrate_model(name, seed, ranges, corr, config)

    calibrate.__name__ = label
    calibrate.__qualname__ = label
    return calibrate


# One standalone calibration callable per model (15 total).
CALIBRATORS = {
    model_name: _make_calibrator(model_name, model_seed, model_ranges, model_corr)
    for model_name, model_seed, model_ranges, model_corr in MODELS
}


# ---------------------------------------------------------------------------
# Level 2: Monte Carlo Simulation (15 parallel nodes)
# ---------------------------------------------------------------------------


def _simulate_paths(params: ModelParams, ir_ref_params: ModelParams,
                    config: ComputeConfig, seed: int) -> SimulationResult:
    """Generic path simulation using calibrated parameters.

    Heavy compute: n_paths × n_timesteps evolution with chunked memory.
    Output: 3 vectors of length n_timesteps + 4 floats.

    Dynamics are selected by the ``params.name`` prefix: EQ_* uses a
    Heston-style stochastic-volatility model, FX_* a GBM with a seasonal
    drift derived from ``ir_ref_params``, and everything else (IR/credit)
    a mean-reverting diffusion. Note the reported "percentiles" are a
    normal approximation (mean ± 1.96·std), not empirical quantiles.
    """
    from scipy.stats import kurtosis, skew

    rng = np.random.default_rng(seed)
    dt = 1.0 / 12  # monthly step
    n = config.n_timesteps
    kappa = params.mean_reversion
    sigma = params.volatility
    theta = params.long_term_mean
    rho = params.correlation_to_rates

    # Simulate in fixed-size chunks so peak memory stays bounded.
    chunk_size = min(config.n_paths, 10_000)
    n_chunks = max(config.n_paths // chunk_size, 1)

    running_sum = np.zeros(n)     # per-timestep sum over all paths
    running_sum_sq = np.zeros(n)  # per-timestep sum of squares
    terminal_values = []          # last value of every path, for moments

    is_equity = params.name.startswith("EQ_")
    is_fx = params.name.startswith("FX_")

    for _ in range(n_chunks):
        if is_equity:
            # Heston-style stochastic vol
            log_s = np.zeros((chunk_size, n))
            variance = np.full(chunk_size, theta)
            xi = sigma  # vol of vol
            v_bar = theta
            for t in range(1, n):
                z1 = rng.standard_normal(chunk_size)
                z2 = rng.standard_normal(chunk_size)
                # Fixed -0.7 spot/vol correlation; sqrt(1 - 0.49) completes it.
                z_vol = -0.7 * z1 + np.sqrt(1 - 0.49) * z2
                variance = np.maximum(
                    variance + kappa * (v_bar - variance) * dt
                    + xi * np.sqrt(np.maximum(variance, 0) * dt) * z_vol,
                    1e-8,
                )
                r = ir_ref_params.long_term_mean  # flat risk-free drift
                log_s[:, t] = (log_s[:, t - 1]
                               + (r - 0.5 * variance) * dt
                               + np.sqrt(variance * dt) * z1)
            values = np.exp(log_s) * 100  # index normalized to start at 100
        elif is_fx:
            # GBM with stochastic drift from IR differential
            log_fx = np.zeros((chunk_size, n))
            log_fx[:, 0] = np.log(max(theta, 0.01))  # start at calibrated level
            for t in range(1, n):
                z1 = rng.standard_normal(chunk_size)
                z2 = rng.standard_normal(chunk_size)
                z_corr = rho * z1 + np.sqrt(max(1 - rho**2, 0)) * z2
                # Annual-period sinusoid stands in for the rate differential.
                ir_diff = ir_ref_params.long_term_mean * 0.1 * np.sin(2 * np.pi * t / 12)
                log_fx[:, t] = (log_fx[:, t - 1]
                                + (ir_diff - 0.5 * sigma**2) * dt
                                + sigma * np.sqrt(dt) * z_corr)
            values = np.exp(log_fx)
        else:
            # Mean-reverting (IR or Credit)
            rates = np.zeros((chunk_size, n))
            rates[:, 0] = theta
            for t in range(1, n):
                dW = rng.standard_normal(chunk_size) * np.sqrt(dt)
                rates[:, t] = (rates[:, t - 1]
                               + kappa * (theta - rates[:, t - 1]) * dt
                               + sigma * dW)
            values = rates

        # Fold this chunk into the running statistics, then discard it.
        running_sum += np.sum(values, axis=0)
        running_sum_sq += np.sum(values**2, axis=0)
        terminal_values.extend(values[:, -1].tolist())

    # total may differ from config.n_paths when n_paths is not a multiple
    # of chunk_size; statistics are normalized by the paths actually run.
    total = chunk_size * n_chunks
    mean_path = running_sum / total
    std_path = np.sqrt(np.maximum(running_sum_sq / total - mean_path**2, 0))

    terminal = np.array(terminal_values)
    return SimulationResult(
        asset_class=params.name,
        n_paths=config.n_paths,
        n_timesteps=n,
        mean_path=mean_path,
        percentile_97_5=mean_path + 1.96 * std_path,
        percentile_2_5=mean_path - 1.96 * std_path,
        terminal_distribution_moments=(
            float(np.mean(terminal)),
            float(np.std(terminal)),
            float(skew(terminal)),
            float(kurtosis(terminal)),
        ),
    )


def _make_simulator(model_name: str, seed: int, needs_ir_ref: bool):
    """Factory: return a simulation function for one model.

    IR_USD is the reference curve; other models may depend on it.
    """
    if model_name == "IR_USD":
        # The reference curve serves as its own IR reference.
        def simulate(params: ModelParams, config: ComputeConfig) -> SimulationResult:
            return _simulate_paths(params, params, config, seed)
    elif not needs_ir_ref:
        # Credit models: simulate against a fixed placeholder reference.
        def simulate(params: ModelParams, config: ComputeConfig) -> SimulationResult:
            placeholder = ModelParams("dummy", 0.1, 0.01, 0.03, 0, 0)
            return _simulate_paths(params, placeholder, config, seed)
    else:
        # FX/EQ/non-USD IR: takes the calibrated IR_USD params explicitly.
        def simulate(params: ModelParams, ir_usd_params: ModelParams,
                     config: ComputeConfig) -> SimulationResult:
            return _simulate_paths(params, ir_usd_params, config, seed)

    simulate.__name__ = f"simulate_{model_name.lower()}"
    simulate.__qualname__ = simulate.__name__
    return simulate


# One simulation callable per model; everything except IR_USD and the
# credit models takes the IR_USD calibration as an extra input.
SIMULATORS = {
    model_name: _make_simulator(
        model_name,
        1000 + model_seed,
        model_name.startswith(("FX_", "EQ_"))
        or (model_name.startswith("IR_") and model_name != "IR_USD"),
    )
    for model_name, model_seed, _, _ in MODELS
}


# ---------------------------------------------------------------------------
# Level 3: Portfolio Pricing (15 parallel nodes, one per desk)
# ---------------------------------------------------------------------------


def _price_desk_portfolio(sim: SimulationResult, ir_ref_sim: SimulationResult,
                          config: ComputeConfig, seed: int) -> ExposureProfile:
    """Price a desk's portfolio under simulated scenarios.

    Heavy compute: n_trades × n_scenarios present value calculations per timestep.

    A synthetic book of linear trades and options is generated from ``seed``;
    at each timestep, scenario mark-to-markets are drawn around the simulated
    mean path and aggregated into EE / EPE / ENE profiles.
    """
    rng = np.random.default_rng(seed)
    n = sim.n_timesteps

    # Synthetic book: random notionals, near-ATM strikes, staggered
    # maturities; roughly half the trades are options (MtM floored at 0).
    notionals = rng.uniform(1e6, 50e6, config.n_trades)
    strikes = sim.mean_path[0] * rng.uniform(0.9, 1.1, config.n_trades)
    maturities = rng.integers(max(12, n // 4), n, config.n_trades)
    is_option = rng.random(config.n_trades) > 0.5

    ee = np.zeros(n)   # expected exposure
    epe = np.zeros(n)  # expected positive exposure
    ene = np.zeros(n)  # expected negative exposure

    for t in range(n):
        alive = maturities > t
        if not np.any(alive):
            continue  # whole book matured; exposure stays zero
        remaining = (maturities[alive] - t) / 12.0  # years to maturity
        spot = sim.mean_path[t]
        # Half the 95% band of the simulation is the scenario dispersion.
        spread = max((sim.percentile_97_5[t] - sim.percentile_2_5[t]) / 2, 1e-10)
        r = ir_ref_sim.mean_path[min(t, len(ir_ref_sim.mean_path) - 1)]

        n_scenarios = max(config.n_paths // 100, 10)
        scenario_mtms = np.zeros(n_scenarios)

        for s in range(n_scenarios):
            scenario_factor = spot + rng.standard_normal() * spread
            mtm = notionals[alive] * remaining * (scenario_factor - strikes[alive])
            opt_mask = is_option[alive]
            mtm[opt_mask] = np.maximum(mtm[opt_mask], 0)  # options cannot go negative
            scenario_mtms[s] = np.sum(mtm * np.exp(-max(r, 0) * remaining))

        ee[t] = np.mean(scenario_mtms)
        epe[t] = np.mean(np.maximum(scenario_mtms, 0))
        ene[t] = np.mean(np.minimum(scenario_mtms, 0))

    return ExposureProfile(
        asset_class=sim.asset_class,
        n_trades=config.n_trades,
        expected_exposure=ee,
        expected_positive_exposure=epe,
        expected_negative_exposure=ene,
        peak_exposure=float(np.max(np.abs(epe))),
    )


def _make_pricer(model_name: str, seed: int):
    """Factory: return a pricing function for one desk."""
    label = f"price_{model_name.lower()}"

    if model_name == "IR_USD":
        # The reference desk discounts off its own simulation.
        def price(sim: SimulationResult, config: ComputeConfig) -> ExposureProfile:
            return _price_desk_portfolio(sim, sim, config, seed)
    else:
        def price(sim: SimulationResult, ir_usd_sim: SimulationResult,
                  config: ComputeConfig) -> ExposureProfile:
            return _price_desk_portfolio(sim, ir_usd_sim, config, seed)

    price.__name__ = label
    price.__qualname__ = label
    return price


# One pricing callable per desk, seeded deterministically per model.
PRICERS = {
    model_name: _make_pricer(model_name, 4000 + model_seed)
    for model_name, model_seed, _, _ in MODELS
}


# ---------------------------------------------------------------------------
# Level 4: XVA per Netting Set (15 parallel nodes)
# ---------------------------------------------------------------------------


def _compute_netting_set_xva(
    exposure_profiles: List[ExposureProfile],
    credit_params: ModelParams,
    config: ComputeConfig,
    seed: int,
    ns_name: str,
) -> XVAResult:
    """Compute CVA + FVA for a single netting set.

    Aggregates exposure from the desks this counterparty trades with,
    then runs Monte Carlo on credit intensity and funding cost.

    CVA accumulates discount × LGD × EPE(t) × ΔPD(t) × survival;
    FVA accumulates discount × funding spread × EE(t) × dt.
    """
    rng = np.random.default_rng(seed)
    n = len(exposure_profiles[0].expected_positive_exposure)
    dt = 1.0 / 12
    lgd = 0.6  # loss given default
    funding_spread = 0.005  # initial / long-run funding spread level

    # Aggregate EPE/EE across desks in this netting set
    total_epe = np.zeros(n)
    total_ee = np.zeros(n)
    for ep in exposure_profiles:
        total_epe += ep.expected_positive_exposure
        total_ee += ep.expected_exposure

    intensity = credit_params.long_term_mean
    vol = credit_params.volatility
    kappa = credit_params.mean_reversion

    # Chunked Monte Carlo to bound memory; chunks are equally sized, so the
    # mean of per-chunk means equals the overall mean.
    n_mc = config.n_paths
    chunk_size = min(n_mc, 10_000)
    n_chunks = max(n_mc // chunk_size, 1)

    cva_samples = np.zeros(n_chunks)
    fva_samples = np.zeros(n_chunks)
    cva_ts = np.zeros(n)  # per-timestep contributions, normalized at the end
    fva_ts = np.zeros(n)

    for c in range(n_chunks):
        # Square-root diffusion for default intensity; mean-reverting spread.
        lam = np.full(chunk_size, max(intensity, 1e-6))
        spread = np.full(chunk_size, funding_spread)
        survival = np.ones(chunk_size)
        chunk_cva = np.zeros(chunk_size)
        chunk_fva = np.zeros(chunk_size)

        for t in range(n):
            dW = rng.standard_normal(chunk_size) * np.sqrt(dt)
            dW2 = rng.standard_normal(chunk_size) * np.sqrt(dt)

            # Intensity floored so the sqrt diffusion stays well-defined.
            lam = np.maximum(
                lam + kappa * (intensity - lam) * dt
                + vol * np.sqrt(np.maximum(lam, 0)) * dW,
                1e-6,
            )
            spread = np.maximum(
                spread + 0.1 * (funding_spread - spread) * dt + 0.001 * dW2,
                0,
            )

            default_prob = 1 - np.exp(-lam * dt)  # conditional PD this step
            discount = np.exp(-0.03 * t * dt)     # flat 3% discounting

            inc_cva = discount * lgd * total_epe[t] * default_prob * survival
            inc_fva = discount * spread * total_ee[t] * dt

            chunk_cva += inc_cva
            chunk_fva += inc_fva
            cva_ts[t] += np.sum(inc_cva)
            fva_ts[t] += np.sum(inc_fva)

            survival *= 1 - default_prob

        cva_samples[c] = np.mean(chunk_cva)
        fva_samples[c] = np.mean(chunk_fva)

    # Normalize term structures by the total number of simulated paths.
    denom = n_chunks * chunk_size
    cva_ts /= denom
    fva_ts /= denom

    return XVAResult(
        netting_set=ns_name,
        cva=float(np.mean(cva_samples)),
        fva=float(np.mean(fva_samples)),
        term_structure_cva=cva_ts,
        term_structure_fva=fva_ts,
    )


def _make_xva_calculator(ns_name: str, ns_cfg: dict, n_desks: int):
    """Factory: return a function that computes XVA for one netting set.

    The function signature lists the exact exposure profiles + credit params
    it needs so pargraph can wire the DAG edges.
    """
    seed = ns_cfg["seed"]

    # Positional convention: (exp_0, ..., exp_k, credit_params, config).
    def compute_xva(*args) -> XVAResult:
        *exposures, credit_params, config = args
        return _compute_netting_set_xva(exposures, credit_params, config,
                                        seed, ns_name)

    compute_xva.__name__ = f"xva_{ns_name.lower()}"
    compute_xva.__qualname__ = compute_xva.__name__
    return compute_xva


# One XVA node per netting set, wired to the desks it trades with.
XVA_CALCULATORS = {
    netting_set: _make_xva_calculator(netting_set, netting_cfg, len(DESK_NAMES))
    for netting_set, netting_cfg in NETTING_SET_CONFIG.items()
}


# ---------------------------------------------------------------------------
# Level 5: Sensitivities per Netting Set (15 parallel nodes)
# ---------------------------------------------------------------------------


def _compute_netting_set_sensitivities(
    xva: XVAResult,
    exposure_profiles: List[ExposureProfile],
    credit_params: ModelParams,
    config: ComputeConfig,
    seed: int,
    ns_name: str,
) -> SensitivityResult:
    """Bump-and-reprice sensitivities for one netting set.

    Re-runs XVA with bumped inputs for IR, credit, FX, equity and divides
    each CVA change by the raw bump size.

    Fix: every bumped repricing reuses the *same* RNG seed as the base XVA
    run (common random numbers). Previously each bump used a different seed
    (seed + 100 ... seed + 600), so the finite differences were dominated by
    independent Monte Carlo noise amplified by 1/bump — and by 1/bump**2 for
    gamma — rather than by the true sensitivity.

    Parameters
    ----------
    xva : XVAResult
        Base XVA result for this netting set (provides the base CVA).
    exposure_profiles : List[ExposureProfile]
        Exposure profiles of the desks in this netting set.
    credit_params : ModelParams
        Calibrated credit model for the counterparty.
    config : ComputeConfig
        Workload sizing; ``bump_size`` drives the finite differences.
    seed : int
        Seed of the base XVA run, reused for all reprices.
    ns_name : str
        Netting set identifier, copied into the result.
    """
    bump = config.bump_size
    base_cva = xva.cva

    def _bumped_exposures(factor):
        # Uniformly rescale every exposure vector; payloads stay small.
        return [
            ExposureProfile(
                asset_class=ep.asset_class,
                n_trades=ep.n_trades,
                expected_exposure=ep.expected_exposure * factor,
                expected_positive_exposure=ep.expected_positive_exposure * factor,
                expected_negative_exposure=ep.expected_negative_exposure * factor,
                peak_exposure=ep.peak_exposure * factor,
            )
            for ep in exposure_profiles
        ]

    # Delta IR: bump all exposures (scaled bump; normalized by raw bump below)
    xva_up = _compute_netting_set_xva(
        _bumped_exposures(1 + bump * 1000), credit_params, config, seed, ns_name
    )
    delta_ir = (xva_up.cva - base_cva) / bump

    # Delta Credit: bump the default-intensity level
    cr_bumped = ModelParams(
        name=credit_params.name,
        mean_reversion=credit_params.mean_reversion,
        volatility=credit_params.volatility,
        long_term_mean=credit_params.long_term_mean * (1 + bump * 100),
        correlation_to_rates=credit_params.correlation_to_rates,
        calibration_error=credit_params.calibration_error,
    )
    xva_cr = _compute_netting_set_xva(
        exposure_profiles, cr_bumped, config, seed, ns_name
    )
    delta_credit = (xva_cr.cva - base_cva) / bump

    # Delta FX
    xva_fx = _compute_netting_set_xva(
        _bumped_exposures(1 + bump * 100), credit_params, config, seed, ns_name
    )
    delta_fx = (xva_fx.cva - base_cva) / bump

    # Delta Equity
    xva_eq = _compute_netting_set_xva(
        _bumped_exposures(1 + bump * 50), credit_params, config, seed, ns_name
    )
    delta_eq = (xva_eq.cva - base_cva) / bump

    # Gamma IR: central second difference, reusing the up-bump run
    xva_down = _compute_netting_set_xva(
        _bumped_exposures(1 - bump * 1000), credit_params, config, seed, ns_name
    )
    gamma_ir = (xva_up.cva - 2 * base_cva + xva_down.cva) / (bump**2)

    # Vega: bump the credit-intensity volatility
    cr_vega = ModelParams(
        name=credit_params.name,
        mean_reversion=credit_params.mean_reversion,
        volatility=credit_params.volatility * (1 + bump * 100),
        long_term_mean=credit_params.long_term_mean,
        correlation_to_rates=credit_params.correlation_to_rates,
        calibration_error=credit_params.calibration_error,
    )
    xva_vega = _compute_netting_set_xva(
        exposure_profiles, cr_vega, config, seed, ns_name
    )
    vega = (xva_vega.cva - base_cva) / bump

    return SensitivityResult(
        netting_set=ns_name,
        delta_ir=delta_ir,
        delta_credit=delta_credit,
        delta_fx=delta_fx,
        delta_equity=delta_eq,
        gamma_ir=gamma_ir,
        vega=vega,
    )


def _make_sensitivity_calculator(ns_name: str, ns_cfg: dict):
    """Factory: return a function that computes sensitivities for one netting set."""
    seed = ns_cfg["seed"]

    # Positional convention: (xva_result, exp_0 ... exp_k, credit_params, config).
    def compute_sens(*args) -> SensitivityResult:
        xva_result, *exposures, credit_params, config = args
        return _compute_netting_set_sensitivities(
            xva_result, exposures, credit_params, config, seed, ns_name
        )

    compute_sens.__name__ = f"sens_{ns_name.lower()}"
    compute_sens.__qualname__ = compute_sens.__name__
    return compute_sens


# One sensitivity node per netting set, mirroring the XVA wiring.
SENSITIVITY_CALCULATORS = {
    netting_set: _make_sensitivity_calculator(netting_set, netting_cfg)
    for netting_set, netting_cfg in NETTING_SET_CONFIG.items()
}


# ---------------------------------------------------------------------------
# Level 6: Report Aggregation
# ---------------------------------------------------------------------------


def aggregate_xva_report(*args) -> XVARiskReport:
    """Aggregate all netting set XVA results + sensitivities into final report.

    args: xva_1, ..., xva_15, sens_1, ..., sens_15
    """
    n_ns = len(NETTING_SETS)
    xva_results = args[:n_ns]
    sens_results = args[n_ns:2 * n_ns]

    total_cva = sum(x.cva for x in xva_results)
    total_fva = sum(x.fva for x in xva_results)

    breakdown = {
        x.netting_set: {"cva": x.cva, "fva": x.fva} for x in xva_results
    }

    # Sum each Greek across netting sets (field order fixed for the report).
    greek_names = ("delta_ir", "delta_credit", "delta_fx",
                   "delta_equity", "gamma_ir", "vega")
    agg_cva_sens = {
        greek: sum(getattr(s, greek) for s in sens_results)
        for greek in greek_names
    }
    # No FVA Greeks are produced upstream; report zeros for them.
    agg_fva_sens = {greek: 0 for greek in greek_names}

    return XVARiskReport(
        total_cva=total_cva,
        total_fva=total_fva,
        total_xva=total_cva + total_fva,
        netting_set_breakdown=breakdown,
        cva_sensitivities=agg_cva_sens,
        fva_sensitivities=agg_fva_sens,
    )

# ---------------------------------------------------------------------------
# Pretty printing
# ---------------------------------------------------------------------------


def print_report(report: XVARiskReport) -> None:
    """Pretty-print the aggregated XVA risk report to stdout."""
    thick = "=" * 60
    thin = "-" * 60

    lines = ["", thick, "           XVA RISK REPORT", thick]
    lines.append(f"  CVA:       {report.total_cva:>16,.2f}")
    lines.append(f"  FVA:       {report.total_fva:>16,.2f}")
    lines.append(f"  Total XVA: {report.total_xva:>16,.2f}")
    lines.append(thin)
    lines.append("  Netting Set Breakdown:")
    for ns, vals in report.netting_set_breakdown.items():
        lines.append(f"    {ns:>15s}: CVA={vals['cva']:>12,.2f}  FVA={vals['fva']:>12,.2f}")
    lines.append(thin)
    lines.append("  CVA Sensitivities:")
    lines.extend(f"    {k:>15s}: {v:>16,.2f}"
                 for k, v in report.cva_sensitivities.items())
    lines.append("  FVA Sensitivities:")
    lines.extend(f"    {k:>15s}: {v:>16,.2f}"
                 for k, v in report.fva_sensitivities.items())
    lines.append(thin)
    lines.append(thick)
    print("\n".join(lines))




def build_xva_graph(config: ComputeConfig) -> dict:
    """Build the pargraph task-graph dict for the full 6-level XVA pipeline."""
    graph = {"config": config}

    # Level 1: calibrations — every model depends only on the config.
    for name in DESK_NAMES:
        graph[f"params_{name}"] = (CALIBRATORS[name], "config")

    # Level 2: simulations — FX/EQ/non-USD IR additionally need IR_USD params.
    for name in DESK_NAMES:
        if name == "IR_USD" or name.startswith("CR_"):
            graph[f"sim_{name}"] = (SIMULATORS[name], f"params_{name}", "config")
        else:
            graph[f"sim_{name}"] = (
                SIMULATORS[name], f"params_{name}", "params_IR_USD", "config"
            )

    # Level 3: pricing — every desk except IR_USD discounts off sim_IR_USD.
    for name in DESK_NAMES:
        if name == "IR_USD":
            graph[f"exp_{name}"] = (PRICERS[name], f"sim_{name}", "config")
        else:
            graph[f"exp_{name}"] = (
                PRICERS[name], f"sim_{name}", "sim_IR_USD", "config"
            )

    # Level 4: XVA per netting set — its desks' exposures + its credit model.
    for ns_name, ns_cfg in NETTING_SET_CONFIG.items():
        desk_refs = [f"exp_{DESK_NAMES[i]}" for i in ns_cfg["desk_indices"]]
        graph[f"xva_{ns_name}"] = (
            XVA_CALCULATORS[ns_name],
            *desk_refs,
            f"params_{ns_cfg['credit_model']}",
            "config",
        )

    # Level 5: sensitivities per netting set — base XVA plus the same inputs.
    for ns_name, ns_cfg in NETTING_SET_CONFIG.items():
        desk_refs = [f"exp_{DESK_NAMES[i]}" for i in ns_cfg["desk_indices"]]
        graph[f"sens_{ns_name}"] = (
            SENSITIVITY_CALCULATORS[ns_name],
            f"xva_{ns_name}",
            *desk_refs,
            f"params_{ns_cfg['credit_model']}",
            "config",
        )

    # Level 6: single aggregation node fed by every XVA and sensitivity node.
    graph["xva_report"] = (
        aggregate_xva_report,
        *[f"xva_{ns}" for ns in NETTING_SETS],
        *[f"sens_{ns}" for ns in NETTING_SETS],
    )

    return graph

def run_graph(config: ComputeConfig, n_workers: int = 4) -> XVARiskReport:
    """Run the XVA pipeline as a pargraph DAG with Scaler workers.

    Parameters
    ----------
    config : ComputeConfig
        Workload sizing for every node in the graph.
    n_workers : int, optional
        Number of Scaler workers to start (default 4).

    Returns
    -------
    XVARiskReport
        The aggregated report; also pretty-printed to stdout.
    """
    graph = build_xva_graph(config)

    cluster = SchedulerClusterCombo(
        n_workers=n_workers,
        logging_level="WARNING",
    )
    try:
        client = Client(address=cluster.get_address())
        try:
            engine = GraphEngine(backend=client)
            report = engine.get(graph, keys="xva_report")
        finally:
            # Fix: always release the client connection — previously a failure
            # inside engine.get() skipped disconnect() entirely.
            client.disconnect()
        print_report(report)
        return report
    finally:
        cluster.shutdown()

run_graph(MEDIUM_CONFIG, n_workers=16)

Diff: Native vs Pargraph

[1]:
import difflib
import json
import re

from IPython.display import HTML, display

# Load the notebook itself and diff the "native" cell against the pargraph cell.
with open("XVA.ipynb") as _f:
    _nb = json.load(_f)
# NOTE(review): cell indices 3 and 5 are hard-coded — verify they still point
# at the native / pargraph implementation cells if the notebook is reordered.
native_code = "".join(_nb["cells"][3]["source"])
parallel_code = "".join(_nb["cells"][5]["source"])

# Render a side-by-side HTML diff, wrapping long lines at 90 columns.
differ = difflib.HtmlDiff(wrapcolumn=90)
table = differ.make_table(
    native_code.splitlines(),
    parallel_code.splitlines(),
    fromdesc="Native Python",
    todesc="With Pargraph",
    context=True,
    numlines=10,
)

# Strip unwanted columns: navigation links, line numbers, and nowrap attribute
table = re.sub(r'<td class="diff_next"[^>]*>.*?</td>', "", table)
table = re.sub(r'<th class="diff_next"[^>]*>.*?</th>', "", table)
table = re.sub(r'<td class="diff_header"[^>]*>.*?</td>', "", table)
table = table.replace('colspan="2"', "")
table = table.replace(' nowrap="nowrap"', "")
# Collapse the six original colgroups down to the two remaining data columns.
table = re.sub(
    r"(\s*<colgroup></colgroup>){6}",
    "\n        <colgroup></colgroup> <colgroup></colgroup>",
    table,
)

# Inline CSS for a compact, GitHub-style two-column diff.
css = """<style>
    table.diff { font-family: monospace; font-size: 10px; border-collapse: collapse; width: 100%; }
    table.diff td { padding: 2px 8px; white-space: pre-wrap; word-break: break-all; text-align: left !important; }
    table.diff th { padding: 6px 8px; text-align: center; background-color: #f0f0f0; font-size: 14px; }
    td.diff_add { background-color: #e6ffec; }
    td.diff_chg { background-color: #fff8c5; }
    td.diff_sub { background-color: #ffebe9; }
    span.diff_add { background-color: #ccffd8; }
    span.diff_chg { background-color: #fff2a8; }
    span.diff_sub { background-color: #ffc0c0; }
</style>"""

display(HTML(css + table))
Native Python vs. With Pargraph — side-by-side diff (reconstructed from the
garbled two-column rendering; the environment setup, imports, data classes,
and report-printing code are identical in both versions, except that the
Pargraph version additionally imports:

```python
from pargraph import GraphEngine
from scaler import Client, SchedulerClusterCombo
```

**Native Python (left column)** — sequential driver:

```python
def run_sequential(config: ComputeConfig) -> XVARiskReport:
    """Run the full pipeline sequentially."""
    params = {}
    for name in DESK_NAMES:
        params[name] = CALIBRATORS[name](config)
    sims = {}
    for name in DESK_NAMES:
        if name == "IR_USD":
            sims[name] = SIMULATORS[name](params[name], config)
        elif name.startswith("CR_"):
            sims[name] = SIMULATORS[name](params[name], config)
        else:
            sims[name] = SIMULATORS[name](params[name], params["IR_USD"], config)
    exposures = {}
    for name in DESK_NAMES:
        if name == "IR_USD":
            exposures[name] = PRICERS[name](sims[name], config)
        else:
            exposures[name] = PRICERS[name](sims[name], sims["IR_USD"], config)
    xva_results = {}
    for ns_name, ns_cfg in NETTING_SET_CONFIG.items():
        desk_exps = [exposures[DESK_NAMES[i]] for i in ns_cfg["desk_indices"]]
        credit_p = params[ns_cfg["credit_model"]]
        xva_results[ns_name] = _compute_netting_set_xva(
            desk_exps, credit_p, config, ns_cfg["seed"], ns_name
        )
    sens_results = {}
    for ns_name, ns_cfg in NETTING_SET_CONFIG.items():
        desk_exps = [exposures[DESK_NAMES[i]] for i in ns_cfg["desk_indices"]]
        credit_p = params[ns_cfg["credit_model"]]
        sens_results[ns_name] = _compute_netting_set_sensitivities(
            xva_results[ns_name], desk_exps, credit_p, config, ns_cfg["seed"], ns_name
        )
    report_args = (
        [xva_results[ns] for ns in NETTING_SETS]
        + [sens_results[ns] for ns in NETTING_SETS]
    )
    report = aggregate_xva_report(*report_args)
    print_report(report)
    return report

run_sequential(MEDIUM_CONFIG)
```

**With Pargraph (right column)** — declarative DAG construction:

```python
def build_xva_graph(config: ComputeConfig) -> dict:
    graph = {
        "config": config,
    }

    # Level 1: Calibrations
    for name in DESK_NAMES:
        graph[f"params_{name}"] = (CALIBRATORS[name], "config")

    # Level 2: Simulations
    for name in DESK_NAMES:
        if name == "IR_USD":
            graph[f"sim_{name}"] = (SIMULATORS[name], f"params_{name}", "config")
        elif name.startswith("CR_"):
            graph[f"sim_{name}"] = (SIMULATORS[name], f"params_{name}", "config")
        else:
            graph[f"sim_{name}"] = (
                SIMULATORS[name], f"params_{name}", "params_IR_USD", "config"
            )

    # Level 3: Pricing
    for name in DESK_NAMES:
        if name == "IR_USD":
            graph[f"exp_{name}"] = (PRICERS[name], f"sim_{name}", "config")
        else:
            graph[f"exp_{name}"] = (
                PRICERS[name], f"sim_{name}", "sim_IR_USD", "config"
            )

    # Level 4: XVA per netting set
    for ns_name, ns_cfg in NETTING_SET_CONFIG.items():
        deps = [f"exp_{DESK_NAMES[i]}" for i in ns_cfg["desk_indices"]]
        deps.append(f"params_{ns_cfg['credit_model']}")
        deps.append("config")
        graph[f"xva_{ns_name}"] = tuple([XVA_CALCULATORS[ns_name]] + deps)

    # Level 5: Sensitivities per netting set
    for ns_name, ns_cfg in NETTING_SET_CONFIG.items():
        deps = [f"xva_{ns_name}"]
        deps += [f"exp_{DESK_NAMES[i]}" for i in ns_cfg["desk_indices"]]
        deps.append(f"params_{ns_cfg['credit_model']}")
        deps.append("config")
        graph[f"sens_{ns_name}"] = tuple([SENTITIVITY_CALCULATORS[ns_name] if False else SENSITIVITY_CALCULATORS[ns_name]] + deps) if False else None
        graph[f"sens_{ns_name}"] = tuple([SENSITIVITY_CALCULATORS[ns_name]] + deps)

    # Level 6: Aggregation
    agg_deps = [f"xva_{ns}" for ns in NETTING_SETS]
    agg_deps += [f"sens_{ns}" for ns in NETTING_SETS]
    graph["xva_report"] = tuple([aggregate_xva_report] + agg_deps)

    return graph
```
def run_graph(config: ComputeConfig, n_workers: int = 4) -> XVARiskReport:
    """Run the XVA pipeline as a pargraph DAG with Scaler workers.

    Builds the task graph with ``build_xva_graph``, spins up a local
    Scaler scheduler/worker cluster, and evaluates the ``"xva_report"``
    key of the graph on it.

    Args:
        config: Compute configuration forwarded to every graph node.
        n_workers: Number of Scaler worker processes to launch.

    Returns:
        The aggregated XVARiskReport produced by the graph's final node.
    """
    graph = build_xva_graph(config)

    cluster = SchedulerClusterCombo(
        n_workers=n_workers,
        logging_level="WARNING",
    )
    client = None
    try:
        client = Client(address=cluster.get_address())
        engine = GraphEngine(backend=client)
        report = engine.get(graph, keys="xva_report")
        print_report(report)
        return report
    finally:
        # Release resources even when graph execution raises: the original
        # code only disconnected the client on the success path, leaking
        # the connection on error.
        if client is not None:
            client.disconnect()
        cluster.shutdown()

run_graph(MEDIUM_CONFIG, n_workers=16)

Statistics

| Parfun | Pargraph | Sequential Runtime | Parallel Runtime | Workers | Speedup | Efficiency |
|--------|----------|--------------------|------------------|---------|---------|------------|
| No     | Yes      | 3843.8s            | 374.1s           | 16      | 10.27x  | 0.64       |