Monte Carlo estimation of pi

Each worker throws random darts at the unit square; the client sums the per-worker hit counts and recovers pi.

[ ]:
# Connection settings -- edit these to point at your running scheduler.
SCHEDULER_ADDRESS = "ws://127.0.0.1:2345"  # supports tcp:// or ws://; only ws:// works from JupyterLite (browser)
OBJECT_STORAGE_ADDRESS = None  # leave None to use whatever the scheduler advertises

# Defaults: 128 tasks * 200 * 2_000_000 = 5.12e10 darts; on 16 workers expect roughly a minute.
N_TASKS = 128
CHUNK_SIZE = 2_000_000        # samples per inner numpy batch (kept small so memory stays bounded)
CHUNKS_PER_TASK = 200         # each task processes CHUNK_SIZE * CHUNKS_PER_TASK darts
[ ]:
import time

from scaler import Client


def count_inside_unit_circle(seed: int, chunks: int, chunk_size: int) -> int:
    """Worker-side: throw `chunks * chunk_size` random darts in batches; return the hit count."""
    import numpy as np

    rng = np.random.default_rng(seed)
    inside = 0
    for _ in range(chunks):
        x = rng.random(chunk_size, dtype=np.float64)
        y = rng.random(chunk_size, dtype=np.float64)
        inside += int(np.count_nonzero(x * x + y * y <= 1.0))
    return inside


with Client(address=SCHEDULER_ADDRESS, object_storage_address=OBJECT_STORAGE_ADDRESS) as client:
    started = time.perf_counter()
    futures = [client.submit(count_inside_unit_circle, seed, CHUNKS_PER_TASK, CHUNK_SIZE) for seed in range(N_TASKS)]
    inside = sum(future.result() for future in futures)
    elapsed = time.perf_counter() - started

total = N_TASKS * CHUNKS_PER_TASK * CHUNK_SIZE
pi_estimate = 4.0 * inside / total
print(f"threw {total:,} darts across {N_TASKS} tasks in {elapsed:.2f}s")
print(f"pi ~= {pi_estimate:.6f}  (error {abs(pi_estimate - 3.141592653589793):.2e})")