Word-count map-reduce

Each worker regenerates its own slice of a synthetic corpus from a seed, tokenizes it, and returns a small Counter; the client merges them and prints the top words.

[ ]:
# Connection settings -- edit these to point at your running scheduler.
SCHEDULER_ADDRESS = "ws://127.0.0.1:2345"  # supports tcp:// or ws://; only ws:// works from JupyterLite (browser)
OBJECT_STORAGE_ADDRESS = None  # leave None to use whatever the scheduler advertises

# Defaults: 256 chunks * 16 MiB = 4 GiB of synthetic corpus tokenized worker-side.
# On 16 workers expect roughly a minute of wall-clock time.
N_CHUNKS = 256
CHUNK_SIZE_BYTES = 16 * 1024 * 1024
[ ]:
import time

from collections import Counter

from scaler import Client

VOCAB = (
    "lorem ipsum dolor sit amet consectetur adipiscing elit sed do eiusmod"
    " tempor incididunt ut labore et dolore magna aliqua scaler distributed"
    " computing python parfun pargraph cluster scheduler worker task future"
    " submit object storage map reduce parallel"
).split()

print(f"will tokenize {N_CHUNKS * CHUNK_SIZE_BYTES:,} bytes across {N_CHUNKS} chunks ({len(VOCAB)} unique vocab words)")
[ ]:
def count_words_in_chunk(chunk_index: int, chunk_bytes: int, vocab: list[str]) -> Counter:
    """Worker-side: regenerate this chunk's slice from a seed and tokenize it."""
    import random

    rng = random.Random(chunk_index)
    pieces: list[str] = []
    running = 0
    while running < chunk_bytes:
        word = rng.choice(vocab)
        pieces.append(word)
        running += len(word) + 1
    return Counter(" ".join(pieces)[:chunk_bytes].split())


with Client(address=SCHEDULER_ADDRESS, object_storage_address=OBJECT_STORAGE_ADDRESS) as client:
    vocab_ref = client.send_object(VOCAB, name="vocab")
    started = time.perf_counter()
    futures = [client.submit(count_words_in_chunk, i, CHUNK_SIZE_BYTES, vocab_ref) for i in range(N_CHUNKS)]
    totals: Counter = Counter()
    for future in futures:
        totals.update(future.result())
    elapsed = time.perf_counter() - started

print(f"map-reduced {sum(totals.values()):,} tokens across {N_CHUNKS} chunks in {elapsed:.2f}s")
for word, count in totals.most_common(10):
    print(f"  {word:<14} {count:>8,}")