Heavy object reuse with send_object

When many tasks share one large payload, passing it to Client.submit directly re-sends it with every task. Upload it once with Client.send_object and pass the returned reference to each task instead.

[ ]:
SCHEDULER_ADDRESS = "ws://127.0.0.1:2345"  # edit me -- supports tcp:// or ws://; only ws:// works in JupyterLite (browser)
OBJECT_STORAGE_ADDRESS = None  # edit me -- leave None to use whatever the scheduler advertises
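
If the notebook may run both locally and in JupyterLite, a small pre-flight check can catch a wrong scheme before the client tries to connect. The helper below is hypothetical and not part of Scaler: check_scheduler_address and its in_browser parameter are illustrative names, and the default assumes the browser kernel is Pyodide, where sys.platform is "emscripten".

```python
import sys
from typing import Optional
from urllib.parse import urlparse


def check_scheduler_address(address: str, in_browser: Optional[bool] = None) -> str:
    """Return address unchanged if its scheme is usable, else raise ValueError.

    Hypothetical helper for illustration; Scaler does not ship this function.
    """
    if in_browser is None:
        # Assumption: a JupyterLite kernel runs on Pyodide, which reports "emscripten".
        in_browser = sys.platform == "emscripten"

    allowed = {"ws"} if in_browser else {"tcp", "ws"}
    scheme = urlparse(address).scheme
    if scheme not in allowed:
        raise ValueError(f"scheme {scheme!r} not usable here; expected one of {sorted(allowed)}")
    return address


print(check_scheduler_address("ws://127.0.0.1:2345"))
```

Calling it on SCHEDULER_ADDRESS right after the configuration cell would surface a tcp:// address in the browser as a clear error instead of a connection failure later.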

Anti-pattern: pass the payload into every submit

The 1 MiB payload is bound into a single functools.partial, and every submit call carries that partial with it, so the scheduler ends up shipping the same payload 64 times.

[ ]:
import functools
import random
import time

from scaler import Client

PAYLOAD = b"x" * (1 << 20)  # 1 MiB (1,048,576 bytes)
INDICES = [random.randint(0, 100) for _ in range(64)]


def lookup(payload: bytes, index: int) -> int:
    return len(payload) + index


with Client(address=SCHEDULER_ADDRESS, object_storage_address=OBJECT_STORAGE_ADDRESS) as client:
    big_lookup = functools.partial(lookup, PAYLOAD)
    started = time.perf_counter()
    futures = [client.submit(big_lookup, i) for i in INDICES]
    results_a = [f.result() for f in futures]
    elapsed_a = time.perf_counter() - started

print(f"submitted {len(INDICES)} tasks with payload-per-task in {elapsed_a:.3f}s")
print("first 5 results:", results_a[:5])
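
A rough way to see the cost is to measure how many bytes one task's arguments serialize to. The sketch below uses the standard-library pickle as a stand-in. Scaler's own serializer and wire format may differ, so treat the numbers as an estimate only; N_TASKS and the sample index 7 are illustrative values.

```python
import pickle

PAYLOAD = b"x" * (1 << 20)  # same size as the notebook's payload
N_TASKS = 64                # same count as the notebook's INDICES list

# Arguments for one task when the payload travels with it.
bytes_with_payload = len(pickle.dumps((PAYLOAD, 7)))
# Arguments for one task when only the lightweight index travels.
bytes_index_only = len(pickle.dumps((7,)))

total_with_payload = bytes_with_payload * N_TASKS
total_index_only = bytes_index_only * N_TASKS

print(f"per task with payload: {bytes_with_payload:,} bytes")
print(f"per task, index only:  {bytes_index_only:,} bytes")
print(f"total over {N_TASKS} tasks: {total_with_payload / (1 << 20):.1f} MiB vs {total_index_only:,} bytes")
```

The gap between the two totals is the traffic that a shared object can avoid, minus the one-time upload and the small reference each task carries.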

Better: upload once with send_object, pass the reference

send_object returns an ObjectReference that you pass as a positional task argument. The payload is transferred once; each task only carries a small reference plus its own lightweight args.

[ ]:
with Client(address=SCHEDULER_ADDRESS, object_storage_address=OBJECT_STORAGE_ADDRESS) as client:
    # Start the timer before the upload so the one-time transfer is counted in the comparison.
    started = time.perf_counter()
    payload_ref = client.send_object(PAYLOAD, name="payload")
    # Each task now carries only the reference and its own index.
    futures = [client.submit(lookup, payload_ref, i) for i in INDICES]
    results_b = [f.result() for f in futures]
    elapsed_b = time.perf_counter() - started

print(f"submitted {len(INDICES)} tasks with shared object in {elapsed_b:.3f}s")
print("results match:", results_a == results_b)
print(f"speedup: {elapsed_a / elapsed_b:.2f}x")

Notes

Pass the ObjectReference directly as a positional task argument, as in client.submit(lookup, payload_ref, i); do not nest it inside lists, dicts, or other containers.
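
The toy model below illustrates one plausible reason for this rule. It is not Scaler's implementation: ToyRef, STORE, and run_task are hypothetical stand-ins that assume references are swapped for the stored object only when they appear as top-level positional arguments.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class ToyRef:
    """Hypothetical stand-in for an object reference; not a Scaler class."""
    key: str


# Hypothetical in-memory "object storage" holding one uploaded payload.
STORE = {"payload": b"x" * 1024}


def run_task(fn: Callable[..., Any], *args: Any) -> Any:
    """Replace top-level ToyRef arguments with stored objects, then call fn."""
    resolved = [STORE[a.key] if isinstance(a, ToyRef) else a for a in args]
    return fn(*resolved)


def lookup(payload: bytes, index: int) -> int:
    return len(payload) + index


def lookup_nested(bundle: dict, index: int) -> int:
    return len(bundle["payload"]) + index


ref = ToyRef("payload")

# Top-level reference: resolved to the bytes before lookup runs.
ok = run_task(lookup, ref, 5)

# Nested reference: the dict is passed through untouched, so the task sees a ToyRef.
try:
    run_task(lookup_nested, {"payload": ref}, 5)
    nested_error = None
except TypeError as exc:
    nested_error = exc

print("top-level result:", ok)
print("nested attempt failed with:", nested_error)
```

If a task needs several shared objects, giving each one its own positional parameter keeps every reference at the top level.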