Parallel Swap Portfolio CVA with Pargraph + Parfun

This notebook demonstrates a full Credit Valuation Adjustment (CVA) pipeline for a portfolio of interest rate swaps, parallelised with Pargraph (DAG-level task scheduling) and Parfun (data-level scenario distribution across workers):

  1. Yield curve bootstrap — fit discount factors from par swap rates

  2. Hull-White calibration — calibrate mean-reversion and volatility to swaption normal vols

  3. Monte Carlo scenario generation — simulate 5000 short-rate paths under Hull-White dynamics

  4. Credit curve calibration — bootstrap hazard rates from CDS spreads for 50 counterparties

  5. Survival probabilities & LGD — compute default probabilities and loss-given-default

  6. Portfolio pricing — price 1000 swaps across all scenarios (the bottleneck)

  7. Exposure profiles — Expected Exposure, PFE, and time-averaged EPE

  8. CVA computation — integrate discounted expected exposure against marginal default probability

Pargraph schedules the pipeline as a DAG: steps 2 and 4 run in parallel since both depend only on step 1. Parfun parallelises step 6 by distributing the 5000 scenario computations across Scaler workers with a single @pf.parallel decorator.

Program Structure

Native Python Implementation

[ ]:
import os

os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["GOTO_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"

from dataclasses import dataclass, field
from typing import Dict, List, Tuple

import numpy as np
import scipy.interpolate
import scipy.optimize

# ─── Domain types ─────────────────────────────────────────────────────────────


@dataclass
class DiscountCurve:
    """Discount curve interpolated as a natural cubic spline in log-DF space."""

    tenors: np.ndarray
    discount_factors: np.ndarray
    _interp: object = field(repr=False, default=None)

    def __post_init__(self):
        # Interpolating log discount factors keeps df(t) strictly positive.
        self._interp = scipy.interpolate.CubicSpline(
            self.tenors, np.log(self.discount_factors), bc_type="natural"
        )

    def df(self, t: np.ndarray) -> np.ndarray:
        """Discount factor P(0, t)."""
        return np.exp(self._interp(t))

    def zero_rate(self, t: np.ndarray) -> np.ndarray:
        """Continuously-compounded zero rate; t is floored to avoid division by zero."""
        times = np.asarray(t, dtype=float)
        floored = np.maximum(times, 1e-10)
        return -np.log(self.df(floored)) / floored


@dataclass
class HullWhiteParams:
    """Calibrated parameters of the one-factor Hull-White short-rate model."""

    mean_reversion: float  # a: speed of mean reversion
    volatility: float  # sigma: absolute (normal) short-rate volatility
    theta_times: np.ndarray  # time grid for theta(t)
    theta_values: np.ndarray  # calibrated theta(t)


@dataclass
class RateScenarios:
    """Monte Carlo short-rate paths on a shared time grid."""

    rates: np.ndarray  # (n_scenarios, n_steps+1); column 0 holds the initial rate
    time_grid: np.ndarray  # (n_steps+1,) times in years, starting at 0


@dataclass
class CreditCurve:
    """Piecewise-constant hazard-rate credit curve for one counterparty."""

    counterparty_id: int
    tenors: np.ndarray
    hazard_rates: np.ndarray  # piecewise-constant hazard rates

    def survival_prob(self, t: np.ndarray) -> np.ndarray:
        """Survival probability S(t) = exp(-integrated hazard up to t)."""
        times = np.asarray(t, dtype=float)
        surv = np.ones_like(times)
        bucket_start = 0.0
        for bucket_end, lam in zip(self.tenors, self.hazard_rates):
            # Time accrued inside this hazard bucket, clipped at its edges.
            accrued = np.maximum(np.minimum(times, bucket_end) - bucket_start, 0.0)
            inside = times > bucket_start
            surv[inside] *= np.exp(-lam * accrued[inside])
            bucket_start = bucket_end
        return surv


@dataclass
class Swap:
    """A vanilla fixed-for-floating interest rate swap position."""

    counterparty_id: int
    notional: float
    fixed_rate: float
    payment_times: np.ndarray  # payment dates in years
    day_fractions: np.ndarray  # year fractions per period
    is_payer: bool  # True = we pay fixed, receive float


@dataclass
class ExposureProfile:
    """Exposure statistics aggregated across Monte Carlo scenarios."""

    time_grid: np.ndarray  # future time points (t > 0) the profiles are sampled on
    expected_exposure: np.ndarray  # (n_time_steps, n_counterparties) mean positive exposure
    potential_future_exposure: np.ndarray  # 97.5th percentile across scenarios
    expected_positive_exposure: np.ndarray  # (n_counterparties,) time-averaged


@dataclass
class CVAResult:
    """Final CVA figures for the portfolio."""

    counterparty_cva: np.ndarray  # CVA per counterparty
    total_cva: float  # sum of counterparty CVAs
    total_portfolio_npv: float  # aggregate of time-averaged EPE across counterparties
    n_counterparties: int
    n_scenarios: int  # set to 0 by compute_cva; filled in by the caller
    n_swaps: int  # set to 0 by compute_cva; filled in by the caller


# ─── Market data generation ──────────────────────────────────────────────────


def generate_market_data(
    n_counterparties: int,
    n_swaps_per_cp: int,
    seed: int,
) -> Tuple[
    np.ndarray, np.ndarray, np.ndarray, np.ndarray,
    np.ndarray, np.ndarray, np.ndarray, List[Swap],
]:
    """Generate synthetic market data for the CVA pipeline.

    Returns an 8-tuple: (swap_rates, tenors, swaption_vols, swaption_expiries,
    swaption_tenors, cds_spreads, cds_tenors, swaps).

    Fix: the previous return annotation declared a 5-tuple while the function
    actually returns 8 values; the annotation now matches the return statement.
    """
    rng = np.random.default_rng(seed)

    # Yield curve: par swap rates at standard tenors
    tenors = np.array([0.25, 0.5, 1, 2, 3, 5, 7, 10, 15, 20, 30])
    base_rates = np.array([0.042, 0.041, 0.040, 0.038, 0.037, 0.036, 0.037, 0.038, 0.040, 0.041, 0.042])
    swap_rates = base_rates + rng.normal(0, 0.001, len(tenors))

    # Swaption implied vols: (expiry, tenor) grid
    swaption_expiries = np.array([0.5, 1, 2, 5])
    swaption_tenors = np.array([2, 5, 10])
    swaption_vols = np.full((len(swaption_expiries), len(swaption_tenors)), 0.0060)
    for i, exp in enumerate(swaption_expiries):
        for j, ten in enumerate(swaption_tenors):
            swaption_vols[i, j] = 0.004 + 0.003 * np.exp(-0.3 * exp) + 0.001 * np.exp(-0.1 * ten)
            swaption_vols[i, j] += rng.normal(0, 0.0002)

    # CDS spreads per counterparty (bps) at tenors [1, 3, 5, 7, 10] years
    cds_tenors = np.array([1.0, 3.0, 5.0, 7.0, 10.0])
    cds_spreads = np.zeros((n_counterparties, len(cds_tenors)))
    for cp in range(n_counterparties):
        base_spread = rng.uniform(30, 400)  # bps
        term_slope = rng.uniform(-5, 15)
        for j, t in enumerate(cds_tenors):
            cds_spreads[cp, j] = max(5, base_spread + term_slope * t + rng.normal(0, 10))
        cds_spreads[cp] /= 10000  # convert to decimal

    # Generate swap portfolio
    swaps = []
    for cp in range(n_counterparties):
        for _ in range(n_swaps_per_cp):
            maturity = rng.choice([2, 3, 5, 7, 10, 15, 20])
            freq = 0.5  # semi-annual
            n_payments = int(maturity / freq)
            payment_times = np.arange(1, n_payments + 1) * freq
            day_fractions = np.full(n_payments, freq)
            fixed_rate = 0.035 + rng.normal(0, 0.005)
            notional = rng.choice([1e6, 5e6, 10e6, 25e6, 50e6])
            swaps.append(
                Swap(
                    counterparty_id=cp,
                    notional=notional,
                    fixed_rate=fixed_rate,
                    payment_times=payment_times,
                    day_fractions=day_fractions,
                    is_payer=bool(rng.choice([True, False])),
                )
            )

    return swap_rates, tenors, swaption_vols, swaption_expiries, swaption_tenors, cds_spreads, cds_tenors, swaps


# ─── Yield curve bootstrapping ───────────────────────────────────────────────


def bootstrap_yield_curve(swap_rates: np.ndarray, tenors: np.ndarray) -> DiscountCurve:
    """Bootstrap a discount curve from par swap rates.

    Tenors up to 1Y are treated as zero-coupon deposits; longer tenors are
    solved from the par-swap relation using only roughly-annual coupon dates
    that were already bootstrapped, working left to right.
    """
    n = len(tenors)

    # Working arrays with t=0 / DF=1 prepended.
    all_tenors = np.concatenate([[0.0], tenors])
    all_dfs = np.concatenate([[1.0], np.zeros(n)])
    discount_factors = np.zeros(n)

    for idx, (rate, tenor) in enumerate(zip(swap_rates, tenors)):
        if tenor <= 1.0:
            # Short end: simple zero-coupon discounting.
            df = 1.0 / (1.0 + rate * tenor)
        else:
            # Par swap relation: rate * sum(DF_j * tau_j) + DF_n = 1.
            pv_coupons = 0.0
            for j in range(idx):
                if tenors[j] >= 1.0 or j == 0:
                    accrual = tenors[j] if j == 0 else tenors[j] - tenors[j - 1]
                    pv_coupons += all_dfs[j + 1] * rate * accrual
            last_accrual = tenor - (tenors[idx - 1] if idx > 0 else 0)
            df = (1.0 - pv_coupons) / (1.0 + rate * last_accrual)

        # Guard against pathological inputs: keep DFs inside (0.01, 1.0].
        df = np.clip(df, 0.01, 1.0)
        all_dfs[idx + 1] = df
        discount_factors[idx] = df

    return DiscountCurve(tenors=all_tenors, discount_factors=np.concatenate([[1.0], discount_factors]))


# ─── Hull-White calibration ──────────────────────────────────────────────────


def _hw_zcb_price(t: float, T: float, r_t: float, a: float, sigma: float, curve: DiscountCurve) -> float:
    if T <= t:
        return 1.0
    B = (1.0 - np.exp(-a * (T - t))) / a if a > 1e-10 else (T - t)
    P_0_T = float(curve.df(np.array([T]))[0])
    P_0_t = float(curve.df(np.array([t]))[0])
    if P_0_t < 1e-15:
        return P_0_T
    f_0_t = float(curve.zero_rate(np.array([t]))[0])
    log_A = np.log(P_0_T / P_0_t) + B * f_0_t - (sigma**2 / (4.0 * a)) * B**2 * (1.0 - np.exp(-2.0 * a * t))
    return np.exp(log_A - B * r_t)


def _hw_normal_vol(a: float, sigma: float, expiry: float, swap_tenor: float) -> float:
    """Approximate HW-implied normal swaption volatility (fast closed-form)."""
    if a < 1e-10:
        return sigma * np.sqrt(expiry) * swap_tenor
    B_swap = (1.0 - np.exp(-a * swap_tenor)) / (a * swap_tenor)
    var_factor = (1.0 - np.exp(-2.0 * a * expiry)) / (2.0 * a * expiry)
    return sigma * np.sqrt(expiry * var_factor) * B_swap


def calibrate_hull_white(
    curve: DiscountCurve,
    swaption_vols: np.ndarray,
    swaption_expiries: np.ndarray,
    swaption_tenors: np.ndarray,
) -> HullWhiteParams:
    """Calibrate Hull-White (a, sigma) to a swaption normal-vol grid, then
    derive theta(t) so the model reproduces the initial term structure.

    Parameters
    ----------
    curve : bootstrapped discount curve
    swaption_vols : (n_expiries, n_tenors) market normal vols
    swaption_expiries, swaption_tenors : grid axes in years
    """
    n_exp, n_ten = swaption_vols.shape

    def objective(params):
        a, sigma = params
        # Penalty outside the admissible box so Nelder-Mead backs off.
        if a <= 1e-6 or sigma <= 1e-6 or a > 1.0 or sigma > 0.10:
            return 1e10
        total_err = 0.0
        for i in range(n_exp):
            for j in range(n_ten):
                model_vol = _hw_normal_vol(a, sigma, swaption_expiries[i], swaption_tenors[j])
                total_err += (model_vol - swaption_vols[i, j]) ** 2
        return total_err

    result = scipy.optimize.minimize(
        objective, x0=[0.03, 0.008], method="Nelder-Mead",
        options={"maxiter": 500, "xatol": 1e-8},
    )
    # Floor the fitted parameters away from zero so later formulas that divide
    # by a (theta, ZCB pricing) stay numerically stable.
    a_opt, sigma_opt = max(result.x[0], 0.001), max(result.x[1], 0.001)

    # Compute theta(t) to fit initial term structure
    dt = 0.25
    t_grid = np.arange(0, 31, dt)
    f_rates = curve.zero_rate(np.maximum(t_grid, 0.01))
    # d/dt [t * zero_rate(t)] approximates the instantaneous forward rate f(0, t).
    df_dt = np.gradient(f_rates * t_grid, dt)
    theta = df_dt + a_opt * f_rates + (sigma_opt**2 / (2.0 * a_opt)) * (1.0 - np.exp(-2.0 * a_opt * t_grid))

    return HullWhiteParams(mean_reversion=a_opt, volatility=sigma_opt, theta_times=t_grid, theta_values=theta)


# ─── Monte Carlo rate scenario generation ────────────────────────────────────


def generate_rate_scenarios(
    hw: HullWhiteParams,
    curve: DiscountCurve,
    n_scenarios: int,
    n_steps: int,
    horizon: float = 5.0,
    seed: int = 12345,
) -> RateScenarios:
    """Simulate Hull-White short-rate paths with an Euler scheme.

    Returns paths of shape (n_scenarios, n_steps + 1) on a uniform time grid.
    """
    rng = np.random.default_rng(seed)
    dt = horizon / n_steps
    time_grid = np.linspace(0, horizon, n_steps + 1)

    a, sigma = hw.mean_reversion, hw.volatility
    theta_of_t = scipy.interpolate.interp1d(hw.theta_times, hw.theta_values, fill_value="extrapolate")

    # All paths start at the short rate implied by the curve near t=0.
    paths = np.zeros((n_scenarios, n_steps + 1))
    paths[:, 0] = float(curve.zero_rate(np.array([0.01]))[0])

    # Draw all shocks up front, then step through time with the Euler update:
    # r_{k+1} = r_k + (theta(t_k) - a r_k) dt + sigma sqrt(dt) Z_k.
    shocks = rng.standard_normal((n_scenarios, n_steps))
    for k in range(n_steps):
        r_k = paths[:, k]
        drift = (float(theta_of_t(time_grid[k])) - a * r_k) * dt
        paths[:, k + 1] = r_k + drift + sigma * np.sqrt(dt) * shocks[:, k]

    return RateScenarios(rates=paths, time_grid=time_grid)


# ─── Credit curve calibration ────────────────────────────────────────────────


def calibrate_single_credit_curve(
    cp_id: int,
    spreads: np.ndarray,
    cds_tenors: np.ndarray,
    curve: DiscountCurve,
) -> CreditCurve:
    """Bootstrap piecewise-constant hazard rates for one counterparty.

    Buckets are solved left to right: bucket i's hazard rate is the root of the
    CDS par equation (premium leg == protection leg) at tenor cds_tenors[i],
    holding the already-solved earlier buckets fixed.

    NOTE(review): the protection leg below has no explicit (1 - recovery)
    factor, i.e. it implicitly assumes LGD = 1, while the brentq fallback
    divides by 0.6 — presumably deliberate for this demo; confirm against the
    intended CDS methodology.
    """
    n_tenors = len(cds_tenors)
    hazard_rates = np.zeros(n_tenors)
    dt_fine = 0.25  # quarterly grid for both legs

    for i in range(n_tenors):
        T = cds_tenors[i]
        spread = spreads[i]
        t_grid = np.arange(dt_fine, T + dt_fine / 2, dt_fine)
        n_points = len(t_grid)

        # Bootstrap: solve for hazard rate in this tenor bucket
        def cds_error(lam_i):
            # Trial hazard vector: solved earlier buckets + candidate for bucket i
            # (later buckets are still zero and contribute nothing below T).
            lam_trial = hazard_rates.copy()
            lam_trial[i] = lam_i
            premium_leg = 0.0
            protection_leg = 0.0
            for k in range(n_points):
                t_k = t_grid[k]
                # Survival probability using all hazard rates up to t_k
                surv = 1.0
                for m in range(n_tenors):
                    t_start = 0.0 if m == 0 else cds_tenors[m - 1]
                    t_end = cds_tenors[m]
                    if t_k > t_start:
                        dt_eff = min(t_k, t_end) - t_start
                        surv *= np.exp(-lam_trial[m] * max(dt_eff, 0))
                d = float(curve.df(np.array([t_k]))[0])
                premium_leg += spread * dt_fine * d * surv
                # Protection: default probability in this interval
                surv_prev = 1.0
                t_prev = t_k - dt_fine
                for m in range(n_tenors):
                    t_start = 0.0 if m == 0 else cds_tenors[m - 1]
                    t_end = cds_tenors[m]
                    if t_prev > t_start:
                        dt_eff = min(t_prev, t_end) - t_start
                        surv_prev *= np.exp(-lam_trial[m] * max(dt_eff, 0))
                protection_leg += d * (surv_prev - surv)
            return premium_leg - protection_leg

        try:
            hazard_rates[i] = scipy.optimize.brentq(cds_error, 1e-6, 2.0, maxiter=200)
        except (ValueError, RuntimeError):
            hazard_rates[i] = spreads[i] / 0.6  # approximate: spread ~ lambda * LGD

    return CreditCurve(counterparty_id=cp_id, tenors=cds_tenors, hazard_rates=hazard_rates)


def calibrate_credit_curves(
    cds_spreads: np.ndarray,
    cds_tenors: np.ndarray,
    curve: DiscountCurve,
) -> List[CreditCurve]:
    """Bootstrap one hazard-rate curve per counterparty from its CDS spread row."""
    curves = []
    for cp_id, cp_spreads in enumerate(cds_spreads):
        curves.append(calibrate_single_credit_curve(cp_id, cp_spreads, cds_tenors, curve))
    return curves


# ─── Survival probabilities & LGD ───────────────────────────────────────────


def compute_survival_probs(
    credit_curves: List[CreditCurve],
    scenarios: RateScenarios,
) -> np.ndarray:
    time_grid = scenarios.time_grid
    n_cp = len(credit_curves)
    n_t = len(time_grid)
    surv = np.zeros((n_cp, n_t))
    for i, cc in enumerate(credit_curves):
        surv[i] = cc.survival_prob(time_grid)
    return surv


def estimate_lgd(
    cds_spreads: np.ndarray,
    credit_curves: List[CreditCurve],
    n_simulations: int = 5000,
    seed: int = 99,
) -> np.ndarray:
    rng = np.random.default_rng(seed)
    n_cp = len(credit_curves)
    lgd_estimates = np.zeros(n_cp)

    for i in range(n_cp):
        avg_spread = np.mean(cds_spreads[i])
        avg_hazard = np.mean(credit_curves[i].hazard_rates)
        if avg_hazard > 1e-10:
            point_lgd = np.clip(avg_spread / avg_hazard, 0.10, 0.95)
        else:
            point_lgd = 0.40

        # Bayesian estimation with Beta prior
        alpha = point_lgd * 20
        beta_param = (1 - point_lgd) * 20
        samples = rng.beta(max(alpha, 0.5), max(beta_param, 0.5), n_simulations)
        lgd_estimates[i] = np.mean(samples)

    return lgd_estimates


# ─── Swap portfolio pricing under scenarios ──────────────────────────────────


def _precompute_swap_flat(swaps: List[Swap]):
    """Flatten all swap cash flows into contiguous arrays for fully vectorised pricing."""
    n_swaps = len(swaps)
    # Collect per-swap metadata
    fixed_rates = np.array([s.fixed_rate for s in swaps])
    notionals = np.array([s.notional for s in swaps])
    signs = np.array([1.0 if s.is_payer else -1.0 for s in swaps])
    cp_ids = np.array([s.counterparty_id for s in swaps], dtype=int)
    max_payments = max(len(s.payment_times) for s in swaps)

    # Padded arrays (n_swaps, max_payments) — padded slots have pay_time = 0
    pay_times = np.zeros((n_swaps, max_payments))
    day_fracs = np.zeros((n_swaps, max_payments))
    n_payments = np.array([len(s.payment_times) for s in swaps], dtype=int)
    for i, s in enumerate(swaps):
        n = n_payments[i]
        pay_times[i, :n] = s.payment_times
        day_fracs[i, :n] = s.day_fractions

    return pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_payments, max_payments


def price_portfolio_single_scenario(
    scenario_rates: np.ndarray,
    time_grid: np.ndarray,
    pay_times: np.ndarray,
    day_fracs: np.ndarray,
    fixed_rates: np.ndarray,
    notionals: np.ndarray,
    signs: np.ndarray,
    cp_ids: np.ndarray,
    n_payments: np.ndarray,
    n_counterparties: int,
    a: float,
    sigma: float,
    log_P0: np.ndarray,
    zr0: np.ndarray,
    log_P0_grid: np.ndarray,
    zr0_grid: np.ndarray,
) -> np.ndarray:
    """Price all swaps at all time steps for one MC scenario.

    Vectorises across swaps×payments using masked arrays so the inner loop is
    only over time steps (60 iterations), not swaps (1000).

    Parameters
    ----------
    scenario_rates : (n_steps+1,) simulated short-rate path
    time_grid : (n_steps+1,) grid; step k prices at time_grid[k+1]
    pay_times, day_fracs : (n_swaps, max_pay) zero-padded payment schedules
    fixed_rates, notionals, signs, cp_ids, n_payments : per-swap metadata
    a, sigma : Hull-White parameters
    log_P0 : log P(0, T) at every payment slot
    log_P0_grid, zr0_grid : log P(0, t) and zero rate at each grid point

    Returns
    -------
    (n_steps, n_counterparties) positive exposure summed per counterparty.

    NOTE(review): `zr0` and `n_payments` are accepted but never read in this
    body; kept, presumably, for signature compatibility with the callers.
    """
    n_steps = len(time_grid) - 1
    n_swaps, max_pay = pay_times.shape
    cp_exposure = np.zeros((n_steps, n_counterparties))

    # Constant part of the HW variance adjustment; guarded against a ~ 0.
    sigma2_4a = (sigma ** 2) / (4.0 * a) if a > 1e-10 else 0.0

    for step in range(n_steps):
        t = time_grid[step + 1]
        r_t = scenario_rates[step + 1]

        # Precomputed curve values at t
        lp0t = log_P0_grid[step]  # log P(0,t)
        f0t = zr0_grid[step]  # instantaneous forward ~ zero rate at t

        exp_2at = np.exp(-2.0 * a * t) if a > 1e-10 else 1.0

        # Mask: which (swap, payment) pairs are still in the future?
        future = pay_times > t  # (n_swaps, max_pay) bool

        # Any swap with no future payments is dead
        alive = np.any(future, axis=1)  # (n_swaps,)
        if not np.any(alive):
            continue

        # For alive swaps, compute HW ZCB prices P(t, T_k) for all payment slots
        # tau_{i,k} = T_{i,k} - t  (zero for non-future slots)
        tau = np.where(future, pay_times - t, 0.0)
        B = np.where(future, (1.0 - np.exp(-a * tau)) / a if a > 1e-10 else tau, 0.0)
        # log A_{i,k} = log P(0,T_k) - log P(0,t) + B*f(0,t) - σ²/4a * B² * (1-e^{-2at})
        log_A = np.where(future, log_P0 - lp0t + B * f0t - sigma2_4a * B ** 2 * (1.0 - exp_2at), 0.0)
        dfs = np.where(future, np.exp(log_A - B * r_t), 0.0)

        # Forward rates: F_k = (DF_{k-1}/DF_k - 1) / tau_k
        # DF_{k-1} for the first future payment of each swap is 1.0 (discounting from t)
        df_prev = np.zeros_like(dfs)
        # Shift dfs right within each swap, inserting 1.0 at the first future slot.
        # NOTE(review): these two column loops redo boolean prefix sums per
        # column (O(max_pay²) in columns); correct but a known hot spot.
        for i in range(max_pay):
            col_is_first_future = future[:, i] & (np.sum(future[:, :i], axis=1) == 0)
            df_prev[:, i] = np.where(col_is_first_future, 1.0, dfs[:, max(i - 1, 0)] if i > 0 else 0.0)
        # Fix: for non-first-future slots that are still future, df_prev = previous dfs
        for i in range(1, max_pay):
            non_first = future[:, i] & ~(future[:, i] & (np.sum(future[:, :i], axis=1) == 0))
            df_prev[:, i] = np.where(non_first, dfs[:, i - 1], df_prev[:, i])

        # Floors avoid division by zero on padded/non-future slots.
        safe_dfs = np.maximum(dfs, 1e-15)
        safe_tau = np.maximum(day_fracs, 1e-10)
        float_rates = np.where(future, (df_prev / safe_dfs - 1.0) / safe_tau, 0.0)

        # Fixed leg PV per swap: sum_k (r_fixed * tau_k * DF_k) * notional
        fixed_pv = np.sum(fixed_rates[:, None] * day_fracs * dfs * future, axis=1) * notionals

        # Float leg PV per swap: sum_k (F_k * tau_k * DF_k) * notional
        float_pv = np.sum(float_rates * day_fracs * dfs * future, axis=1) * notionals

        npvs = signs * (float_pv - fixed_pv)

        # Positive exposure per counterparty
        pos = np.maximum(npvs, 0.0) * alive
        np.add.at(cp_exposure[step], cp_ids, pos)

    return cp_exposure


def _price_scenario_batch(
    scenario_batch: List[np.ndarray],
    time_grid: np.ndarray,
    pay_times: np.ndarray,
    day_fracs: np.ndarray,
    fixed_rates: np.ndarray,
    notionals: np.ndarray,
    signs: np.ndarray,
    cp_ids: np.ndarray,
    n_pay: np.ndarray,
    n_counterparties: int,
    a: float,
    sigma: float,
    log_P0: np.ndarray,
    zr0: np.ndarray,
    log_P0_grid: np.ndarray,
    zr0_grid: np.ndarray,
) -> List[np.ndarray]:
    """Price each scenario in the batch sequentially, one exposure slice each."""
    priced = []
    for scenario in scenario_batch:
        priced.append(
            price_portfolio_single_scenario(
                scenario, time_grid,
                pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_pay,
                n_counterparties, a, sigma,
                log_P0, zr0, log_P0_grid, zr0_grid,
            )
        )
    return priced


def price_swap_portfolio(
    swaps: List[Swap],
    scenarios: RateScenarios,
    n_counterparties: int,
    hw: HullWhiteParams,
    curve: DiscountCurve,
) -> np.ndarray:
    """Price the whole portfolio across all scenarios.

    Returns the exposure cube of shape (n_scenarios, n_steps, n_counterparties).
    """
    time_grid = scenarios.time_grid

    # Flatten the portfolio once; all scenarios reuse these arrays.
    (pay_times, day_fracs, fixed_rates, notionals,
     signs, cp_ids, n_pay, max_pay) = _precompute_swap_flat(swaps)

    # Initial-curve values at every payment slot; padded slots (pay_time = 0)
    # are floored at t = 0.001 to keep the spline evaluation well-defined.
    padded_times = np.where(pay_times <= 0, 0.001, pay_times)
    log_P0 = curve._interp(padded_times)  # log discount factors
    zr0 = -log_P0 / padded_times

    # Initial-curve values at every future grid point.
    future_grid = np.maximum(time_grid[1:], 0.001)
    log_P0_grid = curve._interp(future_grid)
    zr0_grid = -log_P0_grid / future_grid

    # One entry per scenario, as expected by the batch pricer.
    scenario_list = list(scenarios.rates)

    results = _price_scenario_batch(
        scenario_list, time_grid,
        pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_pay,
        n_counterparties, hw.mean_reversion, hw.volatility,
        log_P0, zr0, log_P0_grid, zr0_grid,
    )
    return np.array(results)


# ─── Exposure profiles ───────────────────────────────────────────────────────


def compute_exposure_profiles(
    exposures: np.ndarray,
    scenarios: RateScenarios,
) -> ExposureProfile:
    """Reduce the exposure cube to EE, PFE (97.5th percentile) and EPE.

    `exposures` has shape (n_scenarios, n_steps, n_counterparties).
    """
    time_grid = scenarios.time_grid

    # Scenario averages / tail per (time step, counterparty).
    ee = np.mean(exposures, axis=0)
    pfe = np.percentile(exposures, 97.5, axis=0)

    # EPE: step-size-weighted average of EE over the horizon, per counterparty.
    step_sizes = np.diff(time_grid)
    horizon = time_grid[-1]
    epe = np.zeros(ee.shape[1])
    for cp in range(ee.shape[1]):
        epe[cp] = np.sum(ee[:, cp] * step_sizes) / horizon

    return ExposureProfile(
        time_grid=time_grid[1:],
        expected_exposure=ee,
        potential_future_exposure=pfe,
        expected_positive_exposure=epe,
    )


# ─── CVA computation ─────────────────────────────────────────────────────────


def compute_cva(
    profiles: ExposureProfile,
    survival_probs: np.ndarray,
    lgd: np.ndarray,
    curve: DiscountCurve,
) -> CVAResult:
    """Compute unilateral CVA per counterparty.

    CVA_cp = sum_k LGD_cp * DF(t_k) * EE(t_k, cp) * [S(t_{k-1}) - S(t_k)].

    `survival_probs` is sampled on the full scenario grid (column 0 is t = 0,
    where S = 1), while `profiles.time_grid` starts at the first future step,
    so profile step k maps to survival columns k (previous) and k+1 (current).

    Fixes: removed the unused `surv_prev`/`surv_curr` locals (both branches of
    their ternary were identical) and the unused `t`; folded the redundant
    k == 0 special case into the general clamped-index formula, which is
    equivalent because S(0) == 1.
    """
    time_grid = profiles.time_grid
    ee = profiles.expected_exposure  # (n_steps, n_counterparties)
    n_steps, n_cp = ee.shape

    cp_cva = np.zeros(n_cp)
    dfs = curve.df(time_grid)
    last = survival_probs.shape[1] - 1

    for cp in range(n_cp):
        for k in range(n_steps):
            # Marginal default probability over (t_{k-1}, t_k], clamped at the
            # end of the survival grid and floored at zero.
            idx_prev = min(k, last)
            idx_curr = min(k + 1, last)
            delta_pd = max(survival_probs[cp, idx_prev] - survival_probs[cp, idx_curr], 0.0)
            cp_cva[cp] += lgd[cp] * dfs[k] * ee[k, cp] * delta_pd

    total_cva = float(np.sum(cp_cva))
    # NOTE(review): this sums time-averaged EPE across counterparties; it is
    # labelled "NPV" but is really aggregate expected positive exposure.
    total_npv = float(np.sum(profiles.expected_positive_exposure))

    return CVAResult(
        counterparty_cva=cp_cva,
        total_cva=total_cva,
        total_portfolio_npv=total_npv,
        n_counterparties=n_cp,
        n_scenarios=0,
        n_swaps=0,
    )


# ─── Main pipeline ───────────────────────────────────────────────────────────


def cva_pipeline(
    swap_rates: np.ndarray,
    tenors: np.ndarray,
    swaption_vols: np.ndarray,
    swaption_expiries: np.ndarray,
    swaption_tenors: np.ndarray,
    cds_spreads: np.ndarray,
    cds_tenors: np.ndarray,
    swaps: list,
    n_counterparties: int,
    n_scenarios: int,
    n_steps: int,
    horizon: float = 5.0,
    seed: int = 12345,
    lgd_seed: int = 99,
    lgd_n_simulations: int = 5000,
) -> CVAResult:
    """Run the full CVA pipeline sequentially and return the CVA result.

    Stages: bootstrap curve -> Hull-White calibration -> rate scenarios;
    credit-curve calibration (needs only the curve) -> survival probs / LGD;
    then portfolio pricing, exposure profiles and CVA integration.
    """
    # Rates leg
    curve = bootstrap_yield_curve(swap_rates, tenors)
    hw = calibrate_hull_white(curve, swaption_vols, swaption_expiries, swaption_tenors)
    scenarios = generate_rate_scenarios(hw, curve, n_scenarios, n_steps, horizon, seed)

    # Credit leg (calibration depends only on the discount curve)
    credit_curves = calibrate_credit_curves(cds_spreads, cds_tenors, curve)
    surv_probs = compute_survival_probs(credit_curves, scenarios)
    lgd = estimate_lgd(cds_spreads, credit_curves, lgd_n_simulations, lgd_seed)

    # Pricing and aggregation (pricing is the computational bottleneck)
    exposures = price_swap_portfolio(swaps, scenarios, n_counterparties, hw, curve)
    profiles = compute_exposure_profiles(exposures, scenarios)
    return compute_cva(profiles, surv_probs, lgd, curve)


if __name__ == "__main__":
    # Problem size: 50 counterparties x 20 swaps = 1000 swaps, 5000 scenarios.
    n_counterparties = 50
    n_swaps_per_cp = 20
    n_scenarios = 5000
    n_steps = 60
    market_seed = 42

    swap_rates, tenors, swaption_vols, swaption_expiries, swaption_tenors, cds_spreads, cds_tenors, swaps = (
        generate_market_data(n_counterparties, n_swaps_per_cp, market_seed)
    )

    pipeline_kwargs = dict(
        swap_rates=swap_rates, tenors=tenors,
        swaption_vols=swaption_vols, swaption_expiries=swaption_expiries, swaption_tenors=swaption_tenors,
        cds_spreads=cds_spreads, cds_tenors=cds_tenors,
        swaps=swaps, n_counterparties=n_counterparties,
        n_scenarios=n_scenarios, n_steps=n_steps,
    )

    result = cva_pipeline(**pipeline_kwargs)
    # compute_cva leaves these counters at 0; fill them from the run config.
    result.n_scenarios = n_scenarios
    result.n_swaps = len(swaps)

    print(f"Portfolio CVA: ${result.total_cva:,.2f}")
    print(f"Number of counterparties: {result.n_counterparties}")
    print(f"Number of swaps: {result.n_swaps}")
    print(f"Number of scenarios: {result.n_scenarios}")
    print(f"Top 5 counterparty CVAs: {np.sort(result.counterparty_cva)[-5:][::-1]}")

Pargraph + Parfun Implementation

[ ]:
import os

os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["GOTO_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"


from scaler import Client
from pargraph import GraphEngine, delayed, graph
import parfun as pf
scheduler_address = "tcp://172.31.3.107:7788"
object_storage_address = "tcp://172.31.3.107:7789"
from dataclasses import dataclass, field
from typing import Dict, List, Tuple

import numpy as np
import scipy.interpolate
import scipy.optimize

# ─── Domain types ─────────────────────────────────────────────────────────────


@dataclass
class DiscountCurve:
    """Discount curve interpolated as a natural cubic spline in log-DF space."""

    tenors: np.ndarray
    discount_factors: np.ndarray
    _interp: object = field(repr=False, default=None)

    def __post_init__(self):
        # Interpolating log discount factors keeps df(t) strictly positive.
        self._interp = scipy.interpolate.CubicSpline(
            self.tenors, np.log(self.discount_factors), bc_type="natural"
        )

    def df(self, t: np.ndarray) -> np.ndarray:
        """Discount factor P(0, t)."""
        return np.exp(self._interp(t))

    def zero_rate(self, t: np.ndarray) -> np.ndarray:
        """Continuously-compounded zero rate; t is floored to avoid division by zero."""
        times = np.asarray(t, dtype=float)
        floored = np.maximum(times, 1e-10)
        return -np.log(self.df(floored)) / floored


@dataclass
class HullWhiteParams:
    """Calibrated parameters of the one-factor Hull-White short-rate model."""

    mean_reversion: float  # a: speed of mean reversion
    volatility: float  # sigma: absolute (normal) short-rate volatility
    theta_times: np.ndarray  # time grid for theta(t)
    theta_values: np.ndarray  # calibrated theta(t)


@dataclass
class RateScenarios:
    """Monte Carlo short-rate paths on a shared time grid."""

    rates: np.ndarray  # (n_scenarios, n_steps+1); column 0 holds the initial rate
    time_grid: np.ndarray  # (n_steps+1,) times in years, starting at 0


@dataclass
class CreditCurve:
    """Piecewise-constant hazard-rate credit curve for one counterparty."""

    counterparty_id: int
    tenors: np.ndarray
    hazard_rates: np.ndarray  # piecewise-constant hazard rates

    def survival_prob(self, t: np.ndarray) -> np.ndarray:
        """Survival probability S(t) = exp(-integrated hazard up to t)."""
        times = np.asarray(t, dtype=float)
        surv = np.ones_like(times)
        bucket_start = 0.0
        for bucket_end, lam in zip(self.tenors, self.hazard_rates):
            # Time accrued inside this hazard bucket, clipped at its edges.
            accrued = np.maximum(np.minimum(times, bucket_end) - bucket_start, 0.0)
            inside = times > bucket_start
            surv[inside] *= np.exp(-lam * accrued[inside])
            bucket_start = bucket_end
        return surv


@dataclass
class Swap:
    """A vanilla fixed-for-floating interest rate swap position."""

    counterparty_id: int
    notional: float
    fixed_rate: float
    payment_times: np.ndarray  # payment dates in years
    day_fractions: np.ndarray  # year fractions per period
    is_payer: bool  # True = we pay fixed, receive float


@dataclass
class ExposureProfile:
    """Exposure statistics aggregated across Monte Carlo scenarios."""

    time_grid: np.ndarray  # future time points (t > 0) the profiles are sampled on
    expected_exposure: np.ndarray  # (n_time_steps, n_counterparties) mean positive exposure
    potential_future_exposure: np.ndarray  # 97.5th percentile across scenarios
    expected_positive_exposure: np.ndarray  # (n_counterparties,) time-averaged


@dataclass
class CVAResult:
    """Final CVA figures for the portfolio."""

    counterparty_cva: np.ndarray  # CVA per counterparty
    total_cva: float  # sum of counterparty CVAs
    total_portfolio_npv: float  # aggregate of time-averaged EPE across counterparties
    n_counterparties: int
    n_scenarios: int  # set to 0 by compute_cva; filled in by the caller
    n_swaps: int  # set to 0 by compute_cva; filled in by the caller


# ─── Market data generation ──────────────────────────────────────────────────


def generate_market_data(
    n_counterparties: int,
    n_swaps_per_cp: int,
    seed: int,
) -> Tuple[
    np.ndarray, np.ndarray, np.ndarray, np.ndarray,
    np.ndarray, np.ndarray, np.ndarray, List[Swap],
]:
    """Generate synthetic market data for the CVA pipeline.

    Returns an 8-tuple: (swap_rates, tenors, swaption_vols, swaption_expiries,
    swaption_tenors, cds_spreads, cds_tenors, swaps).

    Fix: the previous return annotation declared a 5-tuple while the function
    actually returns 8 values; the annotation now matches the return statement.
    """
    rng = np.random.default_rng(seed)

    # Yield curve: par swap rates at standard tenors
    tenors = np.array([0.25, 0.5, 1, 2, 3, 5, 7, 10, 15, 20, 30])
    base_rates = np.array([0.042, 0.041, 0.040, 0.038, 0.037, 0.036, 0.037, 0.038, 0.040, 0.041, 0.042])
    swap_rates = base_rates + rng.normal(0, 0.001, len(tenors))

    # Swaption implied vols: (expiry, tenor) grid
    swaption_expiries = np.array([0.5, 1, 2, 5])
    swaption_tenors = np.array([2, 5, 10])
    swaption_vols = np.full((len(swaption_expiries), len(swaption_tenors)), 0.0060)
    for i, exp in enumerate(swaption_expiries):
        for j, ten in enumerate(swaption_tenors):
            swaption_vols[i, j] = 0.004 + 0.003 * np.exp(-0.3 * exp) + 0.001 * np.exp(-0.1 * ten)
            swaption_vols[i, j] += rng.normal(0, 0.0002)

    # CDS spreads per counterparty (bps) at tenors [1, 3, 5, 7, 10] years
    cds_tenors = np.array([1.0, 3.0, 5.0, 7.0, 10.0])
    cds_spreads = np.zeros((n_counterparties, len(cds_tenors)))
    for cp in range(n_counterparties):
        base_spread = rng.uniform(30, 400)  # bps
        term_slope = rng.uniform(-5, 15)
        for j, t in enumerate(cds_tenors):
            cds_spreads[cp, j] = max(5, base_spread + term_slope * t + rng.normal(0, 10))
        cds_spreads[cp] /= 10000  # convert to decimal

    # Generate swap portfolio
    swaps = []
    for cp in range(n_counterparties):
        for _ in range(n_swaps_per_cp):
            maturity = rng.choice([2, 3, 5, 7, 10, 15, 20])
            freq = 0.5  # semi-annual
            n_payments = int(maturity / freq)
            payment_times = np.arange(1, n_payments + 1) * freq
            day_fractions = np.full(n_payments, freq)
            fixed_rate = 0.035 + rng.normal(0, 0.005)
            notional = rng.choice([1e6, 5e6, 10e6, 25e6, 50e6])
            swaps.append(
                Swap(
                    counterparty_id=cp,
                    notional=notional,
                    fixed_rate=fixed_rate,
                    payment_times=payment_times,
                    day_fractions=day_fractions,
                    is_payer=bool(rng.choice([True, False])),
                )
            )

    return swap_rates, tenors, swaption_vols, swaption_expiries, swaption_tenors, cds_spreads, cds_tenors, swaps


# ─── Yield curve bootstrapping ───────────────────────────────────────────────


@delayed
def bootstrap_yield_curve(swap_rates: np.ndarray, tenors: np.ndarray) -> DiscountCurve:
    """Bootstrap a discount curve from par swap rates (Pargraph-delayed node).

    Same algorithm as the native version: tenors up to 1Y are treated as
    zero-coupon deposits; longer tenors solve the par-swap relation using only
    roughly-annual coupon dates, bootstrapped left to right.
    """
    n = len(tenors)
    discount_factors = np.zeros(n)

    # Prepend t=0, DF=1
    all_tenors = np.concatenate([[0.0], tenors])
    all_dfs = np.concatenate([[1.0], np.zeros(n)])

    for i in range(n):
        rate = swap_rates[i]
        tenor = tenors[i]
        # For short tenors (< 1Y): zero-coupon style
        if tenor <= 1.0:
            all_dfs[i + 1] = 1.0 / (1.0 + rate * tenor)
        else:
            # Par swap bootstrap: rate * sum(DF_j * tau_j) + DF_n = 1
            # Approximate with annual coupons
            pv_coupons = 0.0
            for j in range(i):
                # Include only roughly-annual pillars (plus the first one).
                if tenors[j] >= 1.0 or j == 0:
                    dt = tenors[j] if j == 0 else tenors[j] - tenors[j - 1]
                    pv_coupons += all_dfs[j + 1] * rate * dt
            dt_last = tenor - (tenors[i - 1] if i > 0 else 0)
            all_dfs[i + 1] = (1.0 - pv_coupons) / (1.0 + rate * dt_last)

        # Guard against pathological inputs: keep DFs inside (0.01, 1.0].
        all_dfs[i + 1] = np.clip(all_dfs[i + 1], 0.01, 1.0)
        discount_factors[i] = all_dfs[i + 1]

    return DiscountCurve(tenors=all_tenors, discount_factors=np.concatenate([[1.0], discount_factors]))


# ─── Hull-White calibration ──────────────────────────────────────────────────


def _hw_zcb_price(t: float, T: float, r_t: float, a: float, sigma: float, curve: DiscountCurve) -> float:
    """Hull-White zero-coupon bond price P(t, T) given short rate r_t at time t.

    Uses the affine form P(t,T) = A(t,T) * exp(-B(t,T) * r_t), with A fitted
    to the initial discount curve. Returns 1.0 when T is not after t.
    """
    tau = T - t
    if tau <= 0:
        return 1.0

    B = (1.0 - np.exp(-a * tau)) / a if a > 1e-10 else tau
    P0_T = float(curve.df(np.array([T]))[0])
    P0_t = float(curve.df(np.array([t]))[0])
    if P0_t < 1e-15:
        # Degenerate curve value at t — fall back to the T-discount factor.
        return P0_T

    fwd_t = float(curve.zero_rate(np.array([t]))[0])
    variance_adj = (sigma**2 / (4.0 * a)) * B**2 * (1.0 - np.exp(-2.0 * a * t))
    log_A = np.log(P0_T / P0_t) + B * fwd_t - variance_adj
    return np.exp(log_A - B * r_t)


def _hw_normal_vol(a: float, sigma: float, expiry: float, swap_tenor: float) -> float:
    """Approximate HW-implied normal swaption volatility (fast closed-form)."""
    if a < 1e-10:
        return sigma * np.sqrt(expiry) * swap_tenor
    B_swap = (1.0 - np.exp(-a * swap_tenor)) / (a * swap_tenor)
    var_factor = (1.0 - np.exp(-2.0 * a * expiry)) / (2.0 * a * expiry)
    return sigma * np.sqrt(expiry * var_factor) * B_swap


@delayed
def calibrate_hull_white(
    curve: DiscountCurve,
    swaption_vols: np.ndarray,
    swaption_expiries: np.ndarray,
    swaption_tenors: np.ndarray,
) -> HullWhiteParams:
    """Calibrate Hull-White (a, sigma) to a grid of normal swaption vols,
    then derive theta(t) so the model reprices the initial curve."""
    n_expiries, n_tenors = swaption_vols.shape

    def sse(x):
        # Sum of squared vol errors; large penalty outside the sane box.
        mean_rev, vol = x
        if mean_rev <= 1e-6 or vol <= 1e-6 or mean_rev > 1.0 or vol > 0.10:
            return 1e10
        err = 0.0
        for row in range(n_expiries):
            for col in range(n_tenors):
                model = _hw_normal_vol(mean_rev, vol, swaption_expiries[row], swaption_tenors[col])
                err += (model - swaption_vols[row, col]) ** 2
        return err

    fit = scipy.optimize.minimize(
        sse, x0=[0.03, 0.008], method="Nelder-Mead",
        options={"maxiter": 500, "xatol": 1e-8},
    )
    # Floor both parameters away from zero regardless of optimiser output.
    a_fit = max(fit.x[0], 0.001)
    sigma_fit = max(fit.x[1], 0.001)

    # theta(t) on a quarterly grid so the model fits the initial term
    # structure: theta = d/dt f(0,t) + a f(0,t) + sigma^2/(2a)(1 - e^{-2at}),
    # with f(0,t) approximated by the zero rate.
    step = 0.25
    grid = np.arange(0, 31, step)
    fwd = curve.zero_rate(np.maximum(grid, 0.01))
    dfwd = np.gradient(fwd * grid, step)
    theta_vals = dfwd + a_fit * fwd + (sigma_fit**2 / (2.0 * a_fit)) * (1.0 - np.exp(-2.0 * a_fit * grid))

    return HullWhiteParams(mean_reversion=a_fit, volatility=sigma_fit, theta_times=grid, theta_values=theta_vals)


# ─── Monte Carlo rate scenario generation ────────────────────────────────────


@delayed
def generate_rate_scenarios(
    hw: HullWhiteParams,
    curve: DiscountCurve,
    n_scenarios: int,
    n_steps: int,
    horizon: float = 5.0,
    seed: int = 12345,
) -> RateScenarios:
    """Simulate Hull-White short-rate paths via an Euler scheme.

    Returns paths of shape (n_scenarios, n_steps + 1) including t=0, plus the
    shared time grid. Deterministic for a fixed seed.
    """
    gen = np.random.default_rng(seed)
    dt = horizon / n_steps
    grid = np.linspace(0, horizon, n_steps + 1)

    # Initial short rate proxied by the very-short-end zero rate.
    short_rate_0 = float(curve.zero_rate(np.array([0.01]))[0])
    paths = np.zeros((n_scenarios, n_steps + 1))
    paths[:, 0] = short_rate_0

    theta_of_t = scipy.interpolate.interp1d(hw.theta_times, hw.theta_values, fill_value="extrapolate")
    mean_rev = hw.mean_reversion
    vol = hw.volatility

    # One Gaussian draw per (scenario, step); all scenarios advance together.
    shocks = gen.standard_normal((n_scenarios, n_steps))
    for k in range(n_steps):
        theta_k = float(theta_of_t(grid[k]))
        prev = paths[:, k]
        paths[:, k + 1] = prev + (theta_k - mean_rev * prev) * dt + vol * np.sqrt(dt) * shocks[:, k]

    return RateScenarios(rates=paths, time_grid=grid)


# ─── Credit curve calibration ────────────────────────────────────────────────


def calibrate_single_credit_curve(
    cp_id: int,
    spreads: np.ndarray,
    cds_tenors: np.ndarray,
    curve: DiscountCurve,
) -> CreditCurve:
    """Bootstrap a piecewise-constant hazard-rate curve for one counterparty.

    Buckets are solved sequentially: bucket i's hazard rate is chosen so the
    CDS of maturity cds_tenors[i] has zero PV (premium leg == protection leg)
    given the already-solved earlier buckets, on a quarterly grid.

    NOTE(review): the protection leg below carries no (1 - R) factor, which
    implies the solved lambda ≈ spread, yet the brentq-failure fallback uses
    spread / 0.6 (lambda ≈ spread / LGD). The two conventions disagree —
    confirm the intended recovery treatment.
    """
    n_tenors = len(cds_tenors)
    hazard_rates = np.zeros(n_tenors)
    dt_fine = 0.25  # quarterly premium/protection grid

    for i in range(n_tenors):
        T = cds_tenors[i]
        spread = spreads[i]
        # Quarterly dates up to and including T (half-step pad avoids
        # dropping the endpoint to float noise).
        t_grid = np.arange(dt_fine, T + dt_fine / 2, dt_fine)
        n_points = len(t_grid)

        # Bootstrap: solve for hazard rate in this tenor bucket
        def cds_error(lam_i):
            # PV(premium) - PV(protection) with bucket i's rate set to lam_i.
            # Survival is rebuilt from scratch at every grid point — O(n^2)
            # per evaluation, but correct; earlier buckets are already final.
            lam_trial = hazard_rates.copy()
            lam_trial[i] = lam_i
            premium_leg = 0.0
            protection_leg = 0.0
            for k in range(n_points):
                t_k = t_grid[k]
                # Survival probability using all hazard rates up to t_k
                surv = 1.0
                for m in range(n_tenors):
                    t_start = 0.0 if m == 0 else cds_tenors[m - 1]
                    t_end = cds_tenors[m]
                    if t_k > t_start:
                        dt_eff = min(t_k, t_end) - t_start
                        surv *= np.exp(-lam_trial[m] * max(dt_eff, 0))
                d = float(curve.df(np.array([t_k]))[0])
                premium_leg += spread * dt_fine * d * surv
                # Protection: default probability in this interval
                surv_prev = 1.0
                t_prev = t_k - dt_fine
                for m in range(n_tenors):
                    t_start = 0.0 if m == 0 else cds_tenors[m - 1]
                    t_end = cds_tenors[m]
                    if t_prev > t_start:
                        dt_eff = min(t_prev, t_end) - t_start
                        surv_prev *= np.exp(-lam_trial[m] * max(dt_eff, 0))
                # Discounted at the interval end t_k rather than mid-point.
                protection_leg += d * (surv_prev - surv)
            return premium_leg - protection_leg

        try:
            # Bracketed root solve; hazard rates realistically lie in (0, 2].
            hazard_rates[i] = scipy.optimize.brentq(cds_error, 1e-6, 2.0, maxiter=200)
        except (ValueError, RuntimeError):
            # No sign change in the bracket (or no convergence): fall back to
            # the credit-triangle approximation with LGD = 0.6.
            hazard_rates[i] = spreads[i] / 0.6  # approximate: spread ~ lambda * LGD

    return CreditCurve(counterparty_id=cp_id, tenors=cds_tenors, hazard_rates=hazard_rates)


@delayed
def calibrate_credit_curves(
    cds_spreads: np.ndarray,
    cds_tenors: np.ndarray,
    curve: DiscountCurve,
) -> List[CreditCurve]:
    """Bootstrap one hazard-rate curve per counterparty from its CDS spreads."""
    curves = []
    for cp_id in range(cds_spreads.shape[0]):
        curves.append(calibrate_single_credit_curve(cp_id, cds_spreads[cp_id], cds_tenors, curve))
    return curves


# ─── Survival probabilities & LGD ───────────────────────────────────────────


@delayed
def compute_survival_probs(
    credit_curves: List[CreditCurve],
    scenarios: RateScenarios,
) -> np.ndarray:
    """Survival probabilities S(t) on the scenario time grid.

    Returns an array of shape (n_counterparties, n_time_points).
    """
    grid = scenarios.time_grid
    out = np.zeros((len(credit_curves), len(grid)))
    for row, cp_curve in enumerate(credit_curves):
        out[row] = cp_curve.survival_prob(grid)
    return out


@delayed
def estimate_lgd(
    cds_spreads: np.ndarray,
    credit_curves: List[CreditCurve],
    n_simulations: int = 5000,
    seed: int = 99,
) -> np.ndarray:
    """Estimate loss-given-default per counterparty.

    A point LGD is implied from spread / hazard (credit-triangle relation),
    clamped to [0.10, 0.95], then smoothed by averaging draws from a Beta
    prior centred on it. Deterministic for a fixed seed.
    """
    gen = np.random.default_rng(seed)
    n_cp = len(credit_curves)
    out = np.zeros(n_cp)

    for cp in range(n_cp):
        mean_spread = np.mean(cds_spreads[cp])
        mean_hazard = np.mean(credit_curves[cp].hazard_rates)
        # Fall back to a conventional 40% when the hazard curve is ~flat zero.
        point = np.clip(mean_spread / mean_hazard, 0.10, 0.95) if mean_hazard > 1e-10 else 0.40

        # Beta prior with concentration 20 centred on the point estimate;
        # shape parameters floored at 0.5 to keep the density proper.
        a_shape = point * 20
        b_shape = (1 - point) * 20
        draws = gen.beta(max(a_shape, 0.5), max(b_shape, 0.5), n_simulations)
        out[cp] = np.mean(draws)

    return out


# ─── Swap portfolio pricing under scenarios ──────────────────────────────────


def _precompute_swap_flat(swaps: List[Swap]):
    """Flatten the swap portfolio into padded numpy arrays for vectorised pricing.

    Returns (pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids,
    n_payments, max_payments). The (n_swaps, max_payments) schedule arrays are
    zero-padded past each swap's last payment.
    """
    count = len(swaps)

    # Per-swap scalar metadata.
    fixed_rates = np.array([swp.fixed_rate for swp in swaps])
    notionals = np.array([swp.notional for swp in swaps])
    signs = np.array([1.0 if swp.is_payer else -1.0 for swp in swaps])
    cp_ids = np.array([swp.counterparty_id for swp in swaps], dtype=int)
    n_payments = np.array([len(swp.payment_times) for swp in swaps], dtype=int)
    max_payments = max(len(swp.payment_times) for swp in swaps)

    # Padded schedules — padded slots keep pay_time = 0 and are masked later.
    pay_times = np.zeros((count, max_payments))
    day_fracs = np.zeros((count, max_payments))
    for row, swp in enumerate(swaps):
        k = n_payments[row]
        pay_times[row, :k] = swp.payment_times
        day_fracs[row, :k] = swp.day_fractions

    return pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_payments, max_payments


def price_portfolio_single_scenario(
    scenario_rates: np.ndarray,
    time_grid: np.ndarray,
    pay_times: np.ndarray,
    day_fracs: np.ndarray,
    fixed_rates: np.ndarray,
    notionals: np.ndarray,
    signs: np.ndarray,
    cp_ids: np.ndarray,
    n_payments: np.ndarray,
    n_counterparties: int,
    a: float,
    sigma: float,
    log_P0: np.ndarray,
    zr0: np.ndarray,
    log_P0_grid: np.ndarray,
    zr0_grid: np.ndarray,
) -> np.ndarray:
    """Price all swaps at all time steps for one MC scenario.

    Vectorises across swaps×payments using masked arrays so the inner loop is
    only over time steps, not swaps.

    Parameters: `scenario_rates` is the short-rate path (n_steps + 1,);
    `pay_times`/`day_fracs` are the zero-padded (n_swaps, max_payments)
    schedules; `log_P0` holds log P(0, T_k) per payment slot while
    `log_P0_grid`/`zr0_grid` hold log P(0, t) and the zero rate at each grid
    date. `n_payments` and `zr0` are accepted for interface compatibility
    but not read here.

    Returns an (n_steps, n_counterparties) array of positive exposure
    (sum of max(NPV, 0) per counterparty) at each future grid date.
    """
    n_steps = len(time_grid) - 1
    n_swaps, max_pay = pay_times.shape
    cp_exposure = np.zeros((n_steps, n_counterparties))

    # Hoisted Hull-White variance coefficient sigma^2 / (4a).
    sigma2_4a = (sigma ** 2) / (4.0 * a) if a > 1e-10 else 0.0

    for step in range(n_steps):
        t = time_grid[step + 1]
        r_t = scenario_rates[step + 1]

        # Precomputed initial-curve values at t.
        lp0t = log_P0_grid[step]  # log P(0,t)
        f0t = zr0_grid[step]  # instantaneous forward ~ zero rate at t

        exp_2at = np.exp(-2.0 * a * t) if a > 1e-10 else 1.0

        # Mask: which (swap, payment) pairs are still in the future?
        # Padded slots have pay_time = 0, so they are never "future".
        future = pay_times > t  # (n_swaps, max_pay) bool

        # Any swap with no future payments is dead.
        alive = np.any(future, axis=1)  # (n_swaps,)
        if not np.any(alive):
            continue

        # Hull-White ZCB prices P(t, T_k) for all future payment slots:
        # P = exp(log A - B * r_t), A fitted to the initial curve.
        tau = np.where(future, pay_times - t, 0.0)
        B = np.where(future, (1.0 - np.exp(-a * tau)) / a if a > 1e-10 else tau, 0.0)
        log_A = np.where(future, log_P0 - lp0t + B * f0t - sigma2_4a * B ** 2 * (1.0 - exp_2at), 0.0)
        dfs = np.where(future, np.exp(log_A - B * r_t), 0.0)

        # df_prev[i, k]: discount factor of the preceding payment slot, with
        # 1.0 (discounting from t itself) at each swap's FIRST future payment.
        # Within a row the future slots form one contiguous run, so the first
        # future slot is exactly where the running count of future slots
        # reaches 1 — a cumsum replaces the former O(max_pay^2) Python loop.
        shifted = np.zeros_like(dfs)
        shifted[:, 1:] = dfs[:, :-1]
        first_future = future & (np.cumsum(future, axis=1) == 1)
        df_prev = np.where(first_future, 1.0, np.where(future, shifted, 0.0))

        # Simple-compounded forward rates F_k = (DF_{k-1}/DF_k - 1) / tau_k,
        # with guards against division by zero in masked/padded slots.
        safe_dfs = np.maximum(dfs, 1e-15)
        safe_tau = np.maximum(day_fracs, 1e-10)
        float_rates = np.where(future, (df_prev / safe_dfs - 1.0) / safe_tau, 0.0)

        # Fixed leg PV per swap: sum_k (r_fixed * tau_k * DF_k) * notional
        fixed_pv = np.sum(fixed_rates[:, None] * day_fracs * dfs * future, axis=1) * notionals

        # Float leg PV per swap: sum_k (F_k * tau_k * DF_k) * notional
        float_pv = np.sum(float_rates * day_fracs * dfs * future, axis=1) * notionals

        # Payer receives float, pays fixed; sign flips for receivers.
        npvs = signs * (float_pv - fixed_pv)

        # Positive exposure aggregated per counterparty (np.add.at handles
        # repeated counterparty ids correctly).
        pos = np.maximum(npvs, 0.0) * alive
        np.add.at(cp_exposure[step], cp_ids, pos)

    return cp_exposure


@pf.parallel(
    split=pf.per_argument(scenario_batch=pf.py_list.by_chunk),
    combine_with=pf.py_list.concat,
    fixed_partition_size=10,
)
def _price_scenario_batch(
    scenario_batch: List[np.ndarray],
    time_grid: np.ndarray,
    pay_times: np.ndarray,
    day_fracs: np.ndarray,
    fixed_rates: np.ndarray,
    notionals: np.ndarray,
    signs: np.ndarray,
    cp_ids: np.ndarray,
    n_pay: np.ndarray,
    n_counterparties: int,
    a: float,
    sigma: float,
    log_P0: np.ndarray,
    zr0: np.ndarray,
    log_P0_grid: np.ndarray,
    zr0_grid: np.ndarray,
) -> List[np.ndarray]:
    """Price a chunk of rate scenarios.

    Parfun splits ``scenario_batch`` into fixed-size chunks across workers and
    concatenates the per-chunk result lists; all other arguments are
    broadcast unchanged.
    """
    priced = []
    for scenario in scenario_batch:
        priced.append(
            price_portfolio_single_scenario(
                scenario, time_grid,
                pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_pay,
                n_counterparties, a, sigma,
                log_P0, zr0, log_P0_grid, zr0_grid,
            )
        )
    return priced


@delayed
def price_swap_portfolio(
    swaps: List[Swap],
    scenarios: RateScenarios,
    n_counterparties: int,
    hw: HullWhiteParams,
    curve: DiscountCurve,
) -> np.ndarray:
    """Compute per-scenario counterparty exposures for the whole portfolio.

    Flattens the portfolio once, precomputes initial-curve values, and fans
    the scenarios out to Scaler workers through the Parfun-decorated batch
    pricer. Returns an array of shape (n_scenarios, n_steps, n_counterparties).
    """
    paths = scenarios.rates
    grid = scenarios.time_grid

    # One-off flattening: workers receive ready-to-use padded arrays.
    (pay_times, day_fracs, fixed_rates, notionals,
     signs, cp_ids, n_pay, _max_pay) = _precompute_swap_flat(swaps)

    # Initial-curve values at every payment time. Padded slots are clamped to
    # t=0.001 to keep the spline in range; pricing masks them out anyway.
    clamped_pay = pay_times.copy()
    clamped_pay[clamped_pay <= 0] = 0.001
    log_P0 = curve._interp(clamped_pay)  # log discount factors
    zr0 = -log_P0 / clamped_pay

    # Initial-curve values at each exposure date on the grid.
    obs_dates = np.maximum(grid[1:], 0.001)
    log_P0_grid = curve._interp(obs_dates)
    zr0_grid = -log_P0_grid / obs_dates

    a = hw.mean_reversion
    sigma = hw.volatility

    # One list element per scenario row, as Parfun's split unit.
    scenario_list = list(paths)

    with pf.set_parallel_backend_context(
        "scaler_remote",
        scheduler_address=scheduler_address,
        object_storage_address=object_storage_address,
        n_workers=128,
    ):
        exposures = _price_scenario_batch(
            scenario_list, grid,
            pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_pay,
            n_counterparties, a, sigma,
            log_P0, zr0, log_P0_grid, zr0_grid,
        )

    return np.array(exposures)


# ─── Exposure profiles ───────────────────────────────────────────────────────


@delayed
def compute_exposure_profiles(
    exposures: np.ndarray,
    scenarios: RateScenarios,
) -> ExposureProfile:
    """Derive EE, 97.5% PFE, and time-averaged EPE from raw exposures.

    ``exposures`` has shape (n_scenarios, n_steps, n_counterparties); the
    scenario axis is averaged for EE and quantiled for PFE.
    """
    grid = scenarios.time_grid
    ee = np.mean(exposures, axis=0)  # (n_steps, n_counterparties)
    pfe = np.percentile(exposures, 97.5, axis=0)

    # EPE: time-weighted average of EE over [0, T], per counterparty.
    step_sizes = np.diff(grid)
    horizon = grid[-1]
    n_cp = ee.shape[1]
    epe = np.zeros(n_cp)
    for cp in range(n_cp):
        epe[cp] = np.sum(ee[:, cp] * step_sizes) / horizon

    return ExposureProfile(
        time_grid=grid[1:],
        expected_exposure=ee,
        potential_future_exposure=pfe,
        expected_positive_exposure=epe,
    )


# ─── CVA computation ─────────────────────────────────────────────────────────


@delayed
def compute_cva(
    profiles: ExposureProfile,
    survival_probs: np.ndarray,
    lgd: np.ndarray,
    curve: DiscountCurve,
) -> CVAResult:
    """CVA per counterparty: sum_k LGD * DF(t_k) * EE(t_k) * dPD_k.

    dPD_k is the marginal default probability S(t_{k-1}) - S(t_k), read off
    the survival grid — which has one extra leading point (t = 0, S = 1)
    relative to the exposure grid, hence the index offset below.
    ``n_scenarios``/``n_swaps`` are placeholders the caller backfills.
    """
    time_grid = profiles.time_grid
    ee = profiles.expected_exposure  # (n_steps, n_counterparties)
    n_steps, n_cp = ee.shape

    cp_cva = np.zeros(n_cp)
    dfs = curve.df(time_grid)
    n_surv = survival_probs.shape[1]  # hoisted; invariant over the loops

    for cp in range(n_cp):
        for k in range(n_steps):
            # Marginal default probability over (t_{k-1}, t_k], clamped at the
            # end of the survival grid and floored at zero.
            if k == 0:
                delta_pd = 1.0 - survival_probs[cp, 1] if n_surv > 1 else 0.0
            else:
                idx_prev = min(k, n_surv - 1)
                idx_curr = min(k + 1, n_surv - 1)
                delta_pd = max(survival_probs[cp, idx_prev] - survival_probs[cp, idx_curr], 0.0)

            cp_cva[cp] += lgd[cp] * dfs[k] * ee[k, cp] * delta_pd

    total_cva = float(np.sum(cp_cva))
    # NOTE(review): this is the sum of EPEs across counterparties, not a true
    # portfolio NPV — kept as-is for downstream compatibility.
    total_npv = float(np.sum(profiles.expected_positive_exposure))

    return CVAResult(
        counterparty_cva=cp_cva,
        total_cva=total_cva,
        total_portfolio_npv=total_npv,
        n_counterparties=n_cp,
        n_scenarios=0,
        n_swaps=0,
    )


# ─── Main pipeline ───────────────────────────────────────────────────────────


@graph
def cva_pipeline(
    swap_rates: np.ndarray,
    tenors: np.ndarray,
    swaption_vols: np.ndarray,
    swaption_expiries: np.ndarray,
    swaption_tenors: np.ndarray,
    cds_spreads: np.ndarray,
    cds_tenors: np.ndarray,
    swaps: list,
    n_counterparties: int,
    n_scenarios: int,
    n_steps: int,
    horizon: float = 5.0,
    seed: int = 12345,
    lgd_seed: int = 99,
    lgd_n_simulations: int = 5000,
) -> CVAResult:
    """Full CVA pipeline expressed as a Pargraph DAG.

    The rates leg (HW calibration -> MC paths) and the credit leg (hazard
    curves -> survival/LGD) both branch off the bootstrapped curve and run in
    parallel; pricing consumes both before the final CVA integration.
    """
    discount_curve = bootstrap_yield_curve(swap_rates, tenors)

    # Rates leg.
    hw_params = calibrate_hull_white(discount_curve, swaption_vols, swaption_expiries, swaption_tenors)
    rate_paths = generate_rate_scenarios(hw_params, discount_curve, n_scenarios, n_steps, horizon, seed)

    # Credit leg.
    hazard_curves = calibrate_credit_curves(cds_spreads, cds_tenors, discount_curve)
    survival = compute_survival_probs(hazard_curves, rate_paths)
    lgd_vec = estimate_lgd(cds_spreads, hazard_curves, lgd_n_simulations, lgd_seed)

    # Pricing and aggregation.
    exposure_cube = price_swap_portfolio(swaps, rate_paths, n_counterparties, hw_params, discount_curve)
    exposure_profiles = compute_exposure_profiles(exposure_cube, rate_paths)
    return compute_cva(exposure_profiles, survival, lgd_vec, discount_curve)


if __name__ == "__main__":
    # Run configuration: 50 counterparties x 20 swaps each, 5000 MC paths on
    # a 60-step exposure grid.
    n_counterparties = 50
    n_swaps_per_cp = 20
    n_scenarios = 5000
    n_steps = 60
    market_seed = 42

    (swap_rates, tenors, swaption_vols, swaption_expiries, swaption_tenors,
     cds_spreads, cds_tenors, swaps) = generate_market_data(
        n_counterparties, n_swaps_per_cp, market_seed
    )

    pipeline_kwargs = dict(
        swap_rates=swap_rates, tenors=tenors,
        swaption_vols=swaption_vols, swaption_expiries=swaption_expiries, swaption_tenors=swaption_tenors,
        cds_spreads=cds_spreads, cds_tenors=cds_tenors,
        swaps=swaps, n_counterparties=n_counterparties,
        n_scenarios=n_scenarios, n_steps=n_steps,
    )

    # Build the DAG once, submit it to the Scaler cluster, and block on the
    # single terminal node.
    client = Client(scheduler_address, object_storage_address=object_storage_address)
    engine = GraphEngine(client)
    dag = cva_pipeline.to_graph()
    task_dict, output_keys = dag.to_dict(**pipeline_kwargs)
    (result,) = engine.get(task_dict, output_keys)

    # Backfill run-size metadata the pipeline left as placeholders.
    result.n_scenarios = n_scenarios
    result.n_swaps = len(swaps)

    print(f"Portfolio CVA: ${result.total_cva:,.2f}")
    print(f"Number of counterparties: {result.n_counterparties}")
    print(f"Number of swaps: {result.n_swaps}")
    print(f"Number of scenarios: {result.n_scenarios}")
    print(f"Top 5 counterparty CVAs: {np.sort(result.counterparty_cva)[-5:][::-1]}")

Diff: Native vs Pargraph + Parfun

[2]:
import difflib
import json
import re

from IPython.display import HTML, display

# Pull both implementations straight out of this notebook's own cells.
with open("SwapCVA.ipynb") as handle:
    notebook = json.load(handle)
native_code = "".join(notebook["cells"][3]["source"])
parallel_code = "".join(notebook["cells"][5]["source"])

# Side-by-side HTML diff with 10 lines of context.
diff_builder = difflib.HtmlDiff(wrapcolumn=90)
markup = diff_builder.make_table(
    native_code.splitlines(),
    parallel_code.splitlines(),
    fromdesc="Native Python",
    todesc="With Pargraph + Parfun",
    context=True,
    numlines=10,
)

# Strip unwanted columns: navigation links, line numbers, and nowrap attribute
markup = re.sub(r'<td class="diff_next"[^>]*>.*?</td>', "", markup)
markup = re.sub(r'<th class="diff_next"[^>]*>.*?</th>', "", markup)
markup = re.sub(r'<td class="diff_header"[^>]*>.*?</td>', "", markup)
markup = markup.replace('colspan="2"', "")
markup = markup.replace(' nowrap="nowrap"', "")
markup = re.sub(
    r"(\s*<colgroup></colgroup>){6}",
    "\n        <colgroup></colgroup> <colgroup></colgroup>",
    markup,
)

# Inline styling so the table renders nicely inside the notebook output.
css = """<style>
    table.diff { font-family: monospace; font-size: 10px; border-collapse: collapse; width: 100%; }
    table.diff td { padding: 2px 8px; white-space: pre-wrap; word-break: break-all; text-align: left !important; }
    table.diff th { padding: 6px 8px; text-align: center; background-color: #f0f0f0; font-size: 14px; }
    td.diff_add { background-color: #e6ffec; }
    td.diff_chg { background-color: #fff8c5; }
    td.diff_sub { background-color: #ffebe9; }
    span.diff_add { background-color: #ccffd8; }
    span.diff_chg { background-color: #fff2a8; }
    span.diff_sub { background-color: #ffc0c0; }
</style>"""

display(HTML(css + markup))

Native PythonWith Pargraph + Parfun
import osimport os
os.environ["OPENBLAS_NUM_THREADS"] = "1"os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["GOTO_NUM_THREADS"] = "1"os.environ["GOTO_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"os.environ["MKL_NUM_THREADS"] = "1"
 
from scaler import Client
from pargraph import GraphEngine, delayed, graph
import parfun as pf
scheduler_address = "tcp://172.31.3.107:7788"
object_storage_address = "tcp://172.31.3.107:7789"
from dataclasses import dataclass, fieldfrom dataclasses import dataclass, field
from typing import Dict, List, Tuplefrom typing import Dict, List, Tuple
import numpy as npimport numpy as np
import scipy.interpolateimport scipy.interpolate
import scipy.optimizeimport scipy.optimize
# ─── Domain types ─────────────────────────────────────────────────────────────# ─── Domain types ─────────────────────────────────────────────────────────────
                    is_payer=bool(rng.choice([True, False])),                    is_payer=bool(rng.choice([True, False])),
                )                )
            )            )
    return swap_rates, tenors, swaption_vols, swaption_expiries, swaption_tenors, cds_spre    return swap_rates, tenors, swaption_vols, swaption_expiries, swaption_tenors, cds_spre
ads, cds_tenors, swapsads, cds_tenors, swaps
# ─── Yield curve bootstrapping ───────────────────────────────────────────────# ─── Yield curve bootstrapping ───────────────────────────────────────────────
@delayed
def bootstrap_yield_curve(swap_rates: np.ndarray, tenors: np.ndarray) -> DiscountCurve:def bootstrap_yield_curve(swap_rates: np.ndarray, tenors: np.ndarray) -> DiscountCurve:
    n = len(tenors)    n = len(tenors)
    discount_factors = np.zeros(n)    discount_factors = np.zeros(n)
    # Prepend t=0, DF=1    # Prepend t=0, DF=1
    all_tenors = np.concatenate([[0.0], tenors])    all_tenors = np.concatenate([[0.0], tenors])
    all_dfs = np.concatenate([[1.0], np.zeros(n)])    all_dfs = np.concatenate([[1.0], np.zeros(n)])
    for i in range(n):    for i in range(n):
        rate = swap_rates[i]        rate = swap_rates[i]
def _hw_normal_vol(a: float, sigma: float, expiry: float, swap_tenor: float) -> float:def _hw_normal_vol(a: float, sigma: float, expiry: float, swap_tenor: float) -> float:
    """Approximate HW-implied normal swaption volatility (fast closed-form)."""    """Approximate HW-implied normal swaption volatility (fast closed-form)."""
    if a < 1e-10:    if a < 1e-10:
        return sigma * np.sqrt(expiry) * swap_tenor        return sigma * np.sqrt(expiry) * swap_tenor
    B_swap = (1.0 - np.exp(-a * swap_tenor)) / (a * swap_tenor)    B_swap = (1.0 - np.exp(-a * swap_tenor)) / (a * swap_tenor)
    var_factor = (1.0 - np.exp(-2.0 * a * expiry)) / (2.0 * a * expiry)    var_factor = (1.0 - np.exp(-2.0 * a * expiry)) / (2.0 * a * expiry)
    return sigma * np.sqrt(expiry * var_factor) * B_swap    return sigma * np.sqrt(expiry * var_factor) * B_swap
@delayed
def calibrate_hull_white(def calibrate_hull_white(
    curve: DiscountCurve,    curve: DiscountCurve,
    swaption_vols: np.ndarray,    swaption_vols: np.ndarray,
    swaption_expiries: np.ndarray,    swaption_expiries: np.ndarray,
    swaption_tenors: np.ndarray,    swaption_tenors: np.ndarray,
) -> HullWhiteParams:) -> HullWhiteParams:
    n_exp, n_ten = swaption_vols.shape    n_exp, n_ten = swaption_vols.shape
    def objective(params):    def objective(params):
        a, sigma = params        a, sigma = params
    f_rates = curve.zero_rate(np.maximum(t_grid, 0.01))    f_rates = curve.zero_rate(np.maximum(t_grid, 0.01))
    df_dt = np.gradient(f_rates * t_grid, dt)    df_dt = np.gradient(f_rates * t_grid, dt)
    theta = df_dt + a_opt * f_rates + (sigma_opt**2 / (2.0 * a_opt)) * (1.0 - np.exp(-2.0     theta = df_dt + a_opt * f_rates + (sigma_opt**2 / (2.0 * a_opt)) * (1.0 - np.exp(-2.0 
* a_opt * t_grid))* a_opt * t_grid))
    return HullWhiteParams(mean_reversion=a_opt, volatility=sigma_opt, theta_times=t_grid,    return HullWhiteParams(mean_reversion=a_opt, volatility=sigma_opt, theta_times=t_grid,
 theta_values=theta) theta_values=theta)
# ─── Monte Carlo rate scenario generation ────────────────────────────────────# ─── Monte Carlo rate scenario generation ────────────────────────────────────
@delayed
def generate_rate_scenarios(def generate_rate_scenarios(
    hw: HullWhiteParams,    hw: HullWhiteParams,
    curve: DiscountCurve,    curve: DiscountCurve,
    n_scenarios: int,    n_scenarios: int,
    n_steps: int,    n_steps: int,
    horizon: float = 5.0,    horizon: float = 5.0,
    seed: int = 12345,    seed: int = 12345,
) -> RateScenarios:) -> RateScenarios:
    rng = np.random.default_rng(seed)    rng = np.random.default_rng(seed)
    dt = horizon / n_steps    dt = horizon / n_steps
            return premium_leg - protection_leg            return premium_leg - protection_leg
        try:        try:
            hazard_rates[i] = scipy.optimize.brentq(cds_error, 1e-6, 2.0, maxiter=200)            hazard_rates[i] = scipy.optimize.brentq(cds_error, 1e-6, 2.0, maxiter=200)
        except (ValueError, RuntimeError):        except (ValueError, RuntimeError):
            hazard_rates[i] = spreads[i] / 0.6  # approximate: spread ~ lambda * LGD            hazard_rates[i] = spreads[i] / 0.6  # approximate: spread ~ lambda * LGD
    return CreditCurve(counterparty_id=cp_id, tenors=cds_tenors, hazard_rates=hazard_rates    return CreditCurve(counterparty_id=cp_id, tenors=cds_tenors, hazard_rates=hazard_rates
))
@delayed
def calibrate_credit_curves(def calibrate_credit_curves(
    cds_spreads: np.ndarray,    cds_spreads: np.ndarray,
    cds_tenors: np.ndarray,    cds_tenors: np.ndarray,
    curve: DiscountCurve,    curve: DiscountCurve,
) -> List[CreditCurve]:) -> List[CreditCurve]:
    n_counterparties = cds_spreads.shape[0]    n_counterparties = cds_spreads.shape[0]
    return [calibrate_single_credit_curve(cp, cds_spreads[cp], cds_tenors, curve) for cp i    return [calibrate_single_credit_curve(cp, cds_spreads[cp], cds_tenors, curve) for cp i
n range(n_counterparties)]n range(n_counterparties)]
# ─── Survival probabilities & LGD ───────────────────────────────────────────# ─── Survival probabilities & LGD ───────────────────────────────────────────
@delayed
def compute_survival_probs(def compute_survival_probs(
    credit_curves: List[CreditCurve],    credit_curves: List[CreditCurve],
    scenarios: RateScenarios,    scenarios: RateScenarios,
) -> np.ndarray:) -> np.ndarray:
    time_grid = scenarios.time_grid    time_grid = scenarios.time_grid
    n_cp = len(credit_curves)    n_cp = len(credit_curves)
    n_t = len(time_grid)    n_t = len(time_grid)
    surv = np.zeros((n_cp, n_t))    surv = np.zeros((n_cp, n_t))
    for i, cc in enumerate(credit_curves):    for i, cc in enumerate(credit_curves):
        surv[i] = cc.survival_prob(time_grid)        surv[i] = cc.survival_prob(time_grid)
    return surv    return surv
@delayed
def estimate_lgd(def estimate_lgd(
    cds_spreads: np.ndarray,    cds_spreads: np.ndarray,
    credit_curves: List[CreditCurve],    credit_curves: List[CreditCurve],
    n_simulations: int = 5000,    n_simulations: int = 5000,
    seed: int = 99,    seed: int = 99,
) -> np.ndarray:) -> np.ndarray:
    rng = np.random.default_rng(seed)    rng = np.random.default_rng(seed)
    n_cp = len(credit_curves)    n_cp = len(credit_curves)
    lgd_estimates = np.zeros(n_cp)    lgd_estimates = np.zeros(n_cp)
        npvs = signs * (float_pv - fixed_pv)        npvs = signs * (float_pv - fixed_pv)
        # Positive exposure per counterparty        # Positive exposure per counterparty
        pos = np.maximum(npvs, 0.0) * alive        pos = np.maximum(npvs, 0.0) * alive
        np.add.at(cp_exposure[step], cp_ids, pos)        np.add.at(cp_exposure[step], cp_ids, pos)
    return cp_exposure    return cp_exposure
@pf.parallel(
    split=pf.per_argument(scenario_batch=pf.py_list.by_chunk),
    combine_with=pf.py_list.concat,
    fixed_partition_size=10,
)
def _price_scenario_batch(def _price_scenario_batch(
    scenario_batch: List[np.ndarray],    scenario_batch: List[np.ndarray],
    time_grid: np.ndarray,    time_grid: np.ndarray,
    pay_times: np.ndarray,    pay_times: np.ndarray,
    day_fracs: np.ndarray,    day_fracs: np.ndarray,
    fixed_rates: np.ndarray,    fixed_rates: np.ndarray,
    notionals: np.ndarray,    notionals: np.ndarray,
    signs: np.ndarray,    signs: np.ndarray,
    cp_ids: np.ndarray,    cp_ids: np.ndarray,
    n_pay: np.ndarray,    n_pay: np.ndarray,
        price_portfolio_single_scenario(        price_portfolio_single_scenario(
            sc, time_grid,            sc, time_grid,
            pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_pay,            pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_pay,
            n_counterparties, a, sigma,            n_counterparties, a, sigma,
            log_P0, zr0, log_P0_grid, zr0_grid,            log_P0, zr0, log_P0_grid, zr0_grid,
        )        )
        for sc in scenario_batch        for sc in scenario_batch
    ]    ]
@delayed
def price_swap_portfolio(def price_swap_portfolio(
    swaps: List[Swap],    swaps: List[Swap],
    scenarios: RateScenarios,    scenarios: RateScenarios,
    n_counterparties: int,    n_counterparties: int,
    hw: HullWhiteParams,    hw: HullWhiteParams,
    curve: DiscountCurve,    curve: DiscountCurve,
) -> np.ndarray:) -> np.ndarray:
    rate_scenarios, time_grid = scenarios.rates, scenarios.time_grid    rate_scenarios, time_grid = scenarios.rates, scenarios.time_grid
    n_scenarios = rate_scenarios.shape[0]    n_scenarios = rate_scenarios.shape[0]
    n_steps = len(time_grid) - 1    n_steps = len(time_grid) - 1
    grid_pts = time_grid[1:]    grid_pts = time_grid[1:]
    safe_grid = np.maximum(grid_pts, 0.001)    safe_grid = np.maximum(grid_pts, 0.001)
    log_P0_grid = curve._interp(safe_grid)    log_P0_grid = curve._interp(safe_grid)
    zr0_grid = -log_P0_grid / safe_grid    zr0_grid = -log_P0_grid / safe_grid
    a, sigma = hw.mean_reversion, hw.volatility    a, sigma = hw.mean_reversion, hw.volatility
    # Split scenarios into a list for parfun distribution    # Split scenarios into a list for parfun distribution
    scenario_list = [rate_scenarios[s] for s in range(n_scenarios)]    scenario_list = [rate_scenarios[s] for s in range(n_scenarios)]
    results = _price_scenario_batch(    with pf.set_parallel_backend_context(
        scenario_list, time_grid,        "scaler_remote",
        pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_pay,        scheduler_address=scheduler_address,
        n_counterparties, a, sigma,        object_storage_address=object_storage_address,
        log_P0, zr0, log_P0_grid, zr0_grid,        n_workers=128,
    )    ):
        results = _price_scenario_batch(
            scenario_list, time_grid,
            pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_pay,
            n_counterparties, a, sigma,
            log_P0, zr0, log_P0_grid, zr0_grid,
        )
    return np.array(results)    return np.array(results)
# ─── Exposure profiles ───────────────────────────────────────────────────────# ─── Exposure profiles ───────────────────────────────────────────────────────
@delayed
def compute_exposure_profiles(def compute_exposure_profiles(
    exposures: np.ndarray,    exposures: np.ndarray,
    scenarios: RateScenarios,    scenarios: RateScenarios,
) -> ExposureProfile:) -> ExposureProfile:
    time_grid = scenarios.time_grid    time_grid = scenarios.time_grid
    # exposures: (n_scenarios, n_steps, n_counterparties)    # exposures: (n_scenarios, n_steps, n_counterparties)
    ee = np.mean(exposures, axis=0)  # (n_steps, n_counterparties)    ee = np.mean(exposures, axis=0)  # (n_steps, n_counterparties)
    pfe = np.percentile(exposures, 97.5, axis=0)    pfe = np.percentile(exposures, 97.5, axis=0)
    # Time-weighted EPE per counterparty    # Time-weighted EPE per counterparty
        time_grid=time_grid[1:],        time_grid=time_grid[1:],
        expected_exposure=ee,        expected_exposure=ee,
        potential_future_exposure=pfe,        potential_future_exposure=pfe,
        expected_positive_exposure=epe,        expected_positive_exposure=epe,
    )    )
# ─── CVA computation ─────────────────────────────────────────────────────────# ─── CVA computation ─────────────────────────────────────────────────────────
@delayed
def compute_cva(def compute_cva(
    profiles: ExposureProfile,    profiles: ExposureProfile,
    survival_probs: np.ndarray,    survival_probs: np.ndarray,
    lgd: np.ndarray,    lgd: np.ndarray,
    curve: DiscountCurve,    curve: DiscountCurve,
) -> CVAResult:) -> CVAResult:
    time_grid = profiles.time_grid    time_grid = profiles.time_grid
    ee = profiles.expected_exposure  # (n_steps, n_counterparties)    ee = profiles.expected_exposure  # (n_steps, n_counterparties)
    n_steps, n_cp = ee.shape    n_steps, n_cp = ee.shape
        total_portfolio_npv=total_npv,        total_portfolio_npv=total_npv,
        n_counterparties=n_cp,        n_counterparties=n_cp,
        n_scenarios=0,        n_scenarios=0,
        n_swaps=0,        n_swaps=0,
    )    )
# ─── Main pipeline ───────────────────────────────────────────────────────────# ─── Main pipeline ───────────────────────────────────────────────────────────
@graph
def cva_pipeline(def cva_pipeline(
    swap_rates: np.ndarray,    swap_rates: np.ndarray,
    tenors: np.ndarray,    tenors: np.ndarray,
    swaption_vols: np.ndarray,    swaption_vols: np.ndarray,
    swaption_expiries: np.ndarray,    swaption_expiries: np.ndarray,
    swaption_tenors: np.ndarray,    swaption_tenors: np.ndarray,
    cds_spreads: np.ndarray,    cds_spreads: np.ndarray,
    cds_tenors: np.ndarray,    cds_tenors: np.ndarray,
    swaps: list,    swaps: list,
    n_counterparties: int,    n_counterparties: int,
    )    )
    pipeline_kwargs = dict(    pipeline_kwargs = dict(
        swap_rates=swap_rates, tenors=tenors,        swap_rates=swap_rates, tenors=tenors,
        swaption_vols=swaption_vols, swaption_expiries=swaption_expiries, swaption_tenors=        swaption_vols=swaption_vols, swaption_expiries=swaption_expiries, swaption_tenors=
swaption_tenors,swaption_tenors,
        cds_spreads=cds_spreads, cds_tenors=cds_tenors,        cds_spreads=cds_spreads, cds_tenors=cds_tenors,
        swaps=swaps, n_counterparties=n_counterparties,        swaps=swaps, n_counterparties=n_counterparties,
        n_scenarios=n_scenarios, n_steps=n_steps,        n_scenarios=n_scenarios, n_steps=n_steps,
    )    )
    result = cva_pipeline(**pipeline_kwargs)    client = Client(scheduler_address, object_storage_address=object_storage_address)
    engine = GraphEngine(client)
 
    graph_obj = cva_pipeline.to_graph()
    dict_graph, keys = graph_obj.to_dict(**pipeline_kwargs)
    (result,) = engine.get(dict_graph, keys)
    result.n_scenarios = n_scenarios    result.n_scenarios = n_scenarios
    result.n_swaps = len(swaps)    result.n_swaps = len(swaps)
    print(f"Portfolio CVA: ${result.total_cva:,.2f}")    print(f"Portfolio CVA: ${result.total_cva:,.2f}")
    print(f"Number of counterparties: {result.n_counterparties}")    print(f"Number of counterparties: {result.n_counterparties}")
    print(f"Number of swaps: {result.n_swaps}")    print(f"Number of swaps: {result.n_swaps}")
    print(f"Number of scenarios: {result.n_scenarios}")    print(f"Number of scenarios: {result.n_scenarios}")
    print(f"Top 5 counterparty CVAs: {np.sort(result.counterparty_cva)[-5:][::-1]}")    print(f"Top 5 counterparty CVAs: {np.sort(result.counterparty_cva)[-5:][::-1]}")

Statistics

| Parfun | Pargraph | Sequential Runtime | Parallel Runtime | Workers | Speedup | Efficiency |
|--------|----------|--------------------|------------------|---------|---------|------------|
| Yes    | Yes      | 35m12.430s         | 1m16.413s        | 64      | 27.64x  | 0.43       |