Word-count map-reduce
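Each worker regenerates its own slice of a synthetic corpus from a seed, tokenizes it, and returns a small Counter of word frequencies. The client then merges these counters and prints the most common words. Because each slice is regenerated on the worker, only the small per-chunk counters travel back over the network, never the corpus text itself.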
[ ]:
# --- Connection settings: point these at your running scheduler. ---
# Both tcp:// and ws:// addresses are accepted, but from a browser kernel
# (JupyterLite) only ws:// works.
SCHEDULER_ADDRESS = "ws://127.0.0.1:2345"
# None -> fall back to the object storage address advertised by the scheduler.
OBJECT_STORAGE_ADDRESS = None

# --- Workload size ---
# 256 chunks x 16 MiB each = 4 GiB of synthetic text, generated and tokenized
# on the workers. With 16 workers this takes roughly a minute of wall-clock time.
N_CHUNKS = 256
CHUNK_SIZE_BYTES = 16 * 2**20  # 16 MiB
[ ]:
import time
from collections import Counter
from scaler import Client
# Words the synthetic corpus is sampled from. The client uploads this list once
# with client.send_object and hands every task a reference to it.
VOCAB = (
    "lorem ipsum dolor sit amet consectetur adipiscing elit sed do eiusmod"
    " tempor incididunt ut labore et dolore magna aliqua scaler distributed"
    " computing python parfun pargraph cluster scheduler worker task future"
    " submit object storage map reduce parallel"
).split()
# Count distinct words explicitly: len(VOCAB) would over-report the number of
# "unique" words if the list ever gained a duplicate entry.
print(
    f"will tokenize {N_CHUNKS * CHUNK_SIZE_BYTES:,} bytes across {N_CHUNKS} chunks "
    f"({len(set(VOCAB))} unique vocab words)"
)
[ ]:
def count_words_in_chunk(chunk_index: int, chunk_bytes: int, vocab: list[str]) -> Counter:
    """Worker-side: regenerate this chunk's slice from a seed and tokenize it.

    The slice is rebuilt deterministically from ``random.Random(chunk_index)``,
    so the same index always yields the same text and counts.

    Args:
        chunk_index: Index of the chunk; also used as the RNG seed.
        chunk_bytes: Maximum length of the generated text, measured with
            ``len`` (characters; equal to bytes as long as ``vocab`` is ASCII).
        vocab: Words to sample from uniformly.

    Returns:
        A Counter mapping each whole word in the slice to its number of
        occurrences. A word that would straddle the ``chunk_bytes`` boundary
        is dropped instead of being counted as a truncated fragment.

    Raises:
        IndexError: if ``vocab`` is empty and ``chunk_bytes > 0``
            (raised by ``random.Random.choice``).
    """
    # Local import, presumably so the function carries no extra module-level
    # dependency when it is serialized and shipped to a worker.
    import random

    rng = random.Random(chunk_index)
    pieces: list[str] = []
    running = 0  # == len(" ".join(pieces)) + 1, i.e. text length plus one separator
    while running < chunk_bytes:
        word = rng.choice(vocab)
        pieces.append(word)
        running += len(word) + 1
    text = " ".join(pieces)
    if len(text) > chunk_bytes:
        # The final word overruns the budget. Slicing at chunk_bytes would leave
        # a partial word (e.g. "sched" from "scheduler") that split() counts as a
        # bogus token, so cut back to the last separator at or before the
        # boundary. -1 means not even one whole word fits.
        cut = text.rfind(" ", 0, chunk_bytes + 1)
        text = text[:cut] if cut != -1 else ""
    return Counter(text.split())
# Map: one task per chunk, each receiving only its index, the chunk size and a
# reference to the shared vocabulary. Reduce: merge the per-chunk Counters on the
# client, consuming results in submission order.
with Client(address=SCHEDULER_ADDRESS, object_storage_address=OBJECT_STORAGE_ADDRESS) as client:
    # Upload the vocabulary once; every task gets the same reference.
    vocab_ref = client.send_object(VOCAB, name="vocab")

    started = time.perf_counter()
    futures = [
        client.submit(count_words_in_chunk, chunk_no, CHUNK_SIZE_BYTES, vocab_ref)
        for chunk_no in range(N_CHUNKS)
    ]

    totals: Counter = Counter()
    for future in futures:
        totals.update(future.result())
    elapsed = time.perf_counter() - started

# Report: total tokens, wall-clock time, then the ten most frequent words.
token_count = sum(totals.values())
print(f"map-reduced {token_count:,} tokens across {N_CHUNKS} chunks in {elapsed:.2f}s")
for word, count in totals.most_common(10):
    print(f" {word:<14} {count:>8,}")