Monte Carlo estimation of pi
Each task throws random darts at the unit square and counts how many land inside the quarter unit circle (x^2 + y^2 <= 1); the client sums the per-task hit counts and recovers pi as 4 * hits / darts.
[ ]:
# Connection settings -- point these at your running scheduler.
# The scheduler address may use tcp:// or ws://, but JupyterLite (in the browser) only works with ws://.
SCHEDULER_ADDRESS = "ws://127.0.0.1:2345"
# None means: use whatever object storage address the scheduler advertises.
OBJECT_STORAGE_ADDRESS = None

# Workload size. With the defaults below: 128 tasks * 200 chunks * 2_000_000 samples = 5.12e10 darts
# (expect roughly a minute on 16 workers).
N_TASKS = 128
CHUNKS_PER_TASK = 200  # every task throws CHUNK_SIZE * CHUNKS_PER_TASK darts
CHUNK_SIZE = 2_000_000  # samples per inner numpy batch; kept small so memory stays bounded
[ ]:
import time
from scaler import Client
def count_inside_unit_circle(seed: int, chunks: int, chunk_size: int) -> int:
    """Count random darts that land inside the quarter unit circle (worker-side).

    Draws ``chunks`` batches of ``chunk_size`` points from the unit square,
    using a NumPy generator seeded with ``seed``, and returns how many of the
    ``chunks * chunk_size`` points satisfy ``x*x + y*y <= 1.0``.
    """
    # Imported locally so numpy is only needed where the function actually executes.
    import numpy as np

    generator = np.random.default_rng(seed)

    def hits_in_one_batch() -> int:
        # Draw all x coordinates first, then all y coordinates: this order
        # fixes which values of the random stream each coordinate receives.
        xs = generator.random(chunk_size, dtype=np.float64)
        ys = generator.random(chunk_size, dtype=np.float64)
        squared_radius = xs * xs + ys * ys
        return int(np.count_nonzero(squared_radius <= 1.0))

    # Batching keeps only one chunk's worth of samples alive at a time.
    return sum(hits_in_one_batch() for _ in range(chunks))
# Total number of darts thrown across all tasks.
total = N_TASKS * CHUNKS_PER_TASK * CHUNK_SIZE

with Client(address=SCHEDULER_ADDRESS, object_storage_address=OBJECT_STORAGE_ADDRESS) as client:
    # Timing covers task submission through collection of the last result.
    started = time.perf_counter()
    # One task per seed; the seed is handed to the worker function's random generator.
    futures = [
        client.submit(count_inside_unit_circle, seed, CHUNKS_PER_TASK, CHUNK_SIZE)
        for seed in range(N_TASKS)
    ]
    # Wait for the results in submission order, then add up the per-task hit counts.
    hit_counts = [future.result() for future in futures]
    inside = sum(hit_counts)
    elapsed = time.perf_counter() - started

# The hit fraction approximates pi / 4, so scale it by 4.
pi_estimate = 4.0 * inside / total
print(f"threw {total:,} darts across {N_TASKS} tasks in {elapsed:.2f}s")
print(f"pi ~= {pi_estimate:.6f} (error {abs(pi_estimate - 3.141592653589793):.2e})")