Parallel Vol Surface Calibration & PDE Exotic Pricing with Parfun

This notebook demonstrates a full equity derivatives production pipeline parallelised with Parfun (data-level trade batching across Scaler workers):

  1. SVI calibration per expiry — fit Gatheral’s SVI to market option quotes (N nonlinear optimisations)

  2. Arbitrage validation — check calendar spread & butterfly constraints across the entire surface

  3. Dupire local vol extraction — compute σ_loc(T,K) from the calibrated implied vol surface

  4. PDE exotic pricing — Crank-Nicolson backward PDE for each trade (barrier, lookback, Asian, cliquet)

  5. Vega ladder — bump each expiry’s SVI, rebuild local vol, re-solve PDE per trade

  6. Aggregate book Greeks — portfolio-level NPV, delta, gamma, theta

Steps 1–3 run sequentially. Parfun parallelises steps 4–5 by distributing the 150 independent trade-level PDE + vega computations across Scaler workers with a single @pf.parallel decorator.

Program Structure

Native Python Implementation

[ ]:
import os

# Force every native BLAS/OpenMP backend down to one thread so the numerical
# libraries do not spawn their own thread pools — presumably to avoid
# oversubscription when trades are farmed out to parallel workers (see the
# Parfun notes above). Must be set before numpy/scipy are imported.
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["GOTO_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"

from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import scipy.interpolate
import scipy.optimize

# ─── Domain types ─────────────────────────────────────────────────────────────


@dataclass
class OptionQuote:
    """One market option quote for a single (expiry, strike), in implied-vol terms."""

    expiry: float  # years to expiry
    strike: float  # absolute strike level
    forward: float  # forward price at expiry
    mid_vol: float  # market implied vol (mid of bid/ask)
    bid_vol: float  # bid-side implied vol
    ask_vol: float  # ask-side implied vol
    option_type: str  # "call" | "put"


@dataclass
class SVIParams:
    """
    SVI (Stochastic Volatility Inspired) smile parametrization, Gatheral (2004):

        w(k) = a + b * [rho * (k - m) + sqrt((k - m)^2 + sigma^2)]

    where k = log(K/F) is log-moneyness and w = vol^2 * T is the total
    implied variance of the slice.
    """

    expiry: float  # slice maturity in years
    a: float  # vertical translation of the smile
    b: float  # slope of the asymptotic wings
    rho: float  # rotation (skew) parameter, in (-1, 1)
    m: float  # horizontal translation
    sigma: float  # ATM smoothness, in (0, inf)

    def total_variance(self, k: np.ndarray) -> np.ndarray:
        """Evaluate the raw-SVI total variance w(k) at log-moneyness k."""
        centered = k - self.m
        wing = np.sqrt(centered * centered + self.sigma * self.sigma)
        return self.a + self.b * (self.rho * centered + wing)

    def implied_vol(self, k: np.ndarray) -> np.ndarray:
        """Implied vol from total variance; floored so the sqrt stays defined."""
        floored = np.maximum(self.total_variance(k), 1e-8)
        return np.sqrt(floored / self.expiry)


@dataclass
class VolSurface:
    """
    Calibrated vol surface: collection of SVI slices + arbitrage check results.
    """

    expiries: List[float]  # expiry pillars in years
    svi_params: List[SVIParams]  # one calibrated SVI slice per expiry
    is_arbitrage_free: bool  # True when no calendar/butterfly violations found
    arbitrage_violations: List[str]  # human-readable violation messages


@dataclass
class LocalVolSurface:
    """
    Dupire local vol surface sigma_loc(T, K), tabulated on a rectangular
    (T, K) grid and queried through a 2D linear interpolant; lookups
    outside the grid are clipped back onto it.
    """

    T_grid: np.ndarray  # tenor axis, shape (n_T,)
    K_grid: np.ndarray  # strike axis, shape (n_K,)
    local_vol: np.ndarray  # vol table, shape (n_T, n_K)
    spot: float
    rate: float
    _interp: Optional[object] = field(default=None, repr=False)

    def __post_init__(self):
        # Build the linear interpolant once, immediately after construction.
        from scipy.interpolate import RegularGridInterpolator

        self._interp = RegularGridInterpolator(
            (self.T_grid, self.K_grid),
            self.local_vol,
            method="linear",
            bounds_error=False,
            fill_value=None,
        )

    def sigma(self, T: float, K: float) -> float:
        """Local vol at (T, K), with out-of-grid queries clipped to the edges."""
        t_in = np.clip(T, self.T_grid[0], self.T_grid[-1])
        k_in = np.clip(K, self.K_grid[0], self.K_grid[-1])
        point = [[t_in, k_in]]
        return float(self._interp(point)[0])


@dataclass
class ExoticTrade:
    """A single exotic derivative trade to be priced on the local vol surface."""

    trade_id: str
    # Instrument key as matched literally by the PDE pricer:
    # "barrier_do" | "barrier_ui" | "lookback" | "asian" | "cliquet"
    instrument: str
    spot: float
    strike: float
    barrier: float  # upper or lower barrier level depending on instrument
    maturity: float  # years
    rate: float
    flag: str  # "call" | "put"
    notional: float
    quantity: int  # +1 long, -1 short


@dataclass
class PDEResult:
    """Trade-level output of the PDE pricer (per unit notional)."""

    trade_id: str
    npv: float  # present value at S = spot
    delta: float  # ∂V/∂S
    gamma: float  # ∂²V/∂S²
    theta: float  # ∂V/∂t
    vega_ladder: Optional[Dict[float, float]] = None  # expiry → vega


@dataclass
class BookGreeks:
    """Portfolio-level aggregation of all trade PDE results."""

    total_npv: float  # notional- and quantity-weighted book NPV
    portfolio_delta: float  # scaled sum of trade deltas
    portfolio_gamma: float  # scaled sum of trade gammas
    vega_by_expiry: pd.Series  # aggregated vega ladder, indexed by expiry
    risk_report: pd.DataFrame  # one row per trade with scaled Greeks


# ─── Pipeline functions ──────────────────────────────────────────────────────


def load_market_quotes(
    spot: float = 100.0,
    rate: float = 0.04,
    expiries: Optional[List[float]] = None,
    n_strikes: int = 15,
    seed: int = 0,
) -> Tuple[float, float, List[float], List[OptionQuote]]:
    """
    Load synthetic option market quotes (calls and puts across strikes and
    expiries). In production: query Bloomberg/Refinitiv or an internal
    market data service.

    Parameters
    ----------
    spot : spot price of the underlying.
    rate : flat continuously-compounded rate used to compute forwards.
    expiries : expiry pillars in years; defaults to a standard 0.25y–5y set.
        (Fix: annotation was previously ``List[float] = None``; it is
        ``Optional`` since ``None`` is the documented default.)
    n_strikes : strikes per expiry, spread over log-moneyness [-0.40, 0.40].
    seed : RNG seed — output is deterministic for a given seed.

    Returns
    -------
    (spot, rate, expiries, quotes) where quotes is a flat list of OptionQuote.
    """
    if expiries is None:
        expiries = [0.25, 0.5, 0.75, 1.0, 1.5, 2.0, 3.0, 5.0]

    rng = np.random.default_rng(seed)
    quotes: List[OptionQuote] = []

    for T in expiries:
        F = spot * np.exp(rate * T)
        # ATM vol has term structure: rising then flat
        atm_vol = 0.20 + 0.05 * np.exp(-T / 0.5) + rng.uniform(-0.01, 0.01)
        skew = -0.05 * np.exp(-T / 1.0)  # negative skew, decaying with tenor
        kurt = 0.02 * np.exp(-T / 2.0)  # excess kurtosis

        # Generate strikes in log-moneyness space
        k_grid = np.linspace(-0.40, 0.40, n_strikes)
        K_grid = F * np.exp(k_grid)

        for k, K in zip(k_grid, K_grid):
            # SABR-like vol smile: parabolic in log-moneyness, floored at 5%
            smile_vol = atm_vol + skew * k + kurt * k**2
            smile_vol = max(smile_vol, 0.05)
            bid_ask_half = rng.uniform(0.002, 0.008)
            quotes.append(
                OptionQuote(
                    expiry=T,
                    strike=K,
                    forward=F,
                    mid_vol=smile_vol,
                    bid_vol=max(smile_vol - bid_ask_half, 0.02),
                    ask_vol=smile_vol + bid_ask_half,
                    option_type="call" if k >= 0 else "put",
                )
            )

    return spot, rate, expiries, quotes


def calibrate_svi_for_expiry(
    market_data: Tuple[float, float, List[float], List[OptionQuote]],
    target_expiry: float,
) -> SVIParams:
    """
    Calibrate SVI parameters for ONE expiry slice using nonlinear least squares.

    WHY THIS IS HEAVY:
      - Objective: sum of squared vol errors across all strikes at this expiry
      - SVI has 5 parameters with non-trivial constraints (no-butterfly-arb)
      - Each function evaluation computes implied vols at 15 strikes
      - Convergence typically requires 200–500 function evaluations
      - Multiple restarts from different initial points to avoid local minima
        (common practice: 20 restarts with random starting params)
      - Wall time: 2–6 minutes per expiry on one core
      - With 8 expiry pillars: 16–48 min serial → ~6 min parallel (8 workers)

    SVI constraints (no-butterfly-arb, Roper 2010):
      a > 0,  b ≥ 0,  |ρ| < 1,  m ∈ ℝ,  σ > 0
      a + b·σ·√(1-ρ²) ≥ 0   (non-negative minimum variance)

    Returns the best-fit SVIParams for `target_expiry`; falls back to a
    default slice when no quotes exist, or to a flat-variance slice when
    no restart converges.
    """
    spot, rate, expiries, all_quotes = market_data
    # Keep only the quotes on this expiry pillar (float-tolerant match).
    quotes = [q for q in all_quotes if abs(q.expiry - target_expiry) < 1e-6]
    if not quotes:
        # No market data for this pillar: return a benign default slice.
        return SVIParams(
            expiry=target_expiry, a=0.04, b=0.05, rho=-0.3, m=0.0, sigma=0.2
        )

    F = quotes[0].forward
    # Log-moneyness, market total variances, and inverse-bid/ask-spread
    # weights (tighter quotes get more weight in the fit).
    k = np.array([np.log(q.strike / F) for q in quotes])
    w_market = np.array([q.mid_vol**2 * target_expiry for q in quotes])
    w_weight = np.array(
        [1.0 / max((q.ask_vol - q.bid_vol) ** 2, 1e-6) for q in quotes]
    )

    def objective(params) -> float:
        # Weighted least squares in total-variance space.
        a, b, rho, m, sigma = params
        svi = SVIParams(target_expiry, a, b, rho, m, sigma)
        w_model = svi.total_variance(k)
        return float(np.sum(w_weight * (w_market - w_model) ** 2))

    def svi_constraints(params):
        # Each entry must be ≥ 0 at the optimum (SLSQP "ineq" convention).
        a, b, rho, m, sigma = params
        return [
            a,  # a ≥ 0
            b,  # b ≥ 0
            1 - abs(rho),  # |ρ| < 1
            sigma,  # σ > 0
            a + b * sigma * np.sqrt(1 - rho**2),  # non-neg min variance
        ]

    bounds = [
        (0.0, 0.5),  # a
        (0.0, 2.0),  # b
        (-0.99, 0.99),  # rho
        (-0.5, 0.5),  # m
        (1e-4, 1.0),  # sigma
    ]

    # Multiple restarts to escape local minima (expensive but necessary)
    # hash() of a number is deterministic across runs (hash randomisation
    # only affects str/bytes), so the restart sequence is reproducible.
    rng = np.random.default_rng(hash(target_expiry * 1000) % 2**31)
    best_result = None
    best_val = np.inf
    N_RESTARTS = 30  # key driver of compute time

    for restart in range(N_RESTARTS):
        # Random initial params within feasible region
        a0 = rng.uniform(0.005, 0.10)
        b0 = rng.uniform(0.01, 0.30)
        rho0 = rng.uniform(-0.80, -0.10)
        m0 = rng.uniform(-0.15, 0.10)
        s0 = rng.uniform(0.05, 0.40)
        x0 = [a0, b0, rho0, m0, s0]

        # One SLSQP inequality per constraint; `i=i` freezes the index in
        # each lambda (avoids the late-binding closure pitfall).
        constraints = [
            {"type": "ineq", "fun": lambda p, i=i: svi_constraints(p)[i]}
            for i in range(len(svi_constraints(x0)))
        ]

        try:
            result = scipy.optimize.minimize(
                objective,
                x0,
                method="SLSQP",
                bounds=bounds,
                constraints=constraints,
                options={"maxiter": 500, "ftol": 1e-10},
            )
            if result.success and result.fun < best_val:
                best_val = result.fun
                best_result = result
        except Exception:
            # Best-effort: a failed restart is simply skipped.
            continue

        # Also run a basin-hopping step on the best result so far
        # (triggered once, halfway through the restarts; reuses this
        # restart's `constraints`; the basinhopping result exposes .x/.fun
        # like an OptimizeResult, so the bookkeeping below works for both).
        if restart == N_RESTARTS // 2 and best_result is not None:
            bh = scipy.optimize.basinhopping(
                objective,
                best_result.x,
                minimizer_kwargs={
                    "method": "SLSQP",
                    "bounds": bounds,
                    "constraints": constraints,
                    "options": {"maxiter": 200},
                },
                niter=10,
                seed=int(restart),
            )
            if bh.fun < best_val:
                best_val = bh.fun
                best_result = bh

    if best_result is None:
        # Fallback to a reasonable flat vol surface
        a_fb = float(np.mean(w_market))
        return SVIParams(target_expiry, a_fb, 0.01, -0.3, 0.0, 0.20)

    a, b, rho, m, sigma = best_result.x
    # Clamp away exact zeros so downstream sqrt/divisions stay well-behaved.
    return SVIParams(
        expiry=target_expiry,
        a=max(a, 1e-6),
        b=max(b, 1e-6),
        rho=rho,
        m=m,
        sigma=max(sigma, 1e-4),
    )


def validate_arbitrage_and_blend(
    svi_fits: List[SVIParams],
    market_data: Tuple[float, float, List[float], List[OptionQuote]],
) -> VolSurface:
    """
    This node is the FIRST FAN-IN point: it cannot run until ALL SVI expiry
    calibrations are complete.

    Checks performed (violations are RECORDED, not repaired — the slices
    are passed through unchanged):
      1. Calendar spread arbitrage: w(T2, k) ≥ w(T1, k) ∀ k, T2 > T1
      2. Butterfly arbitrage: Gatheral's density proxy g(k) ≥ 0 per slice

    WHY THIS MATTERS FOR THE DAG:
      This is a genuine blocking dependency — you cannot extract local vol
      until you know the entire surface is arbitrage-free, because Dupire's
      formula requires smooth derivatives across the surface.

    Parameters
    ----------
    svi_fits : calibrated SVI slices in any order; sorted by expiry here.
    market_data : (spot, rate, expiries, quotes); currently unused, kept in
        the signature for callers and for future repair/blend logic.

    Returns
    -------
    VolSurface with sorted slices, an arbitrage-free flag, and the list of
    human-readable violation messages.
    """
    svi_sorted = sorted(svi_fits, key=lambda s: s.expiry)
    violations: List[str] = []

    k_check = np.linspace(-0.50, 0.50, 100)

    # ── 1. Calendar spread check ─────────────────────────────────────────────
    for i in range(1, len(svi_sorted)):
        T1, T2 = svi_sorted[i - 1].expiry, svi_sorted[i].expiry
        w1 = svi_sorted[i - 1].total_variance(k_check)
        w2 = svi_sorted[i].total_variance(k_check)
        if np.any(w2 < w1 - 1e-6):
            n_violations = int((w2 < w1 - 1e-6).sum())
            violations.append(
                f"Calendar spread arbitrage: T={T2:.2f} < T={T1:.2f} "
                f"at {n_violations} strikes"
            )
            # NOTE: no repair is applied — a previous revision computed a
            # floored variance here but never used it (dead code, removed).

    # ── 2. Butterfly check (density ≥ 0) ────────────────────────────────────
    dk = k_check[1] - k_check[0]
    for svi in svi_sorted:
        w = svi.total_variance(k_check)
        dw = np.gradient(w, dk)
        d2w = np.gradient(dw, dk)
        # Gatheral's g(k) density condition, with finite-difference
        # derivatives and small epsilons guarding the divisions.
        g = (
            1
            - k_check * dw / (2 * w + 1e-9)
            + (dw**2 / 4) * (-1 / w + 1 / 4 + 1 / (4 * w**2 + 1e-9))
            + d2w / 2
        )
        if np.any(g < -1e-4):
            n_viol = int((g < -1e-4).sum())
            violations.append(
                f"Butterfly arbitrage at T={svi.expiry:.2f}: "
                f"{n_viol} strikes with negative density"
            )

    return VolSurface(
        expiries=[s.expiry for s in svi_sorted],
        svi_params=svi_sorted,
        is_arbitrage_free=len(violations) == 0,
        arbitrage_violations=violations,
    )


def extract_local_vol_surface(
    surface: VolSurface,
    spot: float,
    rate: float,
    n_K: int = 200,
    n_T: int = 100,
) -> LocalVolSurface:
    """
    Extract Dupire local vol σ_loc(T,K) from the calibrated implied vol surface.

    Dupire (1994) formula:
        σ_loc²(T,K) = [∂C/∂T + r·K·∂C/∂K] / [½·K²·∂²C/∂K²]

    Implemented via numerical differentiation of the SVI-implied call prices
    on a fine (T,K) grid.

    WHY THIS IS HEAVY:
      - For each (T,K) point: evaluate SVI, compute call price via BS,
        then take numerical derivatives via finite differences
      - Grid: 100 time steps × 200 strike points = 20,000 evaluations
      - Each evaluation: BS inversion + SVI forward + derivative
      - Plus: eigenvalue regularisation to ensure positivity of local vol
      - Plus: 2D cubic spline fitting and re-sampling for downstream PDE
      - Wall time: 3–8 minutes on a single core for a production-quality grid

    This is the SECOND FAN-IN point: blocked until validate_arbitrage_and_blend.
    It becomes the root of the fan-out to all PDE pricing nodes.
    """
    from scipy.stats import norm

    def bs_call(S, K, T, r, sigma):
        # Black-Scholes call; degenerate inputs collapse to intrinsic value.
        if T <= 0 or sigma <= 0:
            return max(S - K * np.exp(-r * T), 0.0)
        d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
        d2 = d1 - sigma * np.sqrt(T)
        return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)

    # Build a fine grid
    T_min, T_max = 0.05, max(surface.expiries) * 1.1
    K_min, K_max = spot * 0.30, spot * 2.50
    T_grid = np.linspace(T_min, T_max, n_T)
    K_grid = np.linspace(K_min, K_max, n_K)

    # Build implied vol interpolant across (T, k=log(K/F))
    # by constructing a regular grid of implied vols
    F_grid = spot * np.exp(rate * T_grid)  # forward per tenor row

    local_vol = np.zeros((n_T, n_K))
    dT = T_grid[1] - T_grid[0]
    dK = K_grid[1] - K_grid[0]

    # Find the two closest SVI slices for each T (linear interpolation in time)
    svi_T = np.array([s.expiry for s in surface.svi_params])

    for i, T in enumerate(T_grid):
        F = F_grid[i]

        # Interpolate SVI params to this T (linear in time for each parameter)
        # For T outside calibration range: extrapolate flat
        if T <= svi_T[0]:
            svi_lo = svi_hi = surface.svi_params[0]
            alpha = 0.0
        elif T >= svi_T[-1]:
            svi_lo = svi_hi = surface.svi_params[-1]
            alpha = 0.0
        else:
            idx = np.searchsorted(svi_T, T) - 1
            idx = max(0, min(idx, len(surface.svi_params) - 2))
            svi_lo = surface.svi_params[idx]
            svi_hi = surface.svi_params[idx + 1]
            alpha = (T - svi_T[idx]) / (svi_T[idx + 1] - svi_T[idx])

        # Blend the implied vols (not the raw SVI params) of the two
        # bracketing slices, linearly in time. Redefined each iteration so
        # it closes over this row's svi_lo/svi_hi/alpha/F.
        def blended_vol(K_arr: np.ndarray) -> np.ndarray:
            k = np.log(K_arr / F)
            vol_lo = svi_lo.implied_vol(k)
            vol_hi = svi_hi.implied_vol(k)
            return (1 - alpha) * vol_lo + alpha * vol_hi

        # Call price grid at this T: C(K)
        sigma_row = blended_vol(K_grid)
        C_row = np.array(
            [bs_call(spot, K, T, rate, s) for K, s in zip(K_grid, sigma_row)]
        )

        # dC/dK and d²C/dK² via central finite differences
        dC_dK = np.gradient(C_row, dK)
        d2C_dK2 = np.gradient(dC_dK, dK)

        # dC/dT: need C at T ± dT
        # NOTE(review): the sigma_* rows below re-evaluate blended_vol at the
        # SAME time slice, so the smile is held fixed while T is bumped
        # (∂C/∂T at constant implied vol) — confirm this is the intended
        # Dupire variant rather than re-blending the smile at T ± dT.
        if i == 0:
            sigma_hi = blended_vol(K_grid)  # use same slice; edge case
            C_hi = np.array(
                [
                    bs_call(spot, K, T + dT, rate, s)
                    for K, s in zip(K_grid, sigma_hi)
                ]
            )
            dC_dT = (C_hi - C_row) / dT  # forward difference at first row
        elif i == n_T - 1:
            sigma_lo_row = blended_vol(K_grid)
            C_lo = np.array(
                [
                    bs_call(spot, K, T - dT, rate, s)
                    for K, s in zip(K_grid, sigma_lo_row)
                ]
            )
            dC_dT = (C_row - C_lo) / dT  # backward difference at last row
        else:
            sigma_lo_row = blended_vol(K_grid)
            C_lo = np.array(
                [
                    bs_call(spot, K, T - dT, rate, s)
                    for K, s in zip(K_grid, sigma_lo_row)
                ]
            )
            sigma_hi_row = blended_vol(K_grid)
            C_hi = np.array(
                [
                    bs_call(spot, K, T + dT, rate, s)
                    for K, s in zip(K_grid, sigma_hi_row)
                ]
            )
            dC_dT = (C_hi - C_lo) / (2 * dT)  # central difference elsewhere

        # Dupire formula
        numerator = dC_dT + rate * K_grid * dC_dK
        denominator = 0.5 * K_grid**2 * d2C_dK2

        # Regularise: clamp to avoid division by near-zero or negative denom
        denom_safe = np.where(np.abs(denominator) > 1e-8, denominator, 1e-8)
        lv_sq = numerator / denom_safe
        lv_sq = np.clip(lv_sq, 1e-6, 4.0)  # floor/cap local vol
        local_vol[i, :] = np.sqrt(lv_sq)

    # Apply 2D Gaussian smoothing to remove numerical noise
    from scipy.ndimage import gaussian_filter

    # sigma=[1.5, 1.0]: smooth more along the T axis (rows) than along K.
    local_vol = gaussian_filter(local_vol, sigma=[1.5, 1.0])
    local_vol = np.clip(local_vol, 0.01, 2.0)

    return LocalVolSurface(
        T_grid=T_grid, K_grid=K_grid, local_vol=local_vol, spot=spot, rate=rate
    )


def pde_price_exotic(
    local_vol_surface: LocalVolSurface,
    trade: ExoticTrade,
    n_S: int = 300,
    n_t: int = 300,
) -> PDEResult:
    """
    Price one exotic option by solving the backward Kolmogorov PDE on the
    local vol surface using Crank-Nicolson finite differences.

    PDE: ∂V/∂t + ½σ²(t,S)S²∂²V/∂S² + rS∂V/∂S - rV = 0

    For path-dependent payoffs (barriers, lookbacks, Asians), the
    Crank-Nicolson scheme is applied on a backward time sweep with
    appropriate boundary conditions at each time step.

    WHY THIS IS HEAVY:
      - Grid: 300 S-points × 300 time steps = 90,000 PDE evaluations
      - Each time step: build tridiagonal system (300×300), solve via Thomas
      - Barrier: apply knock-out condition at each step (O(n_S) check)
      - For a lookback or Asian: must augment the state space or apply
        integration at each step → 3–5× more expensive than vanilla
      - Wall time: 3–8 minutes per trade on one core
      - With 200 trades: 600–1600 min serial → 8 min with 200 workers

    Each trade node is INDEPENDENT given the local vol surface.

    Fix: the -rV discount term is now treated Crank-Nicolson style on both
    the implicit diagonal and the explicit right-hand side. Previously it
    was computed into an unused coefficient vector and dropped, so interior
    grid values were never discounted (while the upper boundary was).
    """
    lv = local_vol_surface
    S0 = lv.spot
    r = lv.rate
    T = trade.maturity
    K = trade.strike
    B = trade.barrier

    # ── Build PDE grid ──────────────────────────────────────────────────────
    S_max = S0 * 3.0
    S_min = S0 * 0.01
    dt = T / n_t
    S_grid = np.linspace(S_min, S_max, n_S)
    dS = S_grid[1] - S_grid[0]

    # ── Terminal condition ───────────────────────────────────────────────────
    flag = 1 if trade.flag == "call" else -1

    if trade.instrument in ("barrier_do", "barrier_ui"):
        V = np.maximum(flag * (S_grid - K), 0.0)
    elif trade.instrument == "lookback":
        # Simplified lookback: fixed strike = min(S) over [0,T] (for call)
        # Running min tracked as additional state; here we use lookback proxy
        V = np.maximum(flag * (S_grid - K * 0.85), 0.0)  # proxy
    elif trade.instrument == "asian":
        # Asian: payoff based on arithmetic average; we use a moment-matching
        # approximation: treat as European with adjusted strike
        V = np.maximum(flag * (S_grid - K * 1.05), 0.0)  # proxy
    elif trade.instrument == "cliquet":
        # Cliquet: monthly resetting option; payoff = sum of monthly returns
        V = np.maximum(flag * (S_grid / S0 - 1.0), 0.0)
    else:
        V = np.maximum(flag * (S_grid - K), 0.0)

    # ── Crank-Nicolson backward sweep ───────────────────────────────────────
    i_vec = np.arange(1, n_S - 1)
    S_inner = S_grid[i_vec]

    for step in range(n_t - 1, -1, -1):
        t_now = step * dt

        # Local vol at current time step for each S (vectorised)
        sigma_vec = np.array([lv.sigma(t_now, S) for S in S_inner])

        # PDE coefficients: diffusion ∓ convection, plus the CN half-weight
        # of the -rV discount term
        a_vec = (
            0.5 * dt * (sigma_vec**2 * S_inner**2 / dS**2 - r * S_inner / dS)
        )
        c_vec = (
            0.5 * dt * (sigma_vec**2 * S_inner**2 / dS**2 + r * S_inner / dS)
        )
        disc = 0.5 * dt * r

        # Build tridiagonal system (Crank-Nicolson: average of explicit/implicit)
        lower = -0.5 * a_vec[1:]
        main = 1.0 + 0.5 * (a_vec + c_vec) + disc
        upper = -0.5 * c_vec[:-1]

        # Right-hand side: explicit part
        rhs = (
            0.5 * a_vec * V[i_vec - 1]
            + (1.0 - 0.5 * (a_vec + c_vec) - disc) * V[i_vec]
            + 0.5 * c_vec * V[i_vec + 1]
        )

        # Solve tridiagonal via Thomas algorithm
        n_inner = len(i_vec)
        c_prime = np.zeros(n_inner)
        d_prime = np.zeros(n_inner)

        c_prime[0] = upper[0] / main[0] if main[0] != 0 else 0.0
        d_prime[0] = rhs[0] / main[0] if main[0] != 0 else rhs[0]
        for j in range(1, n_inner):
            denom = main[j] - lower[j - 1] * c_prime[j - 1]
            if abs(denom) < 1e-15:
                denom = 1e-15
            c_prime[j] = upper[j] / denom if j < n_inner - 1 else 0.0
            d_prime[j] = (rhs[j] - lower[j - 1] * d_prime[j - 1]) / denom

        V_new = np.zeros(n_inner)
        V_new[-1] = d_prime[-1]
        for j in range(n_inner - 2, -1, -1):
            V_new[j] = d_prime[j] - c_prime[j] * V_new[j + 1]

        V[i_vec] = V_new

        # ── Boundary conditions ──────────────────────────────────────────────
        # NOTE(review): the zero lower boundary is also applied to puts —
        # kept as-is to preserve behaviour; verify for put trades.
        V[0] = 0.0  # deep ITM put or OTM call: zero
        # Upper boundary: large-S asymptote
        if trade.flag == "call":
            V[-1] = S_grid[-1] - K * np.exp(-r * t_now)
        else:
            V[-1] = 0.0

        # ── Knock-out / knock-in condition ────────────────────────────────────
        if trade.instrument == "barrier_do":
            mask = S_grid <= B
            V[mask] = 0.0
        elif trade.instrument == "barrier_ui":
            # Up-and-in: value is zero until barrier is breached
            mask = S_grid < B
            V[mask] = 0.0

    # ── Extract NPV and Greeks from PDE solution at S = S0 ──────────────────
    idx0 = int(np.argmin(np.abs(S_grid - S0)))

    npv = float(V[idx0])
    delta = float((V[min(idx0 + 1, n_S - 1)] - V[max(idx0 - 1, 0)]) / (2 * dS))
    gamma = float(
        (V[min(idx0 + 1, n_S - 1)] - 2 * V[idx0] + V[max(idx0 - 1, 0)]) / dS**2
    )

    # Theta: finite difference in time (re-run one step forward)
    theta = float(-r * npv)  # approximate; full theta requires extra PDE step

    return PDEResult(
        trade_id=trade.trade_id, npv=npv, delta=delta, gamma=gamma, theta=theta
    )


def compute_vega_ladder(
    local_vol_surface: LocalVolSurface,
    pde_result: PDEResult,
    trade: ExoticTrade,
    vol_surface: VolSurface,
    bump_size: float = 0.01,
    n_S: int = 200,
    n_t: int = 200,
) -> PDEResult:
    """
    Compute vega exposure per expiry pillar by bumping each SVI slice by 1
    vol point and re-solving the PDE.

    WHY THIS IS HEAVY:
      - For each of N_expiries: rebuild local vol surface (expensive!) + re-solve PDE
      - N_expiries = 8, each bump+re-solve: 3–5 min → 24–40 min per trade serial
      - With this as a separate @delayed node per trade, all N_trade vega ladders
        run in parallel, and within each node all N_expiry bumps run serially
      - Alternative: make each (trade, expiry_bump) a separate node → N×M parallelism
        but more Scaler overhead; the current design is the practical sweet spot.

    This node DEPENDS ON:
      - local_vol_surface (the unbumped surface, for reference)
      - pde_result (base NPV to compute differences)
      - trade (to re-run PDE)
      - vol_surface (to re-calibrate per-expiry bumps)

    Returns a NEW PDEResult carrying the base Greeks with vega_ladder
    populated (the input pde_result is not mutated).

    Fix: removed an unused `from scipy.stats import norm` import.
    """
    vega_by_expiry: Dict[float, float] = {}

    for expiry in vol_surface.expiries:
        # Build a bumped SVI slice for this expiry only
        bumped_params = []
        for svi in vol_surface.svi_params:
            if abs(svi.expiry - expiry) < 1e-6:
                # Bump: increase total variance by bump_size² × T (1 vol point shift)
                # NOTE(review): an a-bump of bump_size²·T shifts total
                # variance uniformly, which is only approximately a
                # 1-vol-point bump — confirm the intended vega convention.
                bumped_params.append(
                    SVIParams(
                        expiry=svi.expiry,
                        a=svi.a
                        + bump_size**2 * svi.expiry,  # shift total variance
                        b=svi.b,
                        rho=svi.rho,
                        m=svi.m,
                        sigma=svi.sigma,
                    )
                )
            else:
                bumped_params.append(svi)

        # Rebuild local vol surface with bumped SVI (abbreviated grid for speed)
        bumped_surface = VolSurface(
            expiries=vol_surface.expiries,
            svi_params=bumped_params,
            is_arbitrage_free=True,
            arbitrage_violations=[],
        )

        # Re-extract local vol (expensive) — use coarser grid for vega efficiency
        lv_bumped = _fast_local_vol(
            bumped_surface,
            local_vol_surface.spot,
            local_vol_surface.rate,
            n_K=100,
            n_T=50,
        )

        # Re-solve PDE (cheaper grid for vega)
        result_bumped = _run_pde(lv_bumped, trade, n_S=n_S, n_t=n_t)
        vega_by_expiry[expiry] = (result_bumped - pde_result.npv) / bump_size

    return PDEResult(
        trade_id=pde_result.trade_id,
        npv=pde_result.npv,
        delta=pde_result.delta,
        gamma=pde_result.gamma,
        theta=pde_result.theta,
        vega_ladder=vega_by_expiry,
    )


def _fast_local_vol(
    surface: VolSurface, spot: float, rate: float, n_K: int = 100, n_T: int = 50
) -> LocalVolSurface:
    """Abbreviated local vol extraction for vega bump re-computation.

    Instead of a full Dupire extraction, each tenor row simply takes the
    implied vol of one SVI slice (the nearest pillar at or below T, clamped)
    as the local vol, clipped to [0.02, 2.0] on a coarse (n_T × n_K) grid.
    This trades accuracy for speed during vega bumps.

    Fix: removed an unused `from scipy.stats import norm` import.
    """
    T_grid = np.linspace(0.05, max(surface.expiries) * 1.1, n_T)
    K_grid = np.linspace(spot * 0.40, spot * 2.20, n_K)
    svi_T = np.array([s.expiry for s in surface.svi_params])

    local_vol = np.full((n_T, n_K), 0.20)  # default fallback

    for i, T in enumerate(T_grid):
        F = spot * np.exp(rate * T)
        # Clamped index of the slice at-or-below T (no time interpolation).
        # NOTE(review): for T beyond the last pillar this clamps to the
        # SECOND-to-last slice (len-2) — verify this is intended.
        idx = max(
            0, min(np.searchsorted(svi_T, T) - 1, len(surface.svi_params) - 2)
        )
        svi = surface.svi_params[idx]
        k = np.log(K_grid / F)
        sigma_row = svi.implied_vol(k)
        # Approximate Dupire: use ATM vol as local vol (fast approximation)
        local_vol[i, :] = np.clip(sigma_row, 0.02, 2.0)

    return LocalVolSurface(
        T_grid=T_grid, K_grid=K_grid, local_vol=local_vol, spot=spot, rate=rate
    )


def _run_pde(
    lv: LocalVolSurface, trade: ExoticTrade, n_S: int = 200, n_t: int = 200
) -> float:
    """Abbreviated PDE solve returning NPV only (for vega bumps)."""
    S0 = lv.spot
    r = lv.rate
    T = trade.maturity
    K = trade.strike
    B = trade.barrier

    S_max = S0 * 3.0
    S_grid = np.linspace(S0 * 0.01, S_max, n_S)
    dS = S_grid[1] - S_grid[0]
    dt = T / n_t
    flag = 1 if trade.flag == "call" else -1

    V = np.maximum(flag * (S_grid - K), 0.0)

    i_vec = np.arange(1, n_S - 1)
    S_inner = S_grid[i_vec]

    for step in range(n_t - 1, -1, -1):
        t_now = step * dt
        sigma_vec = np.array([lv.sigma(t_now, S) for S in S_inner])

        a_vec = (
            0.5 * dt * (sigma_vec**2 * S_inner**2 / dS**2 - r * S_inner / dS)
        )
        b_vec = 1.0 + dt * (sigma_vec**2 * S_inner**2 / dS**2 + r)
        c_vec = (
            0.5 * dt * (sigma_vec**2 * S_inner**2 / dS**2 + r * S_inner / dS)
        )

        lower = -0.5 * a_vec[1:]
        main = 1.0 + 0.5 * (a_vec + c_vec)
        upper = -0.5 * c_vec[:-1]
        rhs = (
            0.5 * a_vec * V[i_vec - 1]
            + (1 - 0.5 * (a_vec + c_vec)) * V[i_vec]
            + 0.5 * c_vec * V[i_vec + 1]
        )

        n_inner = len(i_vec)
        c_prime = np.zeros(n_inner)
        d_prime = np.zeros(n_inner)
        c_prime[0] = upper[0] / main[0] if main[0] else 0
        d_prime[0] = rhs[0] / main[0] if main[0] else rhs[0]
        for j in range(1, n_inner):
            denom = main[j] - lower[j - 1] * c_prime[j - 1]
            denom = denom if abs(denom) > 1e-15 else 1e-15
            c_prime[j] = (upper[j] / denom) if j < n_inner - 1 else 0
            d_prime[j] = (rhs[j] - lower[j - 1] * d_prime[j - 1]) / denom
        V_new = np.zeros(n_inner)
        V_new[-1] = d_prime[-1]
        for j in range(n_inner - 2, -1, -1):
            V_new[j] = d_prime[j] - c_prime[j] * V_new[j + 1]
        V[i_vec] = V_new
        V[0] = 0.0
        V[-1] = S_grid[-1] - K * np.exp(-r * t_now) if flag == 1 else 0.0
        if trade.instrument == "barrier_do":
            V[S_grid <= B] = 0.0

    idx0 = int(np.argmin(np.abs(S_grid - S0)))
    return float(V[idx0])


def aggregate_book_greeks(
    pde_results_with_vega: List[PDEResult], trades: List[ExoticTrade]
) -> BookGreeks:
    """
    Final fan-in: aggregate all trade-level Greeks into book-level risk report.
    Waits for ALL PDE and vega nodes to complete.

    Each trade's Greeks are scaled by notional × quantity before summation.
    Raises KeyError if a result references a trade_id absent from `trades`.

    Fixes: removed the `all_expiries` set that was populated but never read;
    the risk report is built with explicit columns so an empty result list
    no longer crashes in sort_values.
    """
    trade_map = {t.trade_id: t for t in trades}

    total_npv = 0.0
    total_delta = 0.0
    total_gamma = 0.0
    vega_ladder_agg: Dict[float, float] = {}

    rows = []
    for r in pde_results_with_vega:
        t = trade_map[r.trade_id]
        scale = t.notional * t.quantity
        total_npv += scale * r.npv
        total_delta += scale * r.delta
        total_gamma += scale * r.gamma

        rows.append(
            {
                "Trade ID": r.trade_id,
                "Instrument": t.instrument,
                "NPV": scale * r.npv,
                "Delta": scale * r.delta,
                "Gamma": scale * r.gamma,
                "Theta": scale * r.theta,
            }
        )

        if r.vega_ladder:
            for exp, v in r.vega_ladder.items():
                vega_ladder_agg[exp] = vega_ladder_agg.get(exp, 0.0) + scale * v

    # Explicit columns keep the report well-formed even for an empty book.
    columns = ["Trade ID", "Instrument", "NPV", "Delta", "Gamma", "Theta"]
    risk_df = pd.DataFrame(rows, columns=columns).sort_values("NPV")

    return BookGreeks(
        total_npv=total_npv,
        portfolio_delta=total_delta,
        portfolio_gamma=total_gamma,
        vega_by_expiry=pd.Series(vega_ladder_agg).sort_index(),
        risk_report=risk_df,
    )


def price_and_compute_greeks(
    trades: List[ExoticTrade],
    local_vol: LocalVolSurface,
    vol_surface: VolSurface,
    compute_vega: bool = True,
) -> List:
    """Price every trade on the local vol surface, optionally with vega ladders.

    Serial reference implementation of the per-trade fan-out: each trade is
    priced via the PDE solver, then (when compute_vega is True) re-priced
    under per-expiry vol bumps to attach its vega ladder.
    """
    priced: List = []
    for current in trades:
        base = pde_price_exotic(local_vol, current)
        priced.append(
            compute_vega_ladder(local_vol, base, current, vol_surface)
            if compute_vega
            else base
        )
    return priced


def vol_surface_and_pde_pipeline(
    n_trades: int = 150,
    expiries: Optional[List[float]] = None,
    spot: float = 100.0,
    rate: float = 0.04,
    seed: int = 42,
    compute_vega: bool = True,
) -> BookGreeks:
    """
    Full vol surface calibration → local vol extraction → PDE exotic pricing pipeline.

    PARALLEL WINS:
      Level 2:  N_expiry SVI calibrations run simultaneously (8 nodes, each 3–5 min)
      Level 3:  validate_arbitrage waits for all (fan-in); fast node
      Level 4:  extract_local_vol blocks on level 3 (one heavy node: 4–8 min)
      Level 5:  N_trade PDE solves all fire simultaneously once local vol is ready
                (50 nodes, each 3–8 min → 8 min on a 50-worker cluster)
      Level 6:  N_trade vega ladder nodes fire simultaneously (50 × 8 expiry bumps)
      Level 7:  aggregate_book_greeks waits for all level-6 nodes

    SERIAL:     8×4 (SVI) + 1 (arb) + 6 (LV) + 50×5 (PDE) + 50×5×8 (vega) ≈ 36 hours
    PARALLEL:   4 (SVI max) + 1 + 6 (LV) + 5 (PDE max) + 5×8 (vega max) ≈ 55 min
    SPEEDUP:    ~39×

    The hourglass shape is clearly visible in the DAG:
      - Wide at SVI calibration (fan-out)
      - Narrow at local vol (fan-in + single heavy node)
      - Wide again at PDE pricing (fan-out)
      - Narrow again at aggregation (fan-in)
    """
    if expiries is None:
        expiries = [0.25, 0.5, 0.75, 1.0, 1.5, 2.0, 3.0, 5.0]

    # Level 1: load market data (shared by all SVI nodes)
    market_data = load_market_quotes(
        spot=spot, rate=rate, expiries=expiries, seed=seed
    )

    # Level 2: SVI calibration — N_expiry parallel heavy nodes
    svi_fits = [
        calibrate_svi_for_expiry(market_data, expiry) for expiry in expiries
    ]

    # Level 3: arbitrage validation (fan-in: waits for ALL SVI fits)
    vol_surface = validate_arbitrage_and_blend(svi_fits, market_data)

    # Level 4: local vol extraction (single expensive node, waits for level 3)
    local_vol = extract_local_vol_surface(vol_surface, spot=spot, rate=rate)

    # Build trade list (in production: load from trade repository)
    # NOTE: the rng draws below are order-dependent — changing the order of the
    # rng.choice()/rng.uniform() calls would silently change the generated book.
    rng = np.random.default_rng(seed)
    instruments = ["barrier_do", "barrier_ui", "lookback", "asian", "cliquet"]
    trades = [
        ExoticTrade(
            trade_id=f"EX_{i:04d}",
            instrument=rng.choice(instruments),
            spot=spot,
            strike=float(spot * rng.uniform(0.85, 1.15)),
            barrier=float(spot * rng.uniform(0.65, 0.90)),
            maturity=float(rng.choice(expiries)),
            rate=rate,
            flag=rng.choice(["call", "put"]),
            notional=float(rng.choice([1e5, 5e5, 1e6])),
            quantity=int(rng.choice([-1, 1])),
        )
        for i in range(n_trades)
    ]

    # Level 5-6: PDE pricing + vega ladder — N_trade parallel nodes
    final_results = price_and_compute_greeks(
        trades, local_vol, vol_surface, compute_vega
    )

    # Level 7: final fan-in into book-level Greeks.
    return aggregate_book_greeks(final_results, trades)


# Driver cell: run the full (serial, native-Python) pipeline on a 150-trade
# synthetic book and print the headline portfolio NPV.
greeks = vol_surface_and_pde_pipeline(
    n_trades=150,
    expiries=[0.25, 0.5, 0.75, 1.0, 1.5, 2.0, 3.0, 5.0],
    spot=100.0,
    rate=0.04,
    seed=42,
    compute_vega=True,
)
print(f"Portfolio NPV: ${greeks.total_npv:,.0f}")

Parfun Implementation

[ ]:
# Pin every BLAS/OpenMP backend to one thread BEFORE numpy/scipy are imported —
# presumably so each Scaler worker task stays single-threaded and the cluster
# isn't oversubscribed (standard practice for process-level parallelism).
import os

os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["GOTO_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"


# Parfun/Scaler wiring: cluster scheduler and object-storage endpoints.
from scaler import Client
import parfun as pf
scheduler_address = "tcp://172.31.3.107:7788"
object_storage_address = "tcp://172.31.3.107:7789"
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import scipy.interpolate
import scipy.optimize

# ─── Domain types ─────────────────────────────────────────────────────────────


@dataclass
class OptionQuote:
    """A single market option quote: one (expiry, strike) point on the surface."""

    expiry: float  # years
    strike: float  # absolute
    forward: float  # forward price at expiry
    mid_vol: float  # market implied vol
    bid_vol: float  # bid-side implied vol
    ask_vol: float  # ask-side implied vol
    option_type: str  # "call" | "put"


@dataclass
class SVIParams:
    """
    SVI (Stochastic Volatility Inspired) parametrization of the vol smile.
    Gatheral (2004): w(k) = a + b[ρ(k-m) + √((k-m)² + σ²)]
    where k = log(K/F), w = σ²T (total implied variance)
    """

    expiry: float
    a: float  # vertical translation
    b: float  # angle of the asymptotes
    rho: float  # rotation (skew) parameter, ∈ (-1,1)
    m: float  # translation
    sigma: float  # smoothness ∈ (0,∞)

    def total_variance(self, k: np.ndarray) -> np.ndarray:
        """Total implied variance w(k) = a + b[ρ(k-m) + √((k-m)²+σ²)]."""
        shifted = k - self.m
        wing = np.sqrt(shifted * shifted + self.sigma * self.sigma)
        return self.a + self.b * (self.rho * shifted + wing)

    def implied_vol(self, k: np.ndarray) -> np.ndarray:
        """Implied vol √(w/T), with w floored at 1e-8 to keep the root real."""
        total_var = self.total_variance(k)
        return np.sqrt(np.maximum(total_var, 1e-8) / self.expiry)


@dataclass
class VolSurface:
    """
    Calibrated vol surface: collection of SVI slices + arbitrage check results.
    """

    expiries: List[float]  # expiry pillars (one per SVI slice)
    svi_params: List[SVIParams]  # calibrated SVI slice per pillar
    is_arbitrage_free: bool  # True when no calendar/butterfly violations found
    arbitrage_violations: List[str]  # human-readable violation descriptions


@dataclass
class LocalVolSurface:
    """
    Dupire local vol surface: σ_loc(T, K) extracted from the implied vol surface.
    Stored as a 2D interpolant over (T, K) grid.
    """

    T_grid: np.ndarray  # shape (n_T,)
    K_grid: np.ndarray  # shape (n_K,)
    local_vol: np.ndarray  # shape (n_T, n_K)
    spot: float
    rate: float
    _interp: Optional[object] = field(default=None, repr=False)

    def __post_init__(self):
        # Bilinear interpolant over the (T, K) grid.  fill_value=None would
        # extrapolate linearly, but sigma() clamps queries to the grid first.
        from scipy.interpolate import RegularGridInterpolator

        self._interp = RegularGridInterpolator(
            (self.T_grid, self.K_grid),
            self.local_vol,
            method="linear",
            bounds_error=False,
            fill_value=None,
        )

    def sigma(self, T: float, K: float) -> float:
        """Local vol at (T, K); both coordinates are clamped to the grid range."""
        t_q = np.clip(T, self.T_grid[0], self.T_grid[-1])
        k_q = np.clip(K, self.K_grid[0], self.K_grid[-1])
        return float(self._interp([[t_q, k_q]])[0])


@dataclass
class ExoticTrade:
    """One exotic option position to be priced on the local-vol PDE."""

    trade_id: str
    instrument: (
        str  # "barrier_do" | "barrier_ui" | "lookback" | "asian" | "cliquet"
    )
    spot: float
    strike: float
    barrier: float  # upper or lower depending on instrument
    maturity: float  # years
    rate: float
    flag: str  # "call" | "put"
    notional: float
    quantity: int  # +1 long, -1 short


@dataclass
class PDEResult:
    """Per-trade PDE pricing output; Greeks are per unit notional (aggregation
    scales by notional × quantity)."""

    trade_id: str
    npv: float
    delta: float  # ∂V/∂S
    gamma: float  # ∂²V/∂S²
    theta: float  # ∂V/∂t
    vega_ladder: Optional[Dict[float, float]] = None  # expiry → vega


@dataclass
class BookGreeks:
    """Portfolio-level risk aggregates (already notional- and quantity-scaled)."""

    total_npv: float
    portfolio_delta: float
    portfolio_gamma: float
    vega_by_expiry: pd.Series  # aggregated vega ladder, indexed by expiry
    risk_report: pd.DataFrame  # one row per trade, sorted by NPV


# ─── Pipeline functions ──────────────────────────────────────────────────────


def load_market_quotes(
    spot: float = 100.0,
    rate: float = 0.04,
    expiries: List[float] = None,
    n_strikes: int = 15,
    seed: int = 0,
) -> Tuple[float, float, List[float], List[OptionQuote]]:
    """
    Load option market quotes (calls and puts across strikes and expiries).
    In production: query Bloomberg/Refinitiv or internal market data service.
    Returns (spot, rate, expiries, quotes).
    """
    if expiries is None:
        expiries = [0.25, 0.5, 0.75, 1.0, 1.5, 2.0, 3.0, 5.0]

    rng = np.random.default_rng(seed)
    quotes: List[OptionQuote] = []

    for tenor in expiries:
        fwd = spot * np.exp(rate * tenor)
        # ATM vol term structure: rising then flat, plus a little noise.
        atm = 0.20 + 0.05 * np.exp(-tenor / 0.5) + rng.uniform(-0.01, 0.01)
        skew = -0.05 * np.exp(-tenor / 1.0)  # negative skew, decaying with tenor
        curvature = 0.02 * np.exp(-tenor / 2.0)  # excess kurtosis

        # Strikes laid out uniformly in log-moneyness.
        log_moneyness = np.linspace(-0.40, 0.40, n_strikes)
        strikes = fwd * np.exp(log_moneyness)

        for k, strike in zip(log_moneyness, strikes):
            # Parabolic (SABR-like) smile in log-moneyness, floored at 5%.
            vol = max(atm + skew * k + curvature * k**2, 0.05)
            half_spread = rng.uniform(0.002, 0.008)
            quotes.append(
                OptionQuote(
                    expiry=tenor,
                    strike=strike,
                    forward=fwd,
                    mid_vol=vol,
                    bid_vol=max(vol - half_spread, 0.02),
                    ask_vol=vol + half_spread,
                    option_type="call" if k >= 0 else "put",
                )
            )

    return spot, rate, expiries, quotes


def calibrate_svi_for_expiry(
    market_data: Tuple[float, float, List[float], List[OptionQuote]],
    target_expiry: float,
) -> SVIParams:
    """
    Calibrate SVI parameters for ONE expiry slice using nonlinear least squares.

    WHY THIS IS HEAVY:
      - Objective: sum of squared vol errors across all strikes at this expiry
      - SVI has 5 parameters with non-trivial constraints (no-butterfly-arb)
      - Each function evaluation computes implied vols at 15 strikes
      - Convergence typically requires 200–500 function evaluations
      - Multiple restarts from different initial points to avoid local minima
        (common practice: 20 restarts with random starting params)
      - Wall time: 2–6 minutes per expiry on one core
      - With 8 expiry pillars: 16–48 min serial → ~6 min parallel (8 workers)

    SVI constraints (no-butterfly-arb, Roper 2010):
      a > 0,  b ≥ 0,  |ρ| < 1,  m ∈ ℝ,  σ > 0
      a + b·σ·√(1-ρ²) ≥ 0   (non-negative minimum variance)
    """
    spot, rate, expiries, all_quotes = market_data
    # Keep only quotes on this expiry pillar (float-tolerant match).
    quotes = [q for q in all_quotes if abs(q.expiry - target_expiry) < 1e-6]
    if not quotes:
        # No quotes for this pillar: fall back to a sane default slice.
        return SVIParams(
            expiry=target_expiry, a=0.04, b=0.05, rho=-0.3, m=0.0, sigma=0.2
        )

    F = quotes[0].forward
    # Fit in total-variance space: k = log(K/F), w = σ_mid² · T per quote.
    k = np.array([np.log(q.strike / F) for q in quotes])
    w_market = np.array([q.mid_vol**2 * target_expiry for q in quotes])
    # Weight by inverse squared bid/ask spread so tight quotes dominate the fit.
    w_weight = np.array(
        [1.0 / max((q.ask_vol - q.bid_vol) ** 2, 1e-6) for q in quotes]
    )

    def objective(params) -> float:
        # Weighted least squares of model vs. market total variance.
        a, b, rho, m, sigma = params
        svi = SVIParams(target_expiry, a, b, rho, m, sigma)
        w_model = svi.total_variance(k)
        return float(np.sum(w_weight * (w_market - w_model) ** 2))

    def svi_constraints(params):
        # Every entry must be ≥ 0 for a feasible point (see docstring).
        a, b, rho, m, sigma = params
        return [
            a,  # a ≥ 0
            b,  # b ≥ 0
            1 - abs(rho),  # |ρ| < 1
            sigma,  # σ > 0
            a + b * sigma * np.sqrt(1 - rho**2),  # non-neg min variance
        ]

    bounds = [
        (0.0, 0.5),  # a
        (0.0, 2.0),  # b
        (-0.99, 0.99),  # rho
        (-0.5, 0.5),  # m
        (1e-4, 1.0),  # sigma
    ]

    # Multiple restarts to escape local minima (expensive but necessary).
    # NOTE: hash() of a float is not randomized by PYTHONHASHSEED, so this
    # per-expiry seed is reproducible across runs.
    rng = np.random.default_rng(hash(target_expiry * 1000) % 2**31)
    best_result = None
    best_val = np.inf
    N_RESTARTS = 30  # key driver of compute time

    for restart in range(N_RESTARTS):
        # Random initial params within feasible region
        a0 = rng.uniform(0.005, 0.10)
        b0 = rng.uniform(0.01, 0.30)
        rho0 = rng.uniform(-0.80, -0.10)
        m0 = rng.uniform(-0.15, 0.10)
        s0 = rng.uniform(0.05, 0.40)
        x0 = [a0, b0, rho0, m0, s0]

        # One inequality constraint per entry of svi_constraints; i is bound
        # early via the default argument (avoids the late-binding closure bug).
        constraints = [
            {"type": "ineq", "fun": lambda p, i=i: svi_constraints(p)[i]}
            for i in range(len(svi_constraints(x0)))
        ]

        try:
            result = scipy.optimize.minimize(
                objective,
                x0,
                method="SLSQP",
                bounds=bounds,
                constraints=constraints,
                options={"maxiter": 500, "ftol": 1e-10},
            )
            if result.success and result.fun < best_val:
                best_val = result.fun
                best_result = result
        except Exception:
            continue

        # Also run a basin-hopping step on the best result so far.
        # Both scipy OptimizeResult and the basinhopping return expose .x/.fun,
        # so best_result can hold either kind interchangeably.
        if restart == N_RESTARTS // 2 and best_result is not None:
            bh = scipy.optimize.basinhopping(
                objective,
                best_result.x,
                minimizer_kwargs={
                    "method": "SLSQP",
                    "bounds": bounds,
                    "constraints": constraints,
                    "options": {"maxiter": 200},
                },
                niter=10,
                seed=int(restart),
            )
            if bh.fun < best_val:
                best_val = bh.fun
                best_result = bh

    if best_result is None:
        # Fallback to a reasonable flat vol surface
        a_fb = float(np.mean(w_market))
        return SVIParams(target_expiry, a_fb, 0.01, -0.3, 0.0, 0.20)

    # Clamp to strictly feasible values before returning the fitted slice.
    a, b, rho, m, sigma = best_result.x
    return SVIParams(
        expiry=target_expiry,
        a=max(a, 1e-6),
        b=max(b, 1e-6),
        rho=rho,
        m=m,
        sigma=max(sigma, 1e-4),
    )


def validate_arbitrage_and_blend(
    svi_fits: List[SVIParams],
    market_data: Tuple[float, float, List[float], List[OptionQuote]],
) -> VolSurface:
    """
    This node is the FIRST FAN-IN point: it cannot run until ALL SVI expiry
    calibrations are complete.

    Checks:
      1. Calendar spread arbitrage: w(T2, k) ≥ w(T1, k) ∀ k, T2 > T1
      2. Butterfly arbitrage: density g(k) = (1 - k·d/dk + d²/dk²/4)·w ≥ 0

    NOTE(review): violations are only *detected and reported* — no repair or
    blend is applied; the surface is returned with the original SVI slices.
    (A previous revision computed a calendar-floor repair and then discarded
    it; that dead code has been removed.)

    WHY THIS MATTERS FOR THE DAG:
      This is a genuine blocking dependency — you cannot extract local vol
      until you know the entire surface is arbitrage-free, because Dupire's
      formula requires smooth derivatives across the surface.
    """
    spot, rate, expiries, _ = market_data
    svi_sorted = sorted(svi_fits, key=lambda s: s.expiry)
    violations: List[str] = []

    k_check = np.linspace(-0.50, 0.50, 100)

    # ── 1. Calendar spread check: total variance non-decreasing in T ─────────
    for i in range(1, len(svi_sorted)):
        T1, T2 = svi_sorted[i - 1].expiry, svi_sorted[i].expiry
        w1 = svi_sorted[i - 1].total_variance(k_check)
        w2 = svi_sorted[i].total_variance(k_check)
        if np.any(w2 < w1 - 1e-6):
            n_violations = int((w2 < w1 - 1e-6).sum())
            violations.append(
                f"Calendar spread arbitrage: T={T2:.2f} < T={T1:.2f} "
                f"at {n_violations} strikes"
            )

    # ── 2. Butterfly check (risk-neutral density g(k) ≥ 0) ──────────────────
    dk = k_check[1] - k_check[0]
    for svi in svi_sorted:
        w = svi.total_variance(k_check)
        dw = np.gradient(w, dk)
        d2w = np.gradient(dw, dk)
        g = (
            1
            - k_check * dw / (2 * w + 1e-9)
            + (dw**2 / 4) * (-1 / w + 1 / 4 + 1 / (4 * w**2 + 1e-9))
            + d2w / 2
        )
        if np.any(g < -1e-4):
            n_viol = int((g < -1e-4).sum())
            violations.append(
                f"Butterfly arbitrage at T={svi.expiry:.2f}: "
                f"{n_viol} strikes with negative density"
            )

    return VolSurface(
        expiries=[s.expiry for s in svi_sorted],
        svi_params=svi_sorted,
        is_arbitrage_free=len(violations) == 0,
        arbitrage_violations=violations,
    )


def extract_local_vol_surface(
    surface: VolSurface,
    spot: float,
    rate: float,
    n_K: int = 200,
    n_T: int = 100,
) -> LocalVolSurface:
    """
    Extract Dupire local vol σ_loc(T,K) from the calibrated implied vol surface.

    Dupire (1994) formula:
        σ_loc²(T,K) = [∂C/∂T + r·K·∂C/∂K] / [½·K²·∂²C/∂K²]

    Implemented via numerical differentiation of the SVI-implied call prices
    on a fine (T,K) grid.

    WHY THIS IS HEAVY:
      - For each (T,K) point: evaluate SVI, compute call price via BS,
        then take numerical derivatives via finite differences
      - Grid: 100 time steps × 200 strike points = 20,000 evaluations
      - Each evaluation: BS inversion + SVI forward + derivative
      - Plus: eigenvalue regularisation to ensure positivity of local vol
      - Plus: 2D cubic spline fitting and re-sampling for downstream PDE
      - Wall time: 3–8 minutes on a single core for a production-quality grid

    This is the SECOND FAN-IN point: blocked until validate_arbitrage_and_blend.
    It becomes the root of the fan-out to all PDE pricing nodes.
    """
    from scipy.stats import norm

    def bs_call(S, K, T, r, sigma):
        # Black-Scholes call price; degenerate inputs collapse to intrinsic value.
        if T <= 0 or sigma <= 0:
            return max(S - K * np.exp(-r * T), 0.0)
        d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
        d2 = d1 - sigma * np.sqrt(T)
        return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)

    # Build a fine grid
    T_min, T_max = 0.05, max(surface.expiries) * 1.1
    K_min, K_max = spot * 0.30, spot * 2.50
    T_grid = np.linspace(T_min, T_max, n_T)
    K_grid = np.linspace(K_min, K_max, n_K)

    # Build implied vol interpolant across (T, k=log(K/F))
    # by constructing a regular grid of implied vols
    F_grid = spot * np.exp(rate * T_grid)  # forward per tenor row

    local_vol = np.zeros((n_T, n_K))
    dT = T_grid[1] - T_grid[0]
    dK = K_grid[1] - K_grid[0]

    # Find the two closest SVI slices for each T (linear interpolation in time)
    svi_T = np.array([s.expiry for s in surface.svi_params])

    for i, T in enumerate(T_grid):
        F = F_grid[i]

        # Interpolate SVI params to this T (linear in time for each parameter)
        # For T outside calibration range: extrapolate flat
        if T <= svi_T[0]:
            svi_lo = svi_hi = surface.svi_params[0]
            alpha = 0.0
        elif T >= svi_T[-1]:
            svi_lo = svi_hi = surface.svi_params[-1]
            alpha = 0.0
        else:
            idx = np.searchsorted(svi_T, T) - 1
            idx = max(0, min(idx, len(surface.svi_params) - 2))
            svi_lo = surface.svi_params[idx]
            svi_hi = surface.svi_params[idx + 1]
            alpha = (T - svi_T[idx]) / (svi_T[idx + 1] - svi_T[idx])

        def blended_vol(K_arr: np.ndarray) -> np.ndarray:
            # Time-linear blend of the bracketing SVI slices' implied vols.
            k = np.log(K_arr / F)
            vol_lo = svi_lo.implied_vol(k)
            vol_hi = svi_hi.implied_vol(k)
            return (1 - alpha) * vol_lo + alpha * vol_hi

        # Call price grid at this T: C(K)
        sigma_row = blended_vol(K_grid)
        C_row = np.array(
            [bs_call(spot, K, T, rate, s) for K, s in zip(K_grid, sigma_row)]
        )

        # dC/dK and d²C/dK² via central finite differences
        dC_dK = np.gradient(C_row, dK)
        d2C_dK2 = np.gradient(dC_dK, dK)

        # dC/dT: need C at T ± dT
        # NOTE(review): sigma_hi/sigma_lo_row below all evaluate blended_vol at
        # THIS row's T (the smile is held frozen while repricing at T ± dT), so
        # ∂C/∂T excludes the smile's own time dependence — confirm intended.
        if i == 0:
            sigma_hi = blended_vol(K_grid)  # use same slice; edge case
            C_hi = np.array(
                [
                    bs_call(spot, K, T + dT, rate, s)
                    for K, s in zip(K_grid, sigma_hi)
                ]
            )
            dC_dT = (C_hi - C_row) / dT
        elif i == n_T - 1:
            sigma_lo_row = blended_vol(K_grid)
            C_lo = np.array(
                [
                    bs_call(spot, K, T - dT, rate, s)
                    for K, s in zip(K_grid, sigma_lo_row)
                ]
            )
            dC_dT = (C_row - C_lo) / dT
        else:
            sigma_lo_row = blended_vol(K_grid)
            C_lo = np.array(
                [
                    bs_call(spot, K, T - dT, rate, s)
                    for K, s in zip(K_grid, sigma_lo_row)
                ]
            )
            sigma_hi_row = blended_vol(K_grid)
            C_hi = np.array(
                [
                    bs_call(spot, K, T + dT, rate, s)
                    for K, s in zip(K_grid, sigma_hi_row)
                ]
            )
            dC_dT = (C_hi - C_lo) / (2 * dT)

        # Dupire formula
        numerator = dC_dT + rate * K_grid * dC_dK
        denominator = 0.5 * K_grid**2 * d2C_dK2

        # Regularise: clamp to avoid division by near-zero or negative denom
        denom_safe = np.where(np.abs(denominator) > 1e-8, denominator, 1e-8)
        lv_sq = numerator / denom_safe
        lv_sq = np.clip(lv_sq, 1e-6, 4.0)  # floor/cap local vol
        local_vol[i, :] = np.sqrt(lv_sq)

    # Apply 2D Gaussian smoothing to remove numerical noise
    from scipy.ndimage import gaussian_filter

    local_vol = gaussian_filter(local_vol, sigma=[1.5, 1.0])
    local_vol = np.clip(local_vol, 0.01, 2.0)

    return LocalVolSurface(
        T_grid=T_grid, K_grid=K_grid, local_vol=local_vol, spot=spot, rate=rate
    )


def pde_price_exotic(
    local_vol_surface: LocalVolSurface,
    trade: ExoticTrade,
    n_S: int = 300,
    n_t: int = 300,
) -> PDEResult:
    """
    Price one exotic option by solving the backward Kolmogorov PDE on the
    local vol surface using Crank-Nicolson finite differences.

    PDE: ∂V/∂t + ½σ²(t,S)S²∂²V/∂S² + rS∂V/∂S - rV = 0

    For path-dependent payoffs (barriers, lookbacks, Asians), the
    Crank-Nicolson scheme is applied on a backward time sweep with
    appropriate boundary conditions at each time step.

    WHY THIS IS HEAVY:
      - Grid: 300 S-points × 300 time steps = 90,000 PDE evaluations
      - Each time step: build tridiagonal system (300×300), solve via Thomas
      - Barrier: apply knock-out condition at each step (O(n_S) check)
      - For a lookback or Asian: must augment the state space or apply
        integration at each step → 3–5× more expensive than vanilla
      - Wall time: 3–8 minutes per trade on one core
      - With 200 trades: 600–1600 min serial → 8 min with 200 workers

    Each trade node is INDEPENDENT given the local vol surface.
    """
    lv = local_vol_surface
    S0 = lv.spot
    r = lv.rate
    T = trade.maturity
    K = trade.strike
    B = trade.barrier

    # ── Build PDE grid ──────────────────────────────────────────────────────
    S_max = S0 * 3.0
    S_min = S0 * 0.01
    dt = T / n_t
    S_grid = np.linspace(S_min, S_max, n_S)
    dS = S_grid[1] - S_grid[0]

    # ── Terminal condition ───────────────────────────────────────────────────
    flag = 1 if trade.flag == "call" else -1

    if trade.instrument in ("barrier_do", "barrier_ui"):
        V = np.maximum(flag * (S_grid - K), 0.0)
    elif trade.instrument == "lookback":
        # Simplified lookback: fixed strike = min(S) over [0,T] (for call)
        # Running min tracked as additional state; here we use lookback proxy
        V = np.maximum(flag * (S_grid - K * 0.85), 0.0)  # proxy
    elif trade.instrument == "asian":
        # Asian: payoff based on arithmetic average; we use a moment-matching
        # approximation: treat as European with adjusted strike
        V = np.maximum(flag * (S_grid - K * 1.05), 0.0)  # proxy
    elif trade.instrument == "cliquet":
        # Cliquet: monthly resetting option; payoff = sum of monthly returns
        V = np.maximum(flag * (S_grid / S0 - 1.0), 0.0)
    else:
        V = np.maximum(flag * (S_grid - K), 0.0)

    # ── Crank-Nicolson backward sweep ───────────────────────────────────────
    i_vec = np.arange(1, n_S - 1)
    S_inner = S_grid[i_vec]

    for step in range(n_t - 1, -1, -1):
        t_now = step * dt

        # Local vol at current time step for each S (vectorised)
        sigma_vec = np.array([lv.sigma(t_now, S) for S in S_inner])

        # PDE off-diagonal coefficients (dt already folded in).
        # NOTE: the implicit-side main diagonal is rebuilt below as `main`;
        # a previously-computed (and unused) full-implicit diagonal was removed.
        a_vec = (
            0.5 * dt * (sigma_vec**2 * S_inner**2 / dS**2 - r * S_inner / dS)
        )
        c_vec = (
            0.5 * dt * (sigma_vec**2 * S_inner**2 / dS**2 + r * S_inner / dS)
        )

        # Build tridiagonal system (Crank-Nicolson: average of explicit/implicit)
        lower = -0.5 * a_vec[1:]
        main = 1.0 + 0.5 * (a_vec + c_vec)
        upper = -0.5 * c_vec[:-1]

        # Right-hand side: explicit part
        rhs = (
            0.5 * a_vec * V[i_vec - 1]
            + (1.0 - 0.5 * (a_vec + c_vec)) * V[i_vec]
            + 0.5 * c_vec * V[i_vec + 1]
        )

        # Solve tridiagonal via Thomas algorithm
        n_inner = len(i_vec)
        c_prime = np.zeros(n_inner)
        d_prime = np.zeros(n_inner)

        c_prime[0] = upper[0] / main[0] if main[0] != 0 else 0.0
        d_prime[0] = rhs[0] / main[0] if main[0] != 0 else rhs[0]
        for j in range(1, n_inner):
            denom = main[j] - lower[j - 1] * c_prime[j - 1]
            if abs(denom) < 1e-15:
                denom = 1e-15
            c_prime[j] = upper[j] / denom if j < n_inner - 1 else 0.0
            d_prime[j] = (rhs[j] - lower[j - 1] * d_prime[j - 1]) / denom

        # Back-substitution.
        V_new = np.zeros(n_inner)
        V_new[-1] = d_prime[-1]
        for j in range(n_inner - 2, -1, -1):
            V_new[j] = d_prime[j] - c_prime[j] * V_new[j + 1]

        V[i_vec] = V_new

        # ── Boundary conditions ──────────────────────────────────────────────
        V[0] = 0.0  # deep ITM put or OTM call: zero
        # Upper boundary: large-S asymptote
        if trade.flag == "call":
            V[-1] = S_grid[-1] - K * np.exp(-r * t_now)
        else:
            V[-1] = 0.0

        # ── Knock-out / knock-in condition ────────────────────────────────────
        if trade.instrument == "barrier_do":
            mask = S_grid <= B
            V[mask] = 0.0
        elif trade.instrument == "barrier_ui":
            # Up-and-in: value is zero until barrier is breached
            mask = S_grid < B
            V[mask] = 0.0

    # ── Extract NPV and Greeks from PDE solution at S = S0 ──────────────────
    idx0 = int(np.argmin(np.abs(S_grid - S0)))

    npv = float(V[idx0])
    delta = float((V[min(idx0 + 1, n_S - 1)] - V[max(idx0 - 1, 0)]) / (2 * dS))
    gamma = float(
        (V[min(idx0 + 1, n_S - 1)] - 2 * V[idx0] + V[max(idx0 - 1, 0)]) / dS**2
    )

    # Theta: finite difference in time (re-run one step forward)
    theta = float(-r * npv)  # approximate; full theta requires extra PDE step

    return PDEResult(
        trade_id=trade.trade_id, npv=npv, delta=delta, gamma=gamma, theta=theta
    )


def compute_vega_ladder(
    local_vol_surface: LocalVolSurface,
    pde_result: PDEResult,
    trade: ExoticTrade,
    vol_surface: VolSurface,
    bump_size: float = 0.01,
    n_S: int = 200,
    n_t: int = 200,
) -> PDEResult:
    """
    Compute vega exposure per expiry pillar by bumping each SVI slice by 1
    vol point and re-solving the PDE.

    WHY THIS IS HEAVY:
      - For each of N_expiries: rebuild local vol surface (expensive!) + re-solve PDE
      - N_expiries = 8, each bump+re-solve: 3–5 min → 24–40 min per trade serial
      - With this as a separate parallel task per trade, all N_trade vega ladders
        run in parallel, and within each task all N_expiry bumps run serially
      - Alternative: make each (trade, expiry_bump) a separate node → N×M parallelism
        but more Scaler overhead; the current design is the practical sweet spot.

    This node DEPENDS ON:
      - local_vol_surface (the unbumped surface, for reference)
      - pde_result (base NPV to compute differences)
      - trade (to re-run PDE)
      - vol_surface (to re-calibrate per-expiry bumps)

    Returns the pde_result with vega_ladder populated.
    """
    # (removed an unused `from scipy.stats import norm` — nothing here uses it)
    vega_by_expiry: Dict[float, float] = {}

    for expiry in vol_surface.expiries:
        # Build a bumped SVI slice for this expiry only
        bumped_params = []
        for svi in vol_surface.svi_params:
            if abs(svi.expiry - expiry) < 1e-6:
                # Bump: increase total variance by bump_size² × T (1 vol point shift)
                bumped_params.append(
                    SVIParams(
                        expiry=svi.expiry,
                        a=svi.a
                        + bump_size**2 * svi.expiry,  # shift total variance
                        b=svi.b,
                        rho=svi.rho,
                        m=svi.m,
                        sigma=svi.sigma,
                    )
                )
            else:
                bumped_params.append(svi)

        # Rebuild local vol surface with bumped SVI (abbreviated grid for speed)
        bumped_surface = VolSurface(
            expiries=vol_surface.expiries,
            svi_params=bumped_params,
            is_arbitrage_free=True,
            arbitrage_violations=[],
        )

        # Re-extract local vol (expensive) — use coarser grid for vega efficiency
        lv_bumped = _fast_local_vol(
            bumped_surface,
            local_vol_surface.spot,
            local_vol_surface.rate,
            n_K=100,
            n_T=50,
        )

        # Re-solve PDE (cheaper grid for vega)
        result_bumped = _run_pde(lv_bumped, trade, n_S=n_S, n_t=n_t)
        vega_by_expiry[expiry] = (result_bumped - pde_result.npv) / bump_size

    return PDEResult(
        trade_id=pde_result.trade_id,
        npv=pde_result.npv,
        delta=pde_result.delta,
        gamma=pde_result.gamma,
        theta=pde_result.theta,
        vega_ladder=vega_by_expiry,
    )


def _fast_local_vol(
    surface: VolSurface, spot: float, rate: float, n_K: int = 100, n_T: int = 50
) -> LocalVolSurface:
    """
    Abbreviated local vol extraction for vega bump re-computation.

    Approximation: the implied vol of the nearest-below SVI slice is used
    directly as local vol (no Dupire differentiation), clipped to [0.02, 2.0].
    The unused `from scipy.stats import norm` import was removed.
    """
    T_grid = np.linspace(0.05, max(surface.expiries) * 1.1, n_T)
    K_grid = np.linspace(spot * 0.40, spot * 2.20, n_K)
    svi_T = np.array([s.expiry for s in surface.svi_params])

    local_vol = np.full((n_T, n_K), 0.20)  # default fallback

    for i, T in enumerate(T_grid):
        F = spot * np.exp(rate * T)
        # Nearest bracketing slice index, clamped to the valid range.
        idx = max(
            0, min(np.searchsorted(svi_T, T) - 1, len(surface.svi_params) - 2)
        )
        svi = surface.svi_params[idx]
        k = np.log(K_grid / F)
        sigma_row = svi.implied_vol(k)
        # Approximate Dupire: use the smile vol directly as local vol (fast).
        local_vol[i, :] = np.clip(sigma_row, 0.02, 2.0)

    return LocalVolSurface(
        T_grid=T_grid, K_grid=K_grid, local_vol=local_vol, spot=spot, rate=rate
    )


def _run_pde(
    lv: LocalVolSurface, trade: ExoticTrade, n_S: int = 200, n_t: int = 200
) -> float:
    """
    Abbreviated PDE solve returning NPV only (for vega bumps).

    Same Crank-Nicolson sweep and Thomas solver as pde_price_exotic, but with
    a vanilla terminal payoff for all instruments except barrier_do, on a
    coarser default grid.  An unused full-implicit diagonal (`b_vec`) was
    removed — the solver builds its own `main` diagonal below.
    """
    S0 = lv.spot
    r = lv.rate
    T = trade.maturity
    K = trade.strike
    B = trade.barrier

    S_max = S0 * 3.0
    S_grid = np.linspace(S0 * 0.01, S_max, n_S)
    dS = S_grid[1] - S_grid[0]
    dt = T / n_t
    flag = 1 if trade.flag == "call" else -1

    # Vanilla terminal payoff (proxy for all instrument types here).
    V = np.maximum(flag * (S_grid - K), 0.0)

    i_vec = np.arange(1, n_S - 1)
    S_inner = S_grid[i_vec]

    for step in range(n_t - 1, -1, -1):
        t_now = step * dt
        sigma_vec = np.array([lv.sigma(t_now, S) for S in S_inner])

        # Off-diagonal PDE coefficients (dt folded in).
        a_vec = (
            0.5 * dt * (sigma_vec**2 * S_inner**2 / dS**2 - r * S_inner / dS)
        )
        c_vec = (
            0.5 * dt * (sigma_vec**2 * S_inner**2 / dS**2 + r * S_inner / dS)
        )

        lower = -0.5 * a_vec[1:]
        main = 1.0 + 0.5 * (a_vec + c_vec)
        upper = -0.5 * c_vec[:-1]
        rhs = (
            0.5 * a_vec * V[i_vec - 1]
            + (1 - 0.5 * (a_vec + c_vec)) * V[i_vec]
            + 0.5 * c_vec * V[i_vec + 1]
        )

        # Thomas algorithm: forward elimination then back-substitution.
        n_inner = len(i_vec)
        c_prime = np.zeros(n_inner)
        d_prime = np.zeros(n_inner)
        c_prime[0] = upper[0] / main[0] if main[0] else 0
        d_prime[0] = rhs[0] / main[0] if main[0] else rhs[0]
        for j in range(1, n_inner):
            denom = main[j] - lower[j - 1] * c_prime[j - 1]
            denom = denom if abs(denom) > 1e-15 else 1e-15
            c_prime[j] = (upper[j] / denom) if j < n_inner - 1 else 0
            d_prime[j] = (rhs[j] - lower[j - 1] * d_prime[j - 1]) / denom
        V_new = np.zeros(n_inner)
        V_new[-1] = d_prime[-1]
        for j in range(n_inner - 2, -1, -1):
            V_new[j] = d_prime[j] - c_prime[j] * V_new[j + 1]
        V[i_vec] = V_new
        # Boundary conditions: zero at S→0; large-S asymptote for calls.
        V[0] = 0.0
        V[-1] = S_grid[-1] - K * np.exp(-r * t_now) if flag == 1 else 0.0
        # Knock-out for down-and-out barriers only (other exotics use proxies).
        if trade.instrument == "barrier_do":
            V[S_grid <= B] = 0.0

    idx0 = int(np.argmin(np.abs(S_grid - S0)))
    return float(V[idx0])


def aggregate_book_greeks(
    pde_results_with_vega: List[PDEResult], trades: List[ExoticTrade]
) -> BookGreeks:
    """
    Final fan-in: aggregate all trade-level Greeks into a book-level risk report.
    Waits for ALL PDE and vega nodes to complete.

    Parameters
    ----------
    pde_results_with_vega :
        Per-trade PDE results; each carries npv/delta/gamma/theta and an
        optional per-expiry ``vega_ladder`` dict (empty/None when vega was
        not computed).
    trades :
        Trade definitions, matched to results via ``trade_id``. Every
        result's ``trade_id`` must appear here (raises KeyError otherwise).

    Returns
    -------
    BookGreeks with notional×quantity-scaled totals, the vega ladder
    aggregated by expiry, and a per-trade risk report sorted by NPV.
    """
    trade_map = {t.trade_id: t for t in trades}

    total_npv = 0.0
    total_delta = 0.0
    total_gamma = 0.0
    vega_ladder_agg: Dict[float, float] = {}

    rows = []
    for r in pde_results_with_vega:
        t = trade_map[r.trade_id]
        # Signed position size: quantity is ±1, so shorts flip the sign.
        scale = t.notional * t.quantity
        total_npv += scale * r.npv
        total_delta += scale * r.delta
        total_gamma += scale * r.gamma

        rows.append(
            {
                "Trade ID": r.trade_id,
                "Instrument": t.instrument,
                "NPV": scale * r.npv,
                "Delta": scale * r.delta,
                "Gamma": scale * r.gamma,
                "Theta": scale * r.theta,
            }
        )

        if r.vega_ladder:
            for exp, v in r.vega_ladder.items():
                vega_ladder_agg[exp] = vega_ladder_agg.get(exp, 0.0) + scale * v

    # Pass explicit columns so an empty book still yields a sortable frame:
    # pd.DataFrame([]) has no columns, and sort_values("NPV") would raise.
    report_columns = ["Trade ID", "Instrument", "NPV", "Delta", "Gamma", "Theta"]
    risk_df = pd.DataFrame(rows, columns=report_columns).sort_values("NPV")

    return BookGreeks(
        total_npv=total_npv,
        portfolio_delta=total_delta,
        portfolio_gamma=total_gamma,
        vega_by_expiry=pd.Series(vega_ladder_agg).sort_index(),
        risk_report=risk_df,
    )




@pf.parallel(
    split=pf.per_argument(trades=pf.py_list.by_chunk),
    combine_with=pf.py_list.concat,
    fixed_partition_size=1,
)
def price_and_compute_greeks(
    trades: List[ExoticTrade],
    local_vol: LocalVolSurface,
    vol_surface: VolSurface,
    compute_vega: bool = True,
) -> List:
    """
    Price each trade with the local-vol PDE and optionally attach its vega
    ladder. Parfun splits ``trades`` one trade per partition, so every trade
    runs as an independent node; partial result lists are concatenated back
    into a single list.
    """
    priced = []
    for trade in trades:
        base_result = pde_price_exotic(local_vol, trade)
        priced.append(
            compute_vega_ladder(local_vol, base_result, trade, vol_surface)
            if compute_vega
            else base_result
        )
    return priced


def vol_surface_and_pde_pipeline(
    n_trades: int = 150,
    expiries: Optional[List[float]] = None,
    spot: float = 100.0,
    rate: float = 0.04,
    seed: int = 42,
    compute_vega: bool = True,
) -> BookGreeks:
    """
    Full pipeline: vol-surface calibration → local-vol extraction → PDE
    exotic pricing → book-level Greek aggregation.

    Structure ("hourglass" shape):
      Level 1: load_market_quotes           — market data shared by all fits
      Level 2: calibrate_svi_for_expiry     — one SVI fit per expiry
      Level 3: validate_arbitrage_and_blend — calendar/butterfly checks (fan-in)
      Level 4: extract_local_vol_surface    — single heavy Dupire node
      Level 5-6: price_and_compute_greeks   — per-trade PDE solve (+ optional
               vega ladder); the ONLY parallel stage in this version — Parfun
               fans the trade list out across remote Scaler workers
      Level 7: aggregate_book_greeks        — portfolio fan-in

    Levels 1–4 run sequentially in this process.

    Parameters
    ----------
    n_trades : number of synthetic exotic trades to generate and price.
    expiries : expiry grid in years; defaults to [0.25, …, 5.0] when None.
    spot, rate : flat spot price and risk-free rate used throughout.
    seed : RNG seed for both the market-data generator and the trade book.
    compute_vega : when True, each trade additionally gets a per-expiry
        vega ladder (one bumped re-solve per expiry — the dominant cost).

    Returns
    -------
    BookGreeks aggregated over the whole trade book.

    NOTE(review): indicative timings — serial ≈ 36 h vs ≈ 55 min on the
    cluster; the exact figures depend on trade count and worker count
    (defaults here: 150 trades, 192 workers) — confirm against a real run.
    """
    if expiries is None:
        expiries = [0.25, 0.5, 0.75, 1.0, 1.5, 2.0, 3.0, 5.0]

    # Level 1: load market data (shared by all SVI nodes)
    market_data = load_market_quotes(
        spot=spot, rate=rate, expiries=expiries, seed=seed
    )

    # Level 2: SVI calibration — one fit per expiry (sequential here)
    svi_fits = [
        calibrate_svi_for_expiry(market_data, expiry) for expiry in expiries
    ]

    # Level 3: arbitrage validation (fan-in: needs ALL SVI fits)
    vol_surface = validate_arbitrage_and_blend(svi_fits, market_data)

    # Level 4: local vol extraction (single expensive node)
    local_vol = extract_local_vol_surface(vol_surface, spot=spot, rate=rate)

    # Build a synthetic trade book (in production: load from trade repository).
    # Seeded RNG keeps the book reproducible for a given `seed`.
    rng = np.random.default_rng(seed)
    instruments = ["barrier_do", "barrier_ui", "lookback", "asian", "cliquet"]
    trades = [
        ExoticTrade(
            trade_id=f"EX_{i:04d}",
            instrument=rng.choice(instruments),
            spot=spot,
            strike=float(spot * rng.uniform(0.85, 1.15)),
            barrier=float(spot * rng.uniform(0.65, 0.90)),
            maturity=float(rng.choice(expiries)),
            rate=rate,
            flag=rng.choice(["call", "put"]),
            notional=float(rng.choice([1e5, 5e5, 1e6])),
            quantity=int(rng.choice([-1, 1])),  # ±1: long or short position
        )
        for i in range(n_trades)
    ]

    # Level 5-6: PDE pricing (+ vega) — the parallel stage. The backend
    # context routes @pf.parallel calls to the remote Scaler cluster;
    # scheduler_address / object_storage_address are module-level globals
    # set in the Parfun setup cell of this notebook.
    with pf.set_parallel_backend_context(
        "scaler_remote",
        scheduler_address=scheduler_address,
        object_storage_address=object_storage_address,
        n_workers=192,
    ):
        final_results = price_and_compute_greeks(
            trades, local_vol, vol_surface, compute_vega
        )

    # Level 7: fan-in — aggregate per-trade results into book-level Greeks.
    return aggregate_book_greeks(final_results, trades)


# Run the full pipeline on the default synthetic book and report total NPV.
pipeline_kwargs = dict(
    n_trades=150,
    expiries=[0.25, 0.5, 0.75, 1.0, 1.5, 2.0, 3.0, 5.0],
    spot=100.0,
    rate=0.04,
    seed=42,
    compute_vega=True,
)
greeks = vol_surface_and_pde_pipeline(**pipeline_kwargs)
print(f"Portfolio NPV: ${greeks.total_npv:,.0f}")

Diff: Native vs Parfun

[1]:
import difflib
import json
import re

from IPython.display import HTML, display

# Pull the native and Parfun implementation cells out of the saved notebook.
with open("VolSurface.ipynb") as _f:
    _nb = json.load(_f)
native_code = "".join(_nb["cells"][3]["source"])
parallel_code = "".join(_nb["cells"][5]["source"])

# Side-by-side HTML diff, wrapping long lines at 90 columns.
table = difflib.HtmlDiff(wrapcolumn=90).make_table(
    native_code.splitlines(),
    parallel_code.splitlines(),
    fromdesc="Native Python",
    todesc="With Parfun",
    context=True,
    numlines=10,
)

# Strip the navigation and line-number columns that difflib emits by default.
for _pattern in (
    r'<td class="diff_next"[^>]*>.*?</td>',
    r'<th class="diff_next"[^>]*>.*?</th>',
    r'<td class="diff_header"[^>]*>.*?</td>',
):
    table = re.sub(_pattern, "", table)
table = table.replace('colspan="2"', "").replace(' nowrap="nowrap"', "")
# Collapse the six leftover colgroups down to the two columns we kept.
table = re.sub(
    r"(\s*<colgroup></colgroup>){6}",
    "\n        <colgroup></colgroup> <colgroup></colgroup>",
    table,
)

css = """<style>
    table.diff { font-family: monospace; font-size: 10px; border-collapse: collapse; width: 100%; }
    table.diff td { padding: 2px 8px; white-space: pre-wrap; word-break: break-all; text-align: left !important; }
    table.diff th { padding: 6px 8px; text-align: center; background-color: #f0f0f0; font-size: 14px; }
    td.diff_add { background-color: #e6ffec; }
    td.diff_chg { background-color: #fff8c5; }
    td.diff_sub { background-color: #ffebe9; }
    span.diff_add { background-color: #ccffd8; }
    span.diff_chg { background-color: #fff2a8; }
    span.diff_sub { background-color: #ffc0c0; }
</style>"""

display(HTML(css + table))

Native PythonWith Parfun
import osimport os
os.environ["OPENBLAS_NUM_THREADS"] = "1"os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["GOTO_NUM_THREADS"] = "1"os.environ["GOTO_NUM_THREADS"] = "1"
os.environ["OMP_NUM_THREADS"] = "1"os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"os.environ["MKL_NUM_THREADS"] = "1"
 
from scaler import Client
import parfun as pf
scheduler_address = "tcp://172.31.3.107:7788"
object_storage_address = "tcp://172.31.3.107:7789"
from dataclasses import dataclass, fieldfrom dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuplefrom typing import Dict, List, Optional, Tuple
import numpy as npimport numpy as np
import pandas as pdimport pandas as pd
import scipy.interpolateimport scipy.interpolate
import scipy.optimizeimport scipy.optimize
# ─── Domain types ─────────────────────────────────────────────────────────────# ─── Domain types ─────────────────────────────────────────────────────────────
    return BookGreeks(    return BookGreeks(
        total_npv=total_npv,        total_npv=total_npv,
        portfolio_delta=total_delta,        portfolio_delta=total_delta,
        portfolio_gamma=total_gamma,        portfolio_gamma=total_gamma,
        vega_by_expiry=pd.Series(vega_ladder_agg).sort_index(),        vega_by_expiry=pd.Series(vega_ladder_agg).sort_index(),
        risk_report=risk_df,        risk_report=risk_df,
    )    )
 
 
@pf.parallel(
    split=pf.per_argument(trades=pf.py_list.by_chunk),
    combine_with=pf.py_list.concat,
    fixed_partition_size=1,
)
def price_and_compute_greeks(def price_and_compute_greeks(
    trades: List[ExoticTrade],    trades: List[ExoticTrade],
    local_vol: LocalVolSurface,    local_vol: LocalVolSurface,
    vol_surface: VolSurface,    vol_surface: VolSurface,
    compute_vega: bool = True,    compute_vega: bool = True,
) -> List:) -> List:
    results = []    results = []
    for trade in trades:    for trade in trades:
        pde_result = pde_price_exotic(local_vol, trade)        pde_result = pde_price_exotic(local_vol, trade)
        if compute_vega:        if compute_vega:
            barrier=float(spot * rng.uniform(0.65, 0.90)),            barrier=float(spot * rng.uniform(0.65, 0.90)),
            maturity=float(rng.choice(expiries)),            maturity=float(rng.choice(expiries)),
            rate=rate,            rate=rate,
            flag=rng.choice(["call", "put"]),            flag=rng.choice(["call", "put"]),
            notional=float(rng.choice([1e5, 5e5, 1e6])),            notional=float(rng.choice([1e5, 5e5, 1e6])),
            quantity=int(rng.choice([-1, 1])),            quantity=int(rng.choice([-1, 1])),
        )        )
        for i in range(n_trades)        for i in range(n_trades)
    ]    ]
    # Level 5-6: PDE pricing + vega ladder — N_trade parallel nodes    # Level 5: PDE pricing — N_trade parallel nodes (each independent given local_vol)
    final_results = price_and_compute_greeks(    with pf.set_parallel_backend_context(
        trades, local_vol, vol_surface, compute_vega        "scaler_remote",
        scheduler_address=scheduler_address,
        object_storage_address=object_storage_address,
        n_workers=192,
    )    ):
        final_results = price_and_compute_greeks(
            trades, local_vol, vol_surface, compute_vega
        )
    return aggregate_book_greeks(final_results, trades)    return aggregate_book_greeks(final_results, trades)
greeks = vol_surface_and_pde_pipeline(greeks = vol_surface_and_pde_pipeline(
    n_trades=150,    n_trades=150,
    expiries=[0.25, 0.5, 0.75, 1.0, 1.5, 2.0, 3.0, 5.0],    expiries=[0.25, 0.5, 0.75, 1.0, 1.5, 2.0, 3.0, 5.0],
    spot=100.0,    spot=100.0,
    rate=0.04,    rate=0.04,
    seed=42,    seed=42,

Statistics

Parfun

Pargraph

Sequential Runtime

Parallel Runtime

Workers

Speedup

Efficiency

Yes

No

81m46.430s

2m12.139s

128

37.13x

0.29