Parallel Swap Portfolio CVA with Pargraph + Parfun¶
This notebook demonstrates a full Credit Valuation Adjustment (CVA) pipeline for a portfolio of interest rate swaps, parallelised with Pargraph (DAG-level task scheduling) and Parfun (data-level scenario distribution across workers):
Yield curve bootstrap — fit discount factors from par swap rates
Hull-White calibration — calibrate mean-reversion and volatility to swaption normal vols
Monte Carlo scenario generation — simulate 5000 short-rate paths under Hull-White dynamics
Credit curve calibration — bootstrap hazard rates from CDS spreads for 50 counterparties
Survival probabilities & LGD — compute default probabilities and loss-given-default
Portfolio pricing — price 1000 swaps across all scenarios (the bottleneck)
Exposure profiles — Expected Exposure, PFE, and time-averaged EPE
CVA computation — integrate discounted expected exposure against marginal default probability
Pargraph schedules the pipeline as a DAG: steps 2 and 4 run in parallel since both depend only on step 1. Parfun parallelises step 6 by distributing the 5000 scenario computations across Scaler workers with a single @pf.parallel decorator.
Program Structure¶
Native Python Implementation¶
[ ]:
import os
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["GOTO_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
from dataclasses import dataclass, field
from typing import Dict, List, Tuple
import numpy as np
import scipy.interpolate
import scipy.optimize
# ─── Domain types ─────────────────────────────────────────────────────────────
@dataclass
class DiscountCurve:
tenors: np.ndarray
discount_factors: np.ndarray
_interp: object = field(repr=False, default=None)
def __post_init__(self):
log_df = np.log(self.discount_factors)
self._interp = scipy.interpolate.CubicSpline(self.tenors, log_df, bc_type="natural")
def df(self, t: np.ndarray) -> np.ndarray:
return np.exp(self._interp(t))
def zero_rate(self, t: np.ndarray) -> np.ndarray:
t = np.asarray(t, dtype=float)
safe_t = np.where(t < 1e-10, 1e-10, t)
return -np.log(self.df(safe_t)) / safe_t
@dataclass
class HullWhiteParams:
mean_reversion: float # a
volatility: float # sigma
theta_times: np.ndarray # time grid for theta(t)
theta_values: np.ndarray # calibrated theta(t)
@dataclass
class RateScenarios:
rates: np.ndarray # (n_scenarios, n_steps+1)
time_grid: np.ndarray # (n_steps+1,)
@dataclass
class CreditCurve:
counterparty_id: int
tenors: np.ndarray
hazard_rates: np.ndarray # piecewise-constant hazard rates
def survival_prob(self, t: np.ndarray) -> np.ndarray:
t = np.asarray(t, dtype=float)
result = np.ones_like(t)
for i in range(len(self.tenors)):
t_start = 0.0 if i == 0 else self.tenors[i - 1]
t_end = self.tenors[i]
lam = self.hazard_rates[i]
mask = t > t_start
dt = np.minimum(t, t_end) - t_start
dt = np.maximum(dt, 0.0)
result[mask] *= np.exp(-lam * dt[mask])
return result
@dataclass
class Swap:
counterparty_id: int
notional: float
fixed_rate: float
payment_times: np.ndarray # payment dates in years
day_fractions: np.ndarray # year fractions per period
is_payer: bool # True = we pay fixed, receive float
@dataclass
class ExposureProfile:
time_grid: np.ndarray
expected_exposure: np.ndarray # (n_time_steps, n_counterparties)
potential_future_exposure: np.ndarray # 97.5th percentile
expected_positive_exposure: np.ndarray # (n_counterparties,) time-averaged
@dataclass
class CVAResult:
counterparty_cva: np.ndarray # CVA per counterparty
total_cva: float
total_portfolio_npv: float
n_counterparties: int
n_scenarios: int
n_swaps: int
# ─── Market data generation ──────────────────────────────────────────────────
def generate_market_data(
n_counterparties: int,
n_swaps_per_cp: int,
seed: int,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, List[Swap]]:
rng = np.random.default_rng(seed)
# Yield curve: par swap rates at standard tenors
tenors = np.array([0.25, 0.5, 1, 2, 3, 5, 7, 10, 15, 20, 30])
base_rates = np.array([0.042, 0.041, 0.040, 0.038, 0.037, 0.036, 0.037, 0.038, 0.040, 0.041, 0.042])
swap_rates = base_rates + rng.normal(0, 0.001, len(tenors))
# Swaption implied vols: (expiry, tenor) grid
swaption_expiries = np.array([0.5, 1, 2, 5])
swaption_tenors = np.array([2, 5, 10])
swaption_vols = np.full((len(swaption_expiries), len(swaption_tenors)), 0.0060)
for i, exp in enumerate(swaption_expiries):
for j, ten in enumerate(swaption_tenors):
swaption_vols[i, j] = 0.004 + 0.003 * np.exp(-0.3 * exp) + 0.001 * np.exp(-0.1 * ten)
swaption_vols[i, j] += rng.normal(0, 0.0002)
# CDS spreads per counterparty (bps) at tenors [1, 3, 5, 7, 10] years
cds_tenors = np.array([1.0, 3.0, 5.0, 7.0, 10.0])
cds_spreads = np.zeros((n_counterparties, len(cds_tenors)))
for cp in range(n_counterparties):
base_spread = rng.uniform(30, 400) # bps
term_slope = rng.uniform(-5, 15)
for j, t in enumerate(cds_tenors):
cds_spreads[cp, j] = max(5, base_spread + term_slope * t + rng.normal(0, 10))
cds_spreads[cp] /= 10000 # convert to decimal
# Generate swap portfolio
swaps = []
for cp in range(n_counterparties):
for _ in range(n_swaps_per_cp):
maturity = rng.choice([2, 3, 5, 7, 10, 15, 20])
freq = 0.5 # semi-annual
n_payments = int(maturity / freq)
payment_times = np.arange(1, n_payments + 1) * freq
day_fractions = np.full(n_payments, freq)
fixed_rate = 0.035 + rng.normal(0, 0.005)
notional = rng.choice([1e6, 5e6, 10e6, 25e6, 50e6])
swaps.append(
Swap(
counterparty_id=cp,
notional=notional,
fixed_rate=fixed_rate,
payment_times=payment_times,
day_fractions=day_fractions,
is_payer=bool(rng.choice([True, False])),
)
)
return swap_rates, tenors, swaption_vols, swaption_expiries, swaption_tenors, cds_spreads, cds_tenors, swaps
# ─── Yield curve bootstrapping ───────────────────────────────────────────────
def bootstrap_yield_curve(swap_rates: np.ndarray, tenors: np.ndarray) -> DiscountCurve:
n = len(tenors)
discount_factors = np.zeros(n)
# Prepend t=0, DF=1
all_tenors = np.concatenate([[0.0], tenors])
all_dfs = np.concatenate([[1.0], np.zeros(n)])
for i in range(n):
rate = swap_rates[i]
tenor = tenors[i]
# For short tenors (< 1Y): zero-coupon style
if tenor <= 1.0:
all_dfs[i + 1] = 1.0 / (1.0 + rate * tenor)
else:
# Par swap bootstrap: rate * sum(DF_j * tau_j) + DF_n = 1
# Approximate with annual coupons
pv_coupons = 0.0
for j in range(i):
if tenors[j] >= 1.0 or j == 0:
dt = tenors[j] if j == 0 else tenors[j] - tenors[j - 1]
pv_coupons += all_dfs[j + 1] * rate * dt
dt_last = tenor - (tenors[i - 1] if i > 0 else 0)
all_dfs[i + 1] = (1.0 - pv_coupons) / (1.0 + rate * dt_last)
all_dfs[i + 1] = np.clip(all_dfs[i + 1], 0.01, 1.0)
discount_factors[i] = all_dfs[i + 1]
return DiscountCurve(tenors=all_tenors, discount_factors=np.concatenate([[1.0], discount_factors]))
# ─── Hull-White calibration ──────────────────────────────────────────────────
def _hw_zcb_price(t: float, T: float, r_t: float, a: float, sigma: float, curve: DiscountCurve) -> float:
if T <= t:
return 1.0
B = (1.0 - np.exp(-a * (T - t))) / a if a > 1e-10 else (T - t)
P_0_T = float(curve.df(np.array([T]))[0])
P_0_t = float(curve.df(np.array([t]))[0])
if P_0_t < 1e-15:
return P_0_T
f_0_t = float(curve.zero_rate(np.array([t]))[0])
log_A = np.log(P_0_T / P_0_t) + B * f_0_t - (sigma**2 / (4.0 * a)) * B**2 * (1.0 - np.exp(-2.0 * a * t))
return np.exp(log_A - B * r_t)
def _hw_normal_vol(a: float, sigma: float, expiry: float, swap_tenor: float) -> float:
"""Approximate HW-implied normal swaption volatility (fast closed-form)."""
if a < 1e-10:
return sigma * np.sqrt(expiry) * swap_tenor
B_swap = (1.0 - np.exp(-a * swap_tenor)) / (a * swap_tenor)
var_factor = (1.0 - np.exp(-2.0 * a * expiry)) / (2.0 * a * expiry)
return sigma * np.sqrt(expiry * var_factor) * B_swap
def calibrate_hull_white(
curve: DiscountCurve,
swaption_vols: np.ndarray,
swaption_expiries: np.ndarray,
swaption_tenors: np.ndarray,
) -> HullWhiteParams:
n_exp, n_ten = swaption_vols.shape
def objective(params):
a, sigma = params
if a <= 1e-6 or sigma <= 1e-6 or a > 1.0 or sigma > 0.10:
return 1e10
total_err = 0.0
for i in range(n_exp):
for j in range(n_ten):
model_vol = _hw_normal_vol(a, sigma, swaption_expiries[i], swaption_tenors[j])
total_err += (model_vol - swaption_vols[i, j]) ** 2
return total_err
result = scipy.optimize.minimize(
objective, x0=[0.03, 0.008], method="Nelder-Mead",
options={"maxiter": 500, "xatol": 1e-8},
)
a_opt, sigma_opt = max(result.x[0], 0.001), max(result.x[1], 0.001)
# Compute theta(t) to fit initial term structure
dt = 0.25
t_grid = np.arange(0, 31, dt)
f_rates = curve.zero_rate(np.maximum(t_grid, 0.01))
df_dt = np.gradient(f_rates * t_grid, dt)
theta = df_dt + a_opt * f_rates + (sigma_opt**2 / (2.0 * a_opt)) * (1.0 - np.exp(-2.0 * a_opt * t_grid))
return HullWhiteParams(mean_reversion=a_opt, volatility=sigma_opt, theta_times=t_grid, theta_values=theta)
# ─── Monte Carlo rate scenario generation ────────────────────────────────────
def generate_rate_scenarios(
hw: HullWhiteParams,
curve: DiscountCurve,
n_scenarios: int,
n_steps: int,
horizon: float = 5.0,
seed: int = 12345,
) -> RateScenarios:
rng = np.random.default_rng(seed)
dt = horizon / n_steps
time_grid = np.linspace(0, horizon, n_steps + 1)
r0 = float(curve.zero_rate(np.array([0.01]))[0])
rates = np.zeros((n_scenarios, n_steps + 1))
rates[:, 0] = r0
theta_interp = scipy.interpolate.interp1d(hw.theta_times, hw.theta_values, fill_value="extrapolate")
a = hw.mean_reversion
sigma = hw.volatility
Z = rng.standard_normal((n_scenarios, n_steps))
for step in range(n_steps):
t = time_grid[step]
theta_t = float(theta_interp(t))
r = rates[:, step]
drift = (theta_t - a * r) * dt
diffusion = sigma * np.sqrt(dt) * Z[:, step]
rates[:, step + 1] = r + drift + diffusion
return RateScenarios(rates=rates, time_grid=time_grid)
# ─── Credit curve calibration ────────────────────────────────────────────────
def calibrate_single_credit_curve(
cp_id: int,
spreads: np.ndarray,
cds_tenors: np.ndarray,
curve: DiscountCurve,
) -> CreditCurve:
n_tenors = len(cds_tenors)
hazard_rates = np.zeros(n_tenors)
dt_fine = 0.25
for i in range(n_tenors):
T = cds_tenors[i]
spread = spreads[i]
t_grid = np.arange(dt_fine, T + dt_fine / 2, dt_fine)
n_points = len(t_grid)
# Bootstrap: solve for hazard rate in this tenor bucket
def cds_error(lam_i):
lam_trial = hazard_rates.copy()
lam_trial[i] = lam_i
premium_leg = 0.0
protection_leg = 0.0
for k in range(n_points):
t_k = t_grid[k]
# Survival probability using all hazard rates up to t_k
surv = 1.0
for m in range(n_tenors):
t_start = 0.0 if m == 0 else cds_tenors[m - 1]
t_end = cds_tenors[m]
if t_k > t_start:
dt_eff = min(t_k, t_end) - t_start
surv *= np.exp(-lam_trial[m] * max(dt_eff, 0))
d = float(curve.df(np.array([t_k]))[0])
premium_leg += spread * dt_fine * d * surv
# Protection: default probability in this interval
surv_prev = 1.0
t_prev = t_k - dt_fine
for m in range(n_tenors):
t_start = 0.0 if m == 0 else cds_tenors[m - 1]
t_end = cds_tenors[m]
if t_prev > t_start:
dt_eff = min(t_prev, t_end) - t_start
surv_prev *= np.exp(-lam_trial[m] * max(dt_eff, 0))
protection_leg += d * (surv_prev - surv)
return premium_leg - protection_leg
try:
hazard_rates[i] = scipy.optimize.brentq(cds_error, 1e-6, 2.0, maxiter=200)
except (ValueError, RuntimeError):
hazard_rates[i] = spreads[i] / 0.6 # approximate: spread ~ lambda * LGD
return CreditCurve(counterparty_id=cp_id, tenors=cds_tenors, hazard_rates=hazard_rates)
def calibrate_credit_curves(
cds_spreads: np.ndarray,
cds_tenors: np.ndarray,
curve: DiscountCurve,
) -> List[CreditCurve]:
n_counterparties = cds_spreads.shape[0]
return [calibrate_single_credit_curve(cp, cds_spreads[cp], cds_tenors, curve) for cp in range(n_counterparties)]
# ─── Survival probabilities & LGD ───────────────────────────────────────────
def compute_survival_probs(
credit_curves: List[CreditCurve],
scenarios: RateScenarios,
) -> np.ndarray:
time_grid = scenarios.time_grid
n_cp = len(credit_curves)
n_t = len(time_grid)
surv = np.zeros((n_cp, n_t))
for i, cc in enumerate(credit_curves):
surv[i] = cc.survival_prob(time_grid)
return surv
def estimate_lgd(
cds_spreads: np.ndarray,
credit_curves: List[CreditCurve],
n_simulations: int = 5000,
seed: int = 99,
) -> np.ndarray:
rng = np.random.default_rng(seed)
n_cp = len(credit_curves)
lgd_estimates = np.zeros(n_cp)
for i in range(n_cp):
avg_spread = np.mean(cds_spreads[i])
avg_hazard = np.mean(credit_curves[i].hazard_rates)
if avg_hazard > 1e-10:
point_lgd = np.clip(avg_spread / avg_hazard, 0.10, 0.95)
else:
point_lgd = 0.40
# Bayesian estimation with Beta prior
alpha = point_lgd * 20
beta_param = (1 - point_lgd) * 20
samples = rng.beta(max(alpha, 0.5), max(beta_param, 0.5), n_simulations)
lgd_estimates[i] = np.mean(samples)
return lgd_estimates
# ─── Swap portfolio pricing under scenarios ──────────────────────────────────
def _precompute_swap_flat(swaps: List[Swap]):
"""Flatten all swap cash flows into contiguous arrays for fully vectorised pricing."""
n_swaps = len(swaps)
# Collect per-swap metadata
fixed_rates = np.array([s.fixed_rate for s in swaps])
notionals = np.array([s.notional for s in swaps])
signs = np.array([1.0 if s.is_payer else -1.0 for s in swaps])
cp_ids = np.array([s.counterparty_id for s in swaps], dtype=int)
max_payments = max(len(s.payment_times) for s in swaps)
# Padded arrays (n_swaps, max_payments) — padded slots have pay_time = 0
pay_times = np.zeros((n_swaps, max_payments))
day_fracs = np.zeros((n_swaps, max_payments))
n_payments = np.array([len(s.payment_times) for s in swaps], dtype=int)
for i, s in enumerate(swaps):
n = n_payments[i]
pay_times[i, :n] = s.payment_times
day_fracs[i, :n] = s.day_fractions
return pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_payments, max_payments
def price_portfolio_single_scenario(
scenario_rates: np.ndarray,
time_grid: np.ndarray,
pay_times: np.ndarray,
day_fracs: np.ndarray,
fixed_rates: np.ndarray,
notionals: np.ndarray,
signs: np.ndarray,
cp_ids: np.ndarray,
n_payments: np.ndarray,
n_counterparties: int,
a: float,
sigma: float,
log_P0: np.ndarray,
zr0: np.ndarray,
log_P0_grid: np.ndarray,
zr0_grid: np.ndarray,
) -> np.ndarray:
"""Price all swaps at all time steps for one MC scenario.
Vectorises across swaps×payments using masked arrays so the inner loop is
only over time steps (60 iterations), not swaps (1000).
"""
n_steps = len(time_grid) - 1
n_swaps, max_pay = pay_times.shape
cp_exposure = np.zeros((n_steps, n_counterparties))
sigma2_4a = (sigma ** 2) / (4.0 * a) if a > 1e-10 else 0.0
for step in range(n_steps):
t = time_grid[step + 1]
r_t = scenario_rates[step + 1]
# Precomputed curve values at t
lp0t = log_P0_grid[step] # log P(0,t)
f0t = zr0_grid[step] # instantaneous forward ~ zero rate at t
exp_2at = np.exp(-2.0 * a * t) if a > 1e-10 else 1.0
# Mask: which (swap, payment) pairs are still in the future?
future = pay_times > t # (n_swaps, max_pay) bool
# Any swap with no future payments is dead
alive = np.any(future, axis=1) # (n_swaps,)
if not np.any(alive):
continue
# For alive swaps, compute HW ZCB prices P(t, T_k) for all payment slots
# tau_{i,k} = T_{i,k} - t (zero for non-future slots)
tau = np.where(future, pay_times - t, 0.0)
B = np.where(future, (1.0 - np.exp(-a * tau)) / a if a > 1e-10 else tau, 0.0)
# log A_{i,k} = log P(0,T_k) - log P(0,t) + B*f(0,t) - σ²/4a * B² * (1-e^{-2at})
log_A = np.where(future, log_P0 - lp0t + B * f0t - sigma2_4a * B ** 2 * (1.0 - exp_2at), 0.0)
dfs = np.where(future, np.exp(log_A - B * r_t), 0.0)
# Forward rates: F_k = (DF_{k-1}/DF_k - 1) / tau_k
# DF_{k-1} for the first future payment of each swap is 1.0 (discounting from t)
df_prev = np.zeros_like(dfs)
# Shift dfs right within each swap, inserting 1.0 at the first future slot
for i in range(max_pay):
col_is_first_future = future[:, i] & (np.sum(future[:, :i], axis=1) == 0)
df_prev[:, i] = np.where(col_is_first_future, 1.0, dfs[:, max(i - 1, 0)] if i > 0 else 0.0)
# Fix: for non-first-future slots that are still future, df_prev = previous dfs
for i in range(1, max_pay):
non_first = future[:, i] & ~(future[:, i] & (np.sum(future[:, :i], axis=1) == 0))
df_prev[:, i] = np.where(non_first, dfs[:, i - 1], df_prev[:, i])
safe_dfs = np.maximum(dfs, 1e-15)
safe_tau = np.maximum(day_fracs, 1e-10)
float_rates = np.where(future, (df_prev / safe_dfs - 1.0) / safe_tau, 0.0)
# Fixed leg PV per swap: sum_k (r_fixed * tau_k * DF_k) * notional
fixed_pv = np.sum(fixed_rates[:, None] * day_fracs * dfs * future, axis=1) * notionals
# Float leg PV per swap: sum_k (F_k * tau_k * DF_k) * notional
float_pv = np.sum(float_rates * day_fracs * dfs * future, axis=1) * notionals
npvs = signs * (float_pv - fixed_pv)
# Positive exposure per counterparty
pos = np.maximum(npvs, 0.0) * alive
np.add.at(cp_exposure[step], cp_ids, pos)
return cp_exposure
def _price_scenario_batch(
scenario_batch: List[np.ndarray],
time_grid: np.ndarray,
pay_times: np.ndarray,
day_fracs: np.ndarray,
fixed_rates: np.ndarray,
notionals: np.ndarray,
signs: np.ndarray,
cp_ids: np.ndarray,
n_pay: np.ndarray,
n_counterparties: int,
a: float,
sigma: float,
log_P0: np.ndarray,
zr0: np.ndarray,
log_P0_grid: np.ndarray,
zr0_grid: np.ndarray,
) -> List[np.ndarray]:
return [
price_portfolio_single_scenario(
sc, time_grid,
pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_pay,
n_counterparties, a, sigma,
log_P0, zr0, log_P0_grid, zr0_grid,
)
for sc in scenario_batch
]
def price_swap_portfolio(
swaps: List[Swap],
scenarios: RateScenarios,
n_counterparties: int,
hw: HullWhiteParams,
curve: DiscountCurve,
) -> np.ndarray:
rate_scenarios, time_grid = scenarios.rates, scenarios.time_grid
n_scenarios = rate_scenarios.shape[0]
n_steps = len(time_grid) - 1
# Precompute swap arrays
pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_pay, max_pay = _precompute_swap_flat(swaps)
# Precompute curve values for all payment times
pay_flat = pay_times.copy()
pay_flat[pay_flat <= 0] = 0.001
log_P0 = curve._interp(pay_flat) # log discount factors
zr0 = -log_P0 / pay_flat
# Precompute curve values at each time grid point
grid_pts = time_grid[1:]
safe_grid = np.maximum(grid_pts, 0.001)
log_P0_grid = curve._interp(safe_grid)
zr0_grid = -log_P0_grid / safe_grid
a, sigma = hw.mean_reversion, hw.volatility
# Split scenarios into a list for parfun distribution
scenario_list = [rate_scenarios[s] for s in range(n_scenarios)]
results = _price_scenario_batch(
scenario_list, time_grid,
pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_pay,
n_counterparties, a, sigma,
log_P0, zr0, log_P0_grid, zr0_grid,
)
return np.array(results)
# ─── Exposure profiles ───────────────────────────────────────────────────────
def compute_exposure_profiles(
exposures: np.ndarray,
scenarios: RateScenarios,
) -> ExposureProfile:
time_grid = scenarios.time_grid
# exposures: (n_scenarios, n_steps, n_counterparties)
ee = np.mean(exposures, axis=0) # (n_steps, n_counterparties)
pfe = np.percentile(exposures, 97.5, axis=0)
# Time-weighted EPE per counterparty
dt = np.diff(time_grid)
epe = np.zeros(ee.shape[1])
T = time_grid[-1]
for cp in range(ee.shape[1]):
epe[cp] = np.sum(ee[:, cp] * dt) / T
return ExposureProfile(
time_grid=time_grid[1:],
expected_exposure=ee,
potential_future_exposure=pfe,
expected_positive_exposure=epe,
)
# ─── CVA computation ─────────────────────────────────────────────────────────
def compute_cva(
profiles: ExposureProfile,
survival_probs: np.ndarray,
lgd: np.ndarray,
curve: DiscountCurve,
) -> CVAResult:
time_grid = profiles.time_grid
ee = profiles.expected_exposure # (n_steps, n_counterparties)
n_steps, n_cp = ee.shape
cp_cva = np.zeros(n_cp)
dfs = curve.df(time_grid)
for cp in range(n_cp):
for k in range(n_steps):
t = time_grid[k]
# Marginal default probability: S(t_{k-1}) - S(t_k)
surv_prev = survival_probs[cp, k] if k == 0 else survival_probs[cp, k]
surv_curr = survival_probs[cp, k + 1] if k + 1 < survival_probs.shape[1] else survival_probs[cp, -1]
# Use discrete approximation
if k == 0:
delta_pd = 1.0 - survival_probs[cp, 1] if survival_probs.shape[1] > 1 else 0.0
else:
idx_prev = min(k, survival_probs.shape[1] - 1)
idx_curr = min(k + 1, survival_probs.shape[1] - 1)
delta_pd = max(survival_probs[cp, idx_prev] - survival_probs[cp, idx_curr], 0.0)
cp_cva[cp] += lgd[cp] * dfs[k] * ee[k, cp] * delta_pd
total_cva = float(np.sum(cp_cva))
total_npv = float(np.sum(profiles.expected_positive_exposure))
return CVAResult(
counterparty_cva=cp_cva,
total_cva=total_cva,
total_portfolio_npv=total_npv,
n_counterparties=n_cp,
n_scenarios=0,
n_swaps=0,
)
# ─── Main pipeline ───────────────────────────────────────────────────────────
def cva_pipeline(
swap_rates: np.ndarray,
tenors: np.ndarray,
swaption_vols: np.ndarray,
swaption_expiries: np.ndarray,
swaption_tenors: np.ndarray,
cds_spreads: np.ndarray,
cds_tenors: np.ndarray,
swaps: list,
n_counterparties: int,
n_scenarios: int,
n_steps: int,
horizon: float = 5.0,
seed: int = 12345,
lgd_seed: int = 99,
lgd_n_simulations: int = 5000,
) -> CVAResult:
curve = bootstrap_yield_curve(swap_rates, tenors)
hw = calibrate_hull_white(curve, swaption_vols, swaption_expiries, swaption_tenors)
scenarios = generate_rate_scenarios(hw, curve, n_scenarios, n_steps, horizon, seed)
credit_curves = calibrate_credit_curves(cds_spreads, cds_tenors, curve)
surv_probs = compute_survival_probs(credit_curves, scenarios)
lgd = estimate_lgd(cds_spreads, credit_curves, lgd_n_simulations, lgd_seed)
exposures = price_swap_portfolio(swaps, scenarios, n_counterparties, hw, curve)
profiles = compute_exposure_profiles(exposures, scenarios)
return compute_cva(profiles, surv_probs, lgd, curve)
if __name__ == "__main__":
n_counterparties = 50
n_swaps_per_cp = 20
n_scenarios = 5000
n_steps = 60
market_seed = 42
swap_rates, tenors, swaption_vols, swaption_expiries, swaption_tenors, cds_spreads, cds_tenors, swaps = (
generate_market_data(n_counterparties, n_swaps_per_cp, market_seed)
)
pipeline_kwargs = dict(
swap_rates=swap_rates, tenors=tenors,
swaption_vols=swaption_vols, swaption_expiries=swaption_expiries, swaption_tenors=swaption_tenors,
cds_spreads=cds_spreads, cds_tenors=cds_tenors,
swaps=swaps, n_counterparties=n_counterparties,
n_scenarios=n_scenarios, n_steps=n_steps,
)
result = cva_pipeline(**pipeline_kwargs)
result.n_scenarios = n_scenarios
result.n_swaps = len(swaps)
print(f"Portfolio CVA: ${result.total_cva:,.2f}")
print(f"Number of counterparties: {result.n_counterparties}")
print(f"Number of swaps: {result.n_swaps}")
print(f"Number of scenarios: {result.n_scenarios}")
print(f"Top 5 counterparty CVAs: {np.sort(result.counterparty_cva)[-5:][::-1]}")
Pargraph + Parfun Implementation¶
[ ]:
import os
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["GOTO_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
from scaler import Client
from pargraph import GraphEngine, delayed, graph
import parfun as pf
scheduler_address = "tcp://172.31.3.107:7788"
object_storage_address = "tcp://172.31.3.107:7789"
from dataclasses import dataclass, field
from typing import Dict, List, Tuple
import numpy as np
import scipy.interpolate
import scipy.optimize
# ─── Domain types ─────────────────────────────────────────────────────────────
@dataclass
class DiscountCurve:
tenors: np.ndarray
discount_factors: np.ndarray
_interp: object = field(repr=False, default=None)
def __post_init__(self):
log_df = np.log(self.discount_factors)
self._interp = scipy.interpolate.CubicSpline(self.tenors, log_df, bc_type="natural")
def df(self, t: np.ndarray) -> np.ndarray:
return np.exp(self._interp(t))
def zero_rate(self, t: np.ndarray) -> np.ndarray:
t = np.asarray(t, dtype=float)
safe_t = np.where(t < 1e-10, 1e-10, t)
return -np.log(self.df(safe_t)) / safe_t
@dataclass
class HullWhiteParams:
mean_reversion: float # a
volatility: float # sigma
theta_times: np.ndarray # time grid for theta(t)
theta_values: np.ndarray # calibrated theta(t)
@dataclass
class RateScenarios:
rates: np.ndarray # (n_scenarios, n_steps+1)
time_grid: np.ndarray # (n_steps+1,)
@dataclass
class CreditCurve:
counterparty_id: int
tenors: np.ndarray
hazard_rates: np.ndarray # piecewise-constant hazard rates
def survival_prob(self, t: np.ndarray) -> np.ndarray:
t = np.asarray(t, dtype=float)
result = np.ones_like(t)
for i in range(len(self.tenors)):
t_start = 0.0 if i == 0 else self.tenors[i - 1]
t_end = self.tenors[i]
lam = self.hazard_rates[i]
mask = t > t_start
dt = np.minimum(t, t_end) - t_start
dt = np.maximum(dt, 0.0)
result[mask] *= np.exp(-lam * dt[mask])
return result
@dataclass
class Swap:
counterparty_id: int
notional: float
fixed_rate: float
payment_times: np.ndarray # payment dates in years
day_fractions: np.ndarray # year fractions per period
is_payer: bool # True = we pay fixed, receive float
@dataclass
class ExposureProfile:
time_grid: np.ndarray
expected_exposure: np.ndarray # (n_time_steps, n_counterparties)
potential_future_exposure: np.ndarray # 97.5th percentile
expected_positive_exposure: np.ndarray # (n_counterparties,) time-averaged
@dataclass
class CVAResult:
counterparty_cva: np.ndarray # CVA per counterparty
total_cva: float
total_portfolio_npv: float
n_counterparties: int
n_scenarios: int
n_swaps: int
# ─── Market data generation ──────────────────────────────────────────────────
def generate_market_data(
n_counterparties: int,
n_swaps_per_cp: int,
seed: int,
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, List[Swap]]:
rng = np.random.default_rng(seed)
# Yield curve: par swap rates at standard tenors
tenors = np.array([0.25, 0.5, 1, 2, 3, 5, 7, 10, 15, 20, 30])
base_rates = np.array([0.042, 0.041, 0.040, 0.038, 0.037, 0.036, 0.037, 0.038, 0.040, 0.041, 0.042])
swap_rates = base_rates + rng.normal(0, 0.001, len(tenors))
# Swaption implied vols: (expiry, tenor) grid
swaption_expiries = np.array([0.5, 1, 2, 5])
swaption_tenors = np.array([2, 5, 10])
swaption_vols = np.full((len(swaption_expiries), len(swaption_tenors)), 0.0060)
for i, exp in enumerate(swaption_expiries):
for j, ten in enumerate(swaption_tenors):
swaption_vols[i, j] = 0.004 + 0.003 * np.exp(-0.3 * exp) + 0.001 * np.exp(-0.1 * ten)
swaption_vols[i, j] += rng.normal(0, 0.0002)
# CDS spreads per counterparty (bps) at tenors [1, 3, 5, 7, 10] years
cds_tenors = np.array([1.0, 3.0, 5.0, 7.0, 10.0])
cds_spreads = np.zeros((n_counterparties, len(cds_tenors)))
for cp in range(n_counterparties):
base_spread = rng.uniform(30, 400) # bps
term_slope = rng.uniform(-5, 15)
for j, t in enumerate(cds_tenors):
cds_spreads[cp, j] = max(5, base_spread + term_slope * t + rng.normal(0, 10))
cds_spreads[cp] /= 10000 # convert to decimal
# Generate swap portfolio
swaps = []
for cp in range(n_counterparties):
for _ in range(n_swaps_per_cp):
maturity = rng.choice([2, 3, 5, 7, 10, 15, 20])
freq = 0.5 # semi-annual
n_payments = int(maturity / freq)
payment_times = np.arange(1, n_payments + 1) * freq
day_fractions = np.full(n_payments, freq)
fixed_rate = 0.035 + rng.normal(0, 0.005)
notional = rng.choice([1e6, 5e6, 10e6, 25e6, 50e6])
swaps.append(
Swap(
counterparty_id=cp,
notional=notional,
fixed_rate=fixed_rate,
payment_times=payment_times,
day_fractions=day_fractions,
is_payer=bool(rng.choice([True, False])),
)
)
return swap_rates, tenors, swaption_vols, swaption_expiries, swaption_tenors, cds_spreads, cds_tenors, swaps
# ─── Yield curve bootstrapping ───────────────────────────────────────────────
@delayed
def bootstrap_yield_curve(swap_rates: np.ndarray, tenors: np.ndarray) -> DiscountCurve:
n = len(tenors)
discount_factors = np.zeros(n)
# Prepend t=0, DF=1
all_tenors = np.concatenate([[0.0], tenors])
all_dfs = np.concatenate([[1.0], np.zeros(n)])
for i in range(n):
rate = swap_rates[i]
tenor = tenors[i]
# For short tenors (< 1Y): zero-coupon style
if tenor <= 1.0:
all_dfs[i + 1] = 1.0 / (1.0 + rate * tenor)
else:
# Par swap bootstrap: rate * sum(DF_j * tau_j) + DF_n = 1
# Approximate with annual coupons
pv_coupons = 0.0
for j in range(i):
if tenors[j] >= 1.0 or j == 0:
dt = tenors[j] if j == 0 else tenors[j] - tenors[j - 1]
pv_coupons += all_dfs[j + 1] * rate * dt
dt_last = tenor - (tenors[i - 1] if i > 0 else 0)
all_dfs[i + 1] = (1.0 - pv_coupons) / (1.0 + rate * dt_last)
all_dfs[i + 1] = np.clip(all_dfs[i + 1], 0.01, 1.0)
discount_factors[i] = all_dfs[i + 1]
return DiscountCurve(tenors=all_tenors, discount_factors=np.concatenate([[1.0], discount_factors]))
# ─── Hull-White calibration ──────────────────────────────────────────────────
def _hw_zcb_price(t: float, T: float, r_t: float, a: float, sigma: float, curve: DiscountCurve) -> float:
if T <= t:
return 1.0
B = (1.0 - np.exp(-a * (T - t))) / a if a > 1e-10 else (T - t)
P_0_T = float(curve.df(np.array([T]))[0])
P_0_t = float(curve.df(np.array([t]))[0])
if P_0_t < 1e-15:
return P_0_T
f_0_t = float(curve.zero_rate(np.array([t]))[0])
log_A = np.log(P_0_T / P_0_t) + B * f_0_t - (sigma**2 / (4.0 * a)) * B**2 * (1.0 - np.exp(-2.0 * a * t))
return np.exp(log_A - B * r_t)
def _hw_normal_vol(a: float, sigma: float, expiry: float, swap_tenor: float) -> float:
"""Approximate HW-implied normal swaption volatility (fast closed-form)."""
if a < 1e-10:
return sigma * np.sqrt(expiry) * swap_tenor
B_swap = (1.0 - np.exp(-a * swap_tenor)) / (a * swap_tenor)
var_factor = (1.0 - np.exp(-2.0 * a * expiry)) / (2.0 * a * expiry)
return sigma * np.sqrt(expiry * var_factor) * B_swap
@delayed
def calibrate_hull_white(
curve: DiscountCurve,
swaption_vols: np.ndarray,
swaption_expiries: np.ndarray,
swaption_tenors: np.ndarray,
) -> HullWhiteParams:
n_exp, n_ten = swaption_vols.shape
def objective(params):
a, sigma = params
if a <= 1e-6 or sigma <= 1e-6 or a > 1.0 or sigma > 0.10:
return 1e10
total_err = 0.0
for i in range(n_exp):
for j in range(n_ten):
model_vol = _hw_normal_vol(a, sigma, swaption_expiries[i], swaption_tenors[j])
total_err += (model_vol - swaption_vols[i, j]) ** 2
return total_err
result = scipy.optimize.minimize(
objective, x0=[0.03, 0.008], method="Nelder-Mead",
options={"maxiter": 500, "xatol": 1e-8},
)
a_opt, sigma_opt = max(result.x[0], 0.001), max(result.x[1], 0.001)
# Compute theta(t) to fit initial term structure
dt = 0.25
t_grid = np.arange(0, 31, dt)
f_rates = curve.zero_rate(np.maximum(t_grid, 0.01))
df_dt = np.gradient(f_rates * t_grid, dt)
theta = df_dt + a_opt * f_rates + (sigma_opt**2 / (2.0 * a_opt)) * (1.0 - np.exp(-2.0 * a_opt * t_grid))
return HullWhiteParams(mean_reversion=a_opt, volatility=sigma_opt, theta_times=t_grid, theta_values=theta)
# ─── Monte Carlo rate scenario generation ────────────────────────────────────
@delayed
def generate_rate_scenarios(
hw: HullWhiteParams,
curve: DiscountCurve,
n_scenarios: int,
n_steps: int,
horizon: float = 5.0,
seed: int = 12345,
) -> RateScenarios:
rng = np.random.default_rng(seed)
dt = horizon / n_steps
time_grid = np.linspace(0, horizon, n_steps + 1)
r0 = float(curve.zero_rate(np.array([0.01]))[0])
rates = np.zeros((n_scenarios, n_steps + 1))
rates[:, 0] = r0
theta_interp = scipy.interpolate.interp1d(hw.theta_times, hw.theta_values, fill_value="extrapolate")
a = hw.mean_reversion
sigma = hw.volatility
Z = rng.standard_normal((n_scenarios, n_steps))
for step in range(n_steps):
t = time_grid[step]
theta_t = float(theta_interp(t))
r = rates[:, step]
drift = (theta_t - a * r) * dt
diffusion = sigma * np.sqrt(dt) * Z[:, step]
rates[:, step + 1] = r + drift + diffusion
return RateScenarios(rates=rates, time_grid=time_grid)
# ─── Credit curve calibration ────────────────────────────────────────────────
def calibrate_single_credit_curve(
cp_id: int,
spreads: np.ndarray,
cds_tenors: np.ndarray,
curve: DiscountCurve,
) -> CreditCurve:
n_tenors = len(cds_tenors)
hazard_rates = np.zeros(n_tenors)
dt_fine = 0.25
for i in range(n_tenors):
T = cds_tenors[i]
spread = spreads[i]
t_grid = np.arange(dt_fine, T + dt_fine / 2, dt_fine)
n_points = len(t_grid)
# Bootstrap: solve for hazard rate in this tenor bucket
def cds_error(lam_i):
lam_trial = hazard_rates.copy()
lam_trial[i] = lam_i
premium_leg = 0.0
protection_leg = 0.0
for k in range(n_points):
t_k = t_grid[k]
# Survival probability using all hazard rates up to t_k
surv = 1.0
for m in range(n_tenors):
t_start = 0.0 if m == 0 else cds_tenors[m - 1]
t_end = cds_tenors[m]
if t_k > t_start:
dt_eff = min(t_k, t_end) - t_start
surv *= np.exp(-lam_trial[m] * max(dt_eff, 0))
d = float(curve.df(np.array([t_k]))[0])
premium_leg += spread * dt_fine * d * surv
# Protection: default probability in this interval
surv_prev = 1.0
t_prev = t_k - dt_fine
for m in range(n_tenors):
t_start = 0.0 if m == 0 else cds_tenors[m - 1]
t_end = cds_tenors[m]
if t_prev > t_start:
dt_eff = min(t_prev, t_end) - t_start
surv_prev *= np.exp(-lam_trial[m] * max(dt_eff, 0))
protection_leg += d * (surv_prev - surv)
return premium_leg - protection_leg
try:
hazard_rates[i] = scipy.optimize.brentq(cds_error, 1e-6, 2.0, maxiter=200)
except (ValueError, RuntimeError):
hazard_rates[i] = spreads[i] / 0.6 # approximate: spread ~ lambda * LGD
return CreditCurve(counterparty_id=cp_id, tenors=cds_tenors, hazard_rates=hazard_rates)
@delayed
def calibrate_credit_curves(
cds_spreads: np.ndarray,
cds_tenors: np.ndarray,
curve: DiscountCurve,
) -> List[CreditCurve]:
n_counterparties = cds_spreads.shape[0]
return [calibrate_single_credit_curve(cp, cds_spreads[cp], cds_tenors, curve) for cp in range(n_counterparties)]
# ─── Survival probabilities & LGD ───────────────────────────────────────────
@delayed
def compute_survival_probs(
credit_curves: List[CreditCurve],
scenarios: RateScenarios,
) -> np.ndarray:
time_grid = scenarios.time_grid
n_cp = len(credit_curves)
n_t = len(time_grid)
surv = np.zeros((n_cp, n_t))
for i, cc in enumerate(credit_curves):
surv[i] = cc.survival_prob(time_grid)
return surv
@delayed
def estimate_lgd(
cds_spreads: np.ndarray,
credit_curves: List[CreditCurve],
n_simulations: int = 5000,
seed: int = 99,
) -> np.ndarray:
rng = np.random.default_rng(seed)
n_cp = len(credit_curves)
lgd_estimates = np.zeros(n_cp)
for i in range(n_cp):
avg_spread = np.mean(cds_spreads[i])
avg_hazard = np.mean(credit_curves[i].hazard_rates)
if avg_hazard > 1e-10:
point_lgd = np.clip(avg_spread / avg_hazard, 0.10, 0.95)
else:
point_lgd = 0.40
# Bayesian estimation with Beta prior
alpha = point_lgd * 20
beta_param = (1 - point_lgd) * 20
samples = rng.beta(max(alpha, 0.5), max(beta_param, 0.5), n_simulations)
lgd_estimates[i] = np.mean(samples)
return lgd_estimates
# ─── Swap portfolio pricing under scenarios ──────────────────────────────────
def _precompute_swap_flat(swaps: List[Swap]):
"""Flatten all swap cash flows into contiguous arrays for fully vectorised pricing."""
n_swaps = len(swaps)
# Collect per-swap metadata
fixed_rates = np.array([s.fixed_rate for s in swaps])
notionals = np.array([s.notional for s in swaps])
signs = np.array([1.0 if s.is_payer else -1.0 for s in swaps])
cp_ids = np.array([s.counterparty_id for s in swaps], dtype=int)
max_payments = max(len(s.payment_times) for s in swaps)
# Padded arrays (n_swaps, max_payments) — padded slots have pay_time = 0
pay_times = np.zeros((n_swaps, max_payments))
day_fracs = np.zeros((n_swaps, max_payments))
n_payments = np.array([len(s.payment_times) for s in swaps], dtype=int)
for i, s in enumerate(swaps):
n = n_payments[i]
pay_times[i, :n] = s.payment_times
day_fracs[i, :n] = s.day_fractions
return pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_payments, max_payments
def price_portfolio_single_scenario(
scenario_rates: np.ndarray,
time_grid: np.ndarray,
pay_times: np.ndarray,
day_fracs: np.ndarray,
fixed_rates: np.ndarray,
notionals: np.ndarray,
signs: np.ndarray,
cp_ids: np.ndarray,
n_payments: np.ndarray,
n_counterparties: int,
a: float,
sigma: float,
log_P0: np.ndarray,
zr0: np.ndarray,
log_P0_grid: np.ndarray,
zr0_grid: np.ndarray,
) -> np.ndarray:
"""Price all swaps at all time steps for one MC scenario.
Vectorises across swaps×payments using masked arrays so the inner loop is
only over time steps (60 iterations), not swaps (1000).
"""
n_steps = len(time_grid) - 1
n_swaps, max_pay = pay_times.shape
cp_exposure = np.zeros((n_steps, n_counterparties))
sigma2_4a = (sigma ** 2) / (4.0 * a) if a > 1e-10 else 0.0
for step in range(n_steps):
t = time_grid[step + 1]
r_t = scenario_rates[step + 1]
# Precomputed curve values at t
lp0t = log_P0_grid[step] # log P(0,t)
f0t = zr0_grid[step] # instantaneous forward ~ zero rate at t
exp_2at = np.exp(-2.0 * a * t) if a > 1e-10 else 1.0
# Mask: which (swap, payment) pairs are still in the future?
future = pay_times > t # (n_swaps, max_pay) bool
# Any swap with no future payments is dead
alive = np.any(future, axis=1) # (n_swaps,)
if not np.any(alive):
continue
# For alive swaps, compute HW ZCB prices P(t, T_k) for all payment slots
# tau_{i,k} = T_{i,k} - t (zero for non-future slots)
tau = np.where(future, pay_times - t, 0.0)
B = np.where(future, (1.0 - np.exp(-a * tau)) / a if a > 1e-10 else tau, 0.0)
# log A_{i,k} = log P(0,T_k) - log P(0,t) + B*f(0,t) - σ²/4a * B² * (1-e^{-2at})
log_A = np.where(future, log_P0 - lp0t + B * f0t - sigma2_4a * B ** 2 * (1.0 - exp_2at), 0.0)
dfs = np.where(future, np.exp(log_A - B * r_t), 0.0)
# Forward rates: F_k = (DF_{k-1}/DF_k - 1) / tau_k
# DF_{k-1} for the first future payment of each swap is 1.0 (discounting from t)
df_prev = np.zeros_like(dfs)
# Shift dfs right within each swap, inserting 1.0 at the first future slot
for i in range(max_pay):
col_is_first_future = future[:, i] & (np.sum(future[:, :i], axis=1) == 0)
df_prev[:, i] = np.where(col_is_first_future, 1.0, dfs[:, max(i - 1, 0)] if i > 0 else 0.0)
# Fix: for non-first-future slots that are still future, df_prev = previous dfs
for i in range(1, max_pay):
non_first = future[:, i] & ~(future[:, i] & (np.sum(future[:, :i], axis=1) == 0))
df_prev[:, i] = np.where(non_first, dfs[:, i - 1], df_prev[:, i])
safe_dfs = np.maximum(dfs, 1e-15)
safe_tau = np.maximum(day_fracs, 1e-10)
float_rates = np.where(future, (df_prev / safe_dfs - 1.0) / safe_tau, 0.0)
# Fixed leg PV per swap: sum_k (r_fixed * tau_k * DF_k) * notional
fixed_pv = np.sum(fixed_rates[:, None] * day_fracs * dfs * future, axis=1) * notionals
# Float leg PV per swap: sum_k (F_k * tau_k * DF_k) * notional
float_pv = np.sum(float_rates * day_fracs * dfs * future, axis=1) * notionals
npvs = signs * (float_pv - fixed_pv)
# Positive exposure per counterparty
pos = np.maximum(npvs, 0.0) * alive
np.add.at(cp_exposure[step], cp_ids, pos)
return cp_exposure
@pf.parallel(
split=pf.per_argument(scenario_batch=pf.py_list.by_chunk),
combine_with=pf.py_list.concat,
fixed_partition_size=10,
)
def _price_scenario_batch(
scenario_batch: List[np.ndarray],
time_grid: np.ndarray,
pay_times: np.ndarray,
day_fracs: np.ndarray,
fixed_rates: np.ndarray,
notionals: np.ndarray,
signs: np.ndarray,
cp_ids: np.ndarray,
n_pay: np.ndarray,
n_counterparties: int,
a: float,
sigma: float,
log_P0: np.ndarray,
zr0: np.ndarray,
log_P0_grid: np.ndarray,
zr0_grid: np.ndarray,
) -> List[np.ndarray]:
return [
price_portfolio_single_scenario(
sc, time_grid,
pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_pay,
n_counterparties, a, sigma,
log_P0, zr0, log_P0_grid, zr0_grid,
)
for sc in scenario_batch
]
@delayed
def price_swap_portfolio(
swaps: List[Swap],
scenarios: RateScenarios,
n_counterparties: int,
hw: HullWhiteParams,
curve: DiscountCurve,
) -> np.ndarray:
rate_scenarios, time_grid = scenarios.rates, scenarios.time_grid
n_scenarios = rate_scenarios.shape[0]
n_steps = len(time_grid) - 1
# Precompute swap arrays
pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_pay, max_pay = _precompute_swap_flat(swaps)
# Precompute curve values for all payment times
pay_flat = pay_times.copy()
pay_flat[pay_flat <= 0] = 0.001
log_P0 = curve._interp(pay_flat) # log discount factors
zr0 = -log_P0 / pay_flat
# Precompute curve values at each time grid point
grid_pts = time_grid[1:]
safe_grid = np.maximum(grid_pts, 0.001)
log_P0_grid = curve._interp(safe_grid)
zr0_grid = -log_P0_grid / safe_grid
a, sigma = hw.mean_reversion, hw.volatility
# Split scenarios into a list for parfun distribution
scenario_list = [rate_scenarios[s] for s in range(n_scenarios)]
with pf.set_parallel_backend_context(
"scaler_remote",
scheduler_address=scheduler_address,
object_storage_address=object_storage_address,
n_workers=128,
):
results = _price_scenario_batch(
scenario_list, time_grid,
pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_pay,
n_counterparties, a, sigma,
log_P0, zr0, log_P0_grid, zr0_grid,
)
return np.array(results)
# ─── Exposure profiles ───────────────────────────────────────────────────────
@delayed
def compute_exposure_profiles(
exposures: np.ndarray,
scenarios: RateScenarios,
) -> ExposureProfile:
time_grid = scenarios.time_grid
# exposures: (n_scenarios, n_steps, n_counterparties)
ee = np.mean(exposures, axis=0) # (n_steps, n_counterparties)
pfe = np.percentile(exposures, 97.5, axis=0)
# Time-weighted EPE per counterparty
dt = np.diff(time_grid)
epe = np.zeros(ee.shape[1])
T = time_grid[-1]
for cp in range(ee.shape[1]):
epe[cp] = np.sum(ee[:, cp] * dt) / T
return ExposureProfile(
time_grid=time_grid[1:],
expected_exposure=ee,
potential_future_exposure=pfe,
expected_positive_exposure=epe,
)
# ─── CVA computation ─────────────────────────────────────────────────────────
@delayed
def compute_cva(
profiles: ExposureProfile,
survival_probs: np.ndarray,
lgd: np.ndarray,
curve: DiscountCurve,
) -> CVAResult:
time_grid = profiles.time_grid
ee = profiles.expected_exposure # (n_steps, n_counterparties)
n_steps, n_cp = ee.shape
cp_cva = np.zeros(n_cp)
dfs = curve.df(time_grid)
for cp in range(n_cp):
for k in range(n_steps):
t = time_grid[k]
# Marginal default probability: S(t_{k-1}) - S(t_k)
surv_prev = survival_probs[cp, k] if k == 0 else survival_probs[cp, k]
surv_curr = survival_probs[cp, k + 1] if k + 1 < survival_probs.shape[1] else survival_probs[cp, -1]
# Use discrete approximation
if k == 0:
delta_pd = 1.0 - survival_probs[cp, 1] if survival_probs.shape[1] > 1 else 0.0
else:
idx_prev = min(k, survival_probs.shape[1] - 1)
idx_curr = min(k + 1, survival_probs.shape[1] - 1)
delta_pd = max(survival_probs[cp, idx_prev] - survival_probs[cp, idx_curr], 0.0)
cp_cva[cp] += lgd[cp] * dfs[k] * ee[k, cp] * delta_pd
total_cva = float(np.sum(cp_cva))
total_npv = float(np.sum(profiles.expected_positive_exposure))
return CVAResult(
counterparty_cva=cp_cva,
total_cva=total_cva,
total_portfolio_npv=total_npv,
n_counterparties=n_cp,
n_scenarios=0,
n_swaps=0,
)
# ─── Main pipeline ───────────────────────────────────────────────────────────
@graph
def cva_pipeline(
swap_rates: np.ndarray,
tenors: np.ndarray,
swaption_vols: np.ndarray,
swaption_expiries: np.ndarray,
swaption_tenors: np.ndarray,
cds_spreads: np.ndarray,
cds_tenors: np.ndarray,
swaps: list,
n_counterparties: int,
n_scenarios: int,
n_steps: int,
horizon: float = 5.0,
seed: int = 12345,
lgd_seed: int = 99,
lgd_n_simulations: int = 5000,
) -> CVAResult:
curve = bootstrap_yield_curve(swap_rates, tenors)
hw = calibrate_hull_white(curve, swaption_vols, swaption_expiries, swaption_tenors)
scenarios = generate_rate_scenarios(hw, curve, n_scenarios, n_steps, horizon, seed)
credit_curves = calibrate_credit_curves(cds_spreads, cds_tenors, curve)
surv_probs = compute_survival_probs(credit_curves, scenarios)
lgd = estimate_lgd(cds_spreads, credit_curves, lgd_n_simulations, lgd_seed)
exposures = price_swap_portfolio(swaps, scenarios, n_counterparties, hw, curve)
profiles = compute_exposure_profiles(exposures, scenarios)
return compute_cva(profiles, surv_probs, lgd, curve)
if __name__ == "__main__":
n_counterparties = 50
n_swaps_per_cp = 20
n_scenarios = 5000
n_steps = 60
market_seed = 42
swap_rates, tenors, swaption_vols, swaption_expiries, swaption_tenors, cds_spreads, cds_tenors, swaps = (
generate_market_data(n_counterparties, n_swaps_per_cp, market_seed)
)
pipeline_kwargs = dict(
swap_rates=swap_rates, tenors=tenors,
swaption_vols=swaption_vols, swaption_expiries=swaption_expiries, swaption_tenors=swaption_tenors,
cds_spreads=cds_spreads, cds_tenors=cds_tenors,
swaps=swaps, n_counterparties=n_counterparties,
n_scenarios=n_scenarios, n_steps=n_steps,
)
client = Client(scheduler_address, object_storage_address=object_storage_address)
engine = GraphEngine(client)
graph_obj = cva_pipeline.to_graph()
dict_graph, keys = graph_obj.to_dict(**pipeline_kwargs)
(result,) = engine.get(dict_graph, keys)
result.n_scenarios = n_scenarios
result.n_swaps = len(swaps)
print(f"Portfolio CVA: ${result.total_cva:,.2f}")
print(f"Number of counterparties: {result.n_counterparties}")
print(f"Number of swaps: {result.n_swaps}")
print(f"Number of scenarios: {result.n_scenarios}")
print(f"Top 5 counterparty CVAs: {np.sort(result.counterparty_cva)[-5:][::-1]}")
Diff: Native vs Pargraph + Parfun¶
[2]:
import difflib
import json
import re
from IPython.display import HTML, display
with open("SwapCVA.ipynb") as _f:
_nb = json.load(_f)
native_code = "".join(_nb["cells"][3]["source"])
parallel_code = "".join(_nb["cells"][5]["source"])
differ = difflib.HtmlDiff(wrapcolumn=90)
table = differ.make_table(
native_code.splitlines(),
parallel_code.splitlines(),
fromdesc="Native Python",
todesc="With Pargraph + Parfun",
context=True,
numlines=10,
)
# Strip unwanted columns: navigation links, line numbers, and nowrap attribute
table = re.sub(r'<td class="diff_next"[^>]*>.*?</td>', "", table)
table = re.sub(r'<th class="diff_next"[^>]*>.*?</th>', "", table)
table = re.sub(r'<td class="diff_header"[^>]*>.*?</td>', "", table)
table = table.replace('colspan="2"', "")
table = table.replace(' nowrap="nowrap"', "")
table = re.sub(
r"(\s*<colgroup></colgroup>){6}",
"\n <colgroup></colgroup> <colgroup></colgroup>",
table,
)
css = """<style>
table.diff { font-family: monospace; font-size: 10px; border-collapse: collapse; width: 100%; }
table.diff td { padding: 2px 8px; white-space: pre-wrap; word-break: break-all; text-align: left !important; }
table.diff th { padding: 6px 8px; text-align: center; background-color: #f0f0f0; font-size: 14px; }
td.diff_add { background-color: #e6ffec; }
td.diff_chg { background-color: #fff8c5; }
td.diff_sub { background-color: #ffebe9; }
span.diff_add { background-color: #ccffd8; }
span.diff_chg { background-color: #fff2a8; }
span.diff_sub { background-color: #ffc0c0; }
</style>"""
display(HTML(css + table))
| Native Python | With Pargraph + Parfun |
|---|---|
| import os | import os |
| os.environ["OPENBLAS_NUM_THREADS"] = "1" | os.environ["OPENBLAS_NUM_THREADS"] = "1" |
| os.environ["GOTO_NUM_THREADS"] = "1" | os.environ["GOTO_NUM_THREADS"] = "1" |
| os.environ["OMP_NUM_THREADS"] = "1" | os.environ["OMP_NUM_THREADS"] = "1" |
| os.environ["MKL_NUM_THREADS"] = "1" | os.environ["MKL_NUM_THREADS"] = "1" |
| from scaler import Client | |
| from pargraph import GraphEngine, delayed, graph | |
| import parfun as pf | |
| scheduler_address = "tcp://172.31.3.107:7788" | |
| object_storage_address = "tcp://172.31.3.107:7789" | |
| from dataclasses import dataclass, field | from dataclasses import dataclass, field |
| from typing import Dict, List, Tuple | from typing import Dict, List, Tuple |
| import numpy as np | import numpy as np |
| import scipy.interpolate | import scipy.interpolate |
| import scipy.optimize | import scipy.optimize |
| # ─── Domain types ───────────────────────────────────────────────────────────── | # ─── Domain types ───────────────────────────────────────────────────────────── |
| is_payer=bool(rng.choice([True, False])), | is_payer=bool(rng.choice([True, False])), |
| ) | ) |
| ) | ) |
| return swap_rates, tenors, swaption_vols, swaption_expiries, swaption_tenors, cds_spre | return swap_rates, tenors, swaption_vols, swaption_expiries, swaption_tenors, cds_spre |
| ads, cds_tenors, swaps | ads, cds_tenors, swaps |
| # ─── Yield curve bootstrapping ─────────────────────────────────────────────── | # ─── Yield curve bootstrapping ─────────────────────────────────────────────── |
| @delayed | |
| def bootstrap_yield_curve(swap_rates: np.ndarray, tenors: np.ndarray) -> DiscountCurve: | def bootstrap_yield_curve(swap_rates: np.ndarray, tenors: np.ndarray) -> DiscountCurve: |
| n = len(tenors) | n = len(tenors) |
| discount_factors = np.zeros(n) | discount_factors = np.zeros(n) |
| # Prepend t=0, DF=1 | # Prepend t=0, DF=1 |
| all_tenors = np.concatenate([[0.0], tenors]) | all_tenors = np.concatenate([[0.0], tenors]) |
| all_dfs = np.concatenate([[1.0], np.zeros(n)]) | all_dfs = np.concatenate([[1.0], np.zeros(n)]) |
| for i in range(n): | for i in range(n): |
| rate = swap_rates[i] | rate = swap_rates[i] |
| def _hw_normal_vol(a: float, sigma: float, expiry: float, swap_tenor: float) -> float: | def _hw_normal_vol(a: float, sigma: float, expiry: float, swap_tenor: float) -> float: |
| """Approximate HW-implied normal swaption volatility (fast closed-form).""" | """Approximate HW-implied normal swaption volatility (fast closed-form).""" |
| if a < 1e-10: | if a < 1e-10: |
| return sigma * np.sqrt(expiry) * swap_tenor | return sigma * np.sqrt(expiry) * swap_tenor |
| B_swap = (1.0 - np.exp(-a * swap_tenor)) / (a * swap_tenor) | B_swap = (1.0 - np.exp(-a * swap_tenor)) / (a * swap_tenor) |
| var_factor = (1.0 - np.exp(-2.0 * a * expiry)) / (2.0 * a * expiry) | var_factor = (1.0 - np.exp(-2.0 * a * expiry)) / (2.0 * a * expiry) |
| return sigma * np.sqrt(expiry * var_factor) * B_swap | return sigma * np.sqrt(expiry * var_factor) * B_swap |
| @delayed | |
| def calibrate_hull_white( | def calibrate_hull_white( |
| curve: DiscountCurve, | curve: DiscountCurve, |
| swaption_vols: np.ndarray, | swaption_vols: np.ndarray, |
| swaption_expiries: np.ndarray, | swaption_expiries: np.ndarray, |
| swaption_tenors: np.ndarray, | swaption_tenors: np.ndarray, |
| ) -> HullWhiteParams: | ) -> HullWhiteParams: |
| n_exp, n_ten = swaption_vols.shape | n_exp, n_ten = swaption_vols.shape |
| def objective(params): | def objective(params): |
| a, sigma = params | a, sigma = params |
| f_rates = curve.zero_rate(np.maximum(t_grid, 0.01)) | f_rates = curve.zero_rate(np.maximum(t_grid, 0.01)) |
| df_dt = np.gradient(f_rates * t_grid, dt) | df_dt = np.gradient(f_rates * t_grid, dt) |
| theta = df_dt + a_opt * f_rates + (sigma_opt**2 / (2.0 * a_opt)) * (1.0 - np.exp(-2.0 | theta = df_dt + a_opt * f_rates + (sigma_opt**2 / (2.0 * a_opt)) * (1.0 - np.exp(-2.0 |
| * a_opt * t_grid)) | * a_opt * t_grid)) |
| return HullWhiteParams(mean_reversion=a_opt, volatility=sigma_opt, theta_times=t_grid, | return HullWhiteParams(mean_reversion=a_opt, volatility=sigma_opt, theta_times=t_grid, |
| theta_values=theta) | theta_values=theta) |
| # ─── Monte Carlo rate scenario generation ──────────────────────────────────── | # ─── Monte Carlo rate scenario generation ──────────────────────────────────── |
| @delayed | |
| def generate_rate_scenarios( | def generate_rate_scenarios( |
| hw: HullWhiteParams, | hw: HullWhiteParams, |
| curve: DiscountCurve, | curve: DiscountCurve, |
| n_scenarios: int, | n_scenarios: int, |
| n_steps: int, | n_steps: int, |
| horizon: float = 5.0, | horizon: float = 5.0, |
| seed: int = 12345, | seed: int = 12345, |
| ) -> RateScenarios: | ) -> RateScenarios: |
| rng = np.random.default_rng(seed) | rng = np.random.default_rng(seed) |
| dt = horizon / n_steps | dt = horizon / n_steps |
| return premium_leg - protection_leg | return premium_leg - protection_leg |
| try: | try: |
| hazard_rates[i] = scipy.optimize.brentq(cds_error, 1e-6, 2.0, maxiter=200) | hazard_rates[i] = scipy.optimize.brentq(cds_error, 1e-6, 2.0, maxiter=200) |
| except (ValueError, RuntimeError): | except (ValueError, RuntimeError): |
| hazard_rates[i] = spreads[i] / 0.6 # approximate: spread ~ lambda * LGD | hazard_rates[i] = spreads[i] / 0.6 # approximate: spread ~ lambda * LGD |
| return CreditCurve(counterparty_id=cp_id, tenors=cds_tenors, hazard_rates=hazard_rates | return CreditCurve(counterparty_id=cp_id, tenors=cds_tenors, hazard_rates=hazard_rates |
| ) | ) |
| @delayed | |
| def calibrate_credit_curves( | def calibrate_credit_curves( |
| cds_spreads: np.ndarray, | cds_spreads: np.ndarray, |
| cds_tenors: np.ndarray, | cds_tenors: np.ndarray, |
| curve: DiscountCurve, | curve: DiscountCurve, |
| ) -> List[CreditCurve]: | ) -> List[CreditCurve]: |
| n_counterparties = cds_spreads.shape[0] | n_counterparties = cds_spreads.shape[0] |
| return [calibrate_single_credit_curve(cp, cds_spreads[cp], cds_tenors, curve) for cp i | return [calibrate_single_credit_curve(cp, cds_spreads[cp], cds_tenors, curve) for cp i |
| n range(n_counterparties)] | n range(n_counterparties)] |
| # ─── Survival probabilities & LGD ─────────────────────────────────────────── | # ─── Survival probabilities & LGD ─────────────────────────────────────────── |
| @delayed | |
| def compute_survival_probs( | def compute_survival_probs( |
| credit_curves: List[CreditCurve], | credit_curves: List[CreditCurve], |
| scenarios: RateScenarios, | scenarios: RateScenarios, |
| ) -> np.ndarray: | ) -> np.ndarray: |
| time_grid = scenarios.time_grid | time_grid = scenarios.time_grid |
| n_cp = len(credit_curves) | n_cp = len(credit_curves) |
| n_t = len(time_grid) | n_t = len(time_grid) |
| surv = np.zeros((n_cp, n_t)) | surv = np.zeros((n_cp, n_t)) |
| for i, cc in enumerate(credit_curves): | for i, cc in enumerate(credit_curves): |
| surv[i] = cc.survival_prob(time_grid) | surv[i] = cc.survival_prob(time_grid) |
| return surv | return surv |
| @delayed | |
| def estimate_lgd( | def estimate_lgd( |
| cds_spreads: np.ndarray, | cds_spreads: np.ndarray, |
| credit_curves: List[CreditCurve], | credit_curves: List[CreditCurve], |
| n_simulations: int = 5000, | n_simulations: int = 5000, |
| seed: int = 99, | seed: int = 99, |
| ) -> np.ndarray: | ) -> np.ndarray: |
| rng = np.random.default_rng(seed) | rng = np.random.default_rng(seed) |
| n_cp = len(credit_curves) | n_cp = len(credit_curves) |
| lgd_estimates = np.zeros(n_cp) | lgd_estimates = np.zeros(n_cp) |
| npvs = signs * (float_pv - fixed_pv) | npvs = signs * (float_pv - fixed_pv) |
| # Positive exposure per counterparty | # Positive exposure per counterparty |
| pos = np.maximum(npvs, 0.0) * alive | pos = np.maximum(npvs, 0.0) * alive |
| np.add.at(cp_exposure[step], cp_ids, pos) | np.add.at(cp_exposure[step], cp_ids, pos) |
| return cp_exposure | return cp_exposure |
| @pf.parallel( | |
| split=pf.per_argument(scenario_batch=pf.py_list.by_chunk), | |
| combine_with=pf.py_list.concat, | |
| fixed_partition_size=10, | |
| ) | |
| def _price_scenario_batch( | def _price_scenario_batch( |
| scenario_batch: List[np.ndarray], | scenario_batch: List[np.ndarray], |
| time_grid: np.ndarray, | time_grid: np.ndarray, |
| pay_times: np.ndarray, | pay_times: np.ndarray, |
| day_fracs: np.ndarray, | day_fracs: np.ndarray, |
| fixed_rates: np.ndarray, | fixed_rates: np.ndarray, |
| notionals: np.ndarray, | notionals: np.ndarray, |
| signs: np.ndarray, | signs: np.ndarray, |
| cp_ids: np.ndarray, | cp_ids: np.ndarray, |
| n_pay: np.ndarray, | n_pay: np.ndarray, |
| price_portfolio_single_scenario( | price_portfolio_single_scenario( |
| sc, time_grid, | sc, time_grid, |
| pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_pay, | pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_pay, |
| n_counterparties, a, sigma, | n_counterparties, a, sigma, |
| log_P0, zr0, log_P0_grid, zr0_grid, | log_P0, zr0, log_P0_grid, zr0_grid, |
| ) | ) |
| for sc in scenario_batch | for sc in scenario_batch |
| ] | ] |
| @delayed | |
| def price_swap_portfolio( | def price_swap_portfolio( |
| swaps: List[Swap], | swaps: List[Swap], |
| scenarios: RateScenarios, | scenarios: RateScenarios, |
| n_counterparties: int, | n_counterparties: int, |
| hw: HullWhiteParams, | hw: HullWhiteParams, |
| curve: DiscountCurve, | curve: DiscountCurve, |
| ) -> np.ndarray: | ) -> np.ndarray: |
| rate_scenarios, time_grid = scenarios.rates, scenarios.time_grid | rate_scenarios, time_grid = scenarios.rates, scenarios.time_grid |
| n_scenarios = rate_scenarios.shape[0] | n_scenarios = rate_scenarios.shape[0] |
| n_steps = len(time_grid) - 1 | n_steps = len(time_grid) - 1 |
| grid_pts = time_grid[1:] | grid_pts = time_grid[1:] |
| safe_grid = np.maximum(grid_pts, 0.001) | safe_grid = np.maximum(grid_pts, 0.001) |
| log_P0_grid = curve._interp(safe_grid) | log_P0_grid = curve._interp(safe_grid) |
| zr0_grid = -log_P0_grid / safe_grid | zr0_grid = -log_P0_grid / safe_grid |
| a, sigma = hw.mean_reversion, hw.volatility | a, sigma = hw.mean_reversion, hw.volatility |
| # Split scenarios into a list for parfun distribution | # Split scenarios into a list for parfun distribution |
| scenario_list = [rate_scenarios[s] for s in range(n_scenarios)] | scenario_list = [rate_scenarios[s] for s in range(n_scenarios)] |
| results = _price_scenario_batch( | with pf.set_parallel_backend_context( |
| scenario_list, time_grid, | "scaler_remote", |
| pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_pay, | scheduler_address=scheduler_address, |
| n_counterparties, a, sigma, | object_storage_address=object_storage_address, |
| log_P0, zr0, log_P0_grid, zr0_grid, | n_workers=128, |
| ) | ): |
| results = _price_scenario_batch( | |
| scenario_list, time_grid, | |
| pay_times, day_fracs, fixed_rates, notionals, signs, cp_ids, n_pay, | |
| n_counterparties, a, sigma, | |
| log_P0, zr0, log_P0_grid, zr0_grid, | |
| ) | |
| return np.array(results) | return np.array(results) |
| # ─── Exposure profiles ─────────────────────────────────────────────────────── | # ─── Exposure profiles ─────────────────────────────────────────────────────── |
| @delayed | |
| def compute_exposure_profiles( | def compute_exposure_profiles( |
| exposures: np.ndarray, | exposures: np.ndarray, |
| scenarios: RateScenarios, | scenarios: RateScenarios, |
| ) -> ExposureProfile: | ) -> ExposureProfile: |
| time_grid = scenarios.time_grid | time_grid = scenarios.time_grid |
| # exposures: (n_scenarios, n_steps, n_counterparties) | # exposures: (n_scenarios, n_steps, n_counterparties) |
| ee = np.mean(exposures, axis=0) # (n_steps, n_counterparties) | ee = np.mean(exposures, axis=0) # (n_steps, n_counterparties) |
| pfe = np.percentile(exposures, 97.5, axis=0) | pfe = np.percentile(exposures, 97.5, axis=0) |
| # Time-weighted EPE per counterparty | # Time-weighted EPE per counterparty |
| time_grid=time_grid[1:], | time_grid=time_grid[1:], |
| expected_exposure=ee, | expected_exposure=ee, |
| potential_future_exposure=pfe, | potential_future_exposure=pfe, |
| expected_positive_exposure=epe, | expected_positive_exposure=epe, |
| ) | ) |
| # ─── CVA computation ───────────────────────────────────────────────────────── | # ─── CVA computation ───────────────────────────────────────────────────────── |
| @delayed | |
| def compute_cva( | def compute_cva( |
| profiles: ExposureProfile, | profiles: ExposureProfile, |
| survival_probs: np.ndarray, | survival_probs: np.ndarray, |
| lgd: np.ndarray, | lgd: np.ndarray, |
| curve: DiscountCurve, | curve: DiscountCurve, |
| ) -> CVAResult: | ) -> CVAResult: |
| time_grid = profiles.time_grid | time_grid = profiles.time_grid |
| ee = profiles.expected_exposure # (n_steps, n_counterparties) | ee = profiles.expected_exposure # (n_steps, n_counterparties) |
| n_steps, n_cp = ee.shape | n_steps, n_cp = ee.shape |
| total_portfolio_npv=total_npv, | total_portfolio_npv=total_npv, |
| n_counterparties=n_cp, | n_counterparties=n_cp, |
| n_scenarios=0, | n_scenarios=0, |
| n_swaps=0, | n_swaps=0, |
| ) | ) |
| # ─── Main pipeline ─────────────────────────────────────────────────────────── | # ─── Main pipeline ─────────────────────────────────────────────────────────── |
| @graph | |
| def cva_pipeline( | def cva_pipeline( |
| swap_rates: np.ndarray, | swap_rates: np.ndarray, |
| tenors: np.ndarray, | tenors: np.ndarray, |
| swaption_vols: np.ndarray, | swaption_vols: np.ndarray, |
| swaption_expiries: np.ndarray, | swaption_expiries: np.ndarray, |
| swaption_tenors: np.ndarray, | swaption_tenors: np.ndarray, |
| cds_spreads: np.ndarray, | cds_spreads: np.ndarray, |
| cds_tenors: np.ndarray, | cds_tenors: np.ndarray, |
| swaps: list, | swaps: list, |
| n_counterparties: int, | n_counterparties: int, |
| ) | ) |
| pipeline_kwargs = dict( | pipeline_kwargs = dict( |
| swap_rates=swap_rates, tenors=tenors, | swap_rates=swap_rates, tenors=tenors, |
| swaption_vols=swaption_vols, swaption_expiries=swaption_expiries, swaption_tenors= | swaption_vols=swaption_vols, swaption_expiries=swaption_expiries, swaption_tenors= |
| swaption_tenors, | swaption_tenors, |
| cds_spreads=cds_spreads, cds_tenors=cds_tenors, | cds_spreads=cds_spreads, cds_tenors=cds_tenors, |
| swaps=swaps, n_counterparties=n_counterparties, | swaps=swaps, n_counterparties=n_counterparties, |
| n_scenarios=n_scenarios, n_steps=n_steps, | n_scenarios=n_scenarios, n_steps=n_steps, |
| ) | ) |
| result = cva_pipeline(**pipeline_kwargs) | client = Client(scheduler_address, object_storage_address=object_storage_address) |
| engine = GraphEngine(client) | |
| graph_obj = cva_pipeline.to_graph() | |
| dict_graph, keys = graph_obj.to_dict(**pipeline_kwargs) | |
| (result,) = engine.get(dict_graph, keys) | |
| result.n_scenarios = n_scenarios | result.n_scenarios = n_scenarios |
| result.n_swaps = len(swaps) | result.n_swaps = len(swaps) |
| print(f"Portfolio CVA: ${result.total_cva:,.2f}") | print(f"Portfolio CVA: ${result.total_cva:,.2f}") |
| print(f"Number of counterparties: {result.n_counterparties}") | print(f"Number of counterparties: {result.n_counterparties}") |
| print(f"Number of swaps: {result.n_swaps}") | print(f"Number of swaps: {result.n_swaps}") |
| print(f"Number of scenarios: {result.n_scenarios}") | print(f"Number of scenarios: {result.n_scenarios}") |
| print(f"Top 5 counterparty CVAs: {np.sort(result.counterparty_cva)[-5:][::-1]}") | print(f"Top 5 counterparty CVAs: {np.sort(result.counterparty_cva)[-5:][::-1]}") |
Statistics¶
Parfun |
Pargraph |
Sequential Runtime |
Parallel Runtime |
Workers |
Speedup |
Efficiency |
|---|---|---|---|---|---|---|
Yes |
Yes |
35m12.430s |
1m16.413s |
64 |
27.64x |
0.43 |