Calculate Implied VolatilityΒΆ

This example calculates implied volatility for many market prices using client.map(). The input is chunked because client.map() is most useful when task bodies are large or computationally heavy.

"""
This program computes the implied Black-Scholes volatility given market price and model price.

This program is revised based on
https://stackoverflow.com/questions/61289020/fast-implied-volatility-calculation-in-python

Usage:
    $ git clone https://github.com/finos/opengris-scaler && cd opengris-scaler
    $ pip install -r examples/applications/requirements_applications.txt
    $ python -m examples.applications.implied_volatility
"""

from timeit import default_timer
from typing import List

import numpy as np
import psutil
from scipy.stats import norm

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


def bs_call(S, K, T, r, vol):
    d1 = (np.log(S / K) + (r + 0.5 * vol**2) * T) / (vol * np.sqrt(T))
    d2 = d1 - vol * np.sqrt(T)
    return S * norm.cdf(d1) - np.exp(-r * T) * K * norm.cdf(d2)


def bs_vega(S, K, T, r, sigma):
    d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / (sigma * np.sqrt(T))
    return S * norm.pdf(d1) * np.sqrt(T)


def find_volatility(target_value, S, K, T, r) -> float:
    MAX_ITERATIONS = 200
    PRECISION = 1.0e-5
    sigma = 0.5
    for _ in range(0, MAX_ITERATIONS):
        price = bs_call(S, K, T, r, sigma)
        vega = bs_vega(S, K, T, r, sigma)
        diff = target_value - price  # our root
        if abs(diff) < PRECISION:
            return sigma
        sigma = sigma + diff / vega  # f(x) / f'(x)
    return sigma  # value wasn't found, return best guess so far


def find_volatilities(dataset: np.ndarray) -> List[float]:
    return [find_volatility(*sample) for sample in dataset]


def generate_synthetic_data(n_samples: int) -> np.ndarray:
    S = np.random.randint(100, 200, n_samples)  # stock prices
    K = S * 1.25  # strike prices
    T = np.ones(n_samples)  # time maturity (year)
    r = np.random.randint(0, 3, n_samples) / 100  # risk-free-rate
    vol = np.random.randint(15, 50, n_samples) / 100  # volatility
    prices = bs_call(S, K, T, r, vol)

    return np.column_stack((prices, S, K, T, r))


def main():
    N_SAMPLES = 20_000
    N_SAMPLES_PER_TASK = 1_000

    n_workers = psutil.cpu_count()
    if n_workers is None:
        n_workers = 4

    cluster = SchedulerClusterCombo(n_workers=n_workers)

    dataset = generate_synthetic_data(N_SAMPLES)

    # Split the dataset in chunks of up to `N_SAMPLES_PER_TASK`.
    per_task_dataset = [
        dataset[data_begin : data_begin + N_SAMPLES_PER_TASK] for data_begin in range(0, N_SAMPLES, N_SAMPLES_PER_TASK)
    ]

    # Process the dataset in parallel, concatenate the results
    with Client(address=cluster.get_address()) as client:
        start = default_timer()
        results = np.concatenate(client.map(find_volatilities, per_task_dataset))
        duration = default_timer() - start

    cluster.shutdown()

    print(f"Computed {len(results)} stock volatilities in {duration:.2f}s")


if __name__ == "__main__":
    main()