Heavy object reuse with send_object
When many tasks share one large payload, passing it to submit serializes and re-sends it with every task. Upload it once with Client.send_object instead and pass the returned reference into each task.
[ ]:
SCHEDULER_ADDRESS = "ws://127.0.0.1:2345" # edit me -- supports tcp:// or ws://; only ws:// works in JupyterLite (browser)
OBJECT_STORAGE_ADDRESS = None # edit me -- leave None to use whatever the scheduler advertises
Anti-pattern: pass the payload into every submit
The payload is bound into functools.partial, so the same 1 MB of bytes is serialized into each of the 64 submitted tasks and the same payload ends up being shipped 64 times.
[ ]:
import functools
import random
import time
from scaler import Client
PAYLOAD = b"x" * (1 << 20) # 1 MB
INDICES = [random.randint(0, 100) for _ in range(64)]
def lookup(payload: bytes, index: int) -> int:
    return len(payload) + index
with Client(address=SCHEDULER_ADDRESS, object_storage_address=OBJECT_STORAGE_ADDRESS) as client:
    # The partial carries the full payload, so it travels with every task.
    big_lookup = functools.partial(lookup, PAYLOAD)
    started = time.perf_counter()
    futures = [client.submit(big_lookup, i) for i in INDICES]
    results_a = [f.result() for f in futures]
    elapsed_a = time.perf_counter() - started
    print(f"submitted {len(INDICES)} tasks with payload-per-task in {elapsed_a:.3f}s")
    print("first 5 results:", results_a[:5])
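To get a rough feel for how much data the cell above pushes through, you can serialize what one task carries and multiply by the task count. This is a sketch under stated assumptions: it uses the standard-library `pickle` module as a stand-in for whatever serializer your Scaler client actually uses, so the byte counts are approximate. `PAYLOAD` and `lookup` mirror the cell above; `N_TASKS` is a hypothetical name introduced here for illustration.

```python
import functools
import pickle

PAYLOAD = b"x" * (1 << 20)  # 1 MiB, same size as the payload above
N_TASKS = 64  # hypothetical name; matches len(INDICES) above


def lookup(payload: bytes, index: int) -> int:
    return len(payload) + index


big_lookup = functools.partial(lookup, PAYLOAD)

# Approximate what one task carries: the partial's bound arguments plus the
# task's own (representative) index. The function itself is left out because
# it is the same small object in both versions of this notebook.
per_task_bytes = len(pickle.dumps(big_lookup.args + (7,)))
total_bytes = per_task_bytes * N_TASKS

print(f"per task:  {per_task_bytes / (1 << 20):.2f} MiB")
print(f"{N_TASKS} tasks: {total_bytes / (1 << 20):.2f} MiB")
```

Because the payload dominates each task, the total grows linearly with the number of tasks, which is the cost send_object is meant to remove.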
Better: upload once with send_object, pass the reference
send_object returns an ObjectReference that you pass to submit as a positional argument. The payload is transferred once; each task carries only the small reference plus its own lightweight arguments.
[ ]:
with Client(address=SCHEDULER_ADDRESS, object_storage_address=OBJECT_STORAGE_ADDRESS) as client:
    # Start timing before the upload so the one-time transfer is counted,
    # which keeps the comparison with the payload-per-task version fair.
    started = time.perf_counter()
    payload_ref = client.send_object(PAYLOAD, name="payload")
    futures = [client.submit(lookup, payload_ref, i) for i in INDICES]
    results_b = [f.result() for f in futures]
    elapsed_b = time.perf_counter() - started
    print(f"submitted {len(INDICES)} tasks with shared object in {elapsed_b:.3f}s")
    print("results match:", results_a == results_b)
    print(f"speedup: {elapsed_a / elapsed_b:.2f}x")
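The second version is cheaper because a reference is tiny compared with the payload it points to. The sketch below models that idea with a hypothetical in-memory store: `LocalObjectStore` is an illustrative name, not a Scaler class, and the reference here is just a random ID string, whereas Scaler's real ObjectReference and object storage (reached through the object storage address configured above) may work quite differently.

```python
import pickle
import uuid
from typing import Any, Dict


class LocalObjectStore:
    """Hypothetical store: keeps each uploaded object once, keyed by an ID."""

    def __init__(self) -> None:
        self._objects: Dict[str, Any] = {}

    def put(self, obj: Any) -> str:
        # The returned ID plays the role of a reference in this sketch.
        object_id = uuid.uuid4().hex
        self._objects[object_id] = obj
        return object_id

    def get(self, object_id: str) -> Any:
        return self._objects[object_id]

    def __len__(self) -> int:
        return len(self._objects)


store = LocalObjectStore()
payload = b"x" * (1 << 20)  # 1 MiB, uploaded once
ref = store.put(payload)

# Each task message carries only the small reference plus its own index.
messages = [pickle.dumps((ref, i)) for i in range(64)]
total_message_bytes = sum(len(m) for m in messages)
print(f"payload stored once: {len(payload)} bytes")
print(f"64 task messages: {total_message_bytes} bytes in total")

# Worker side: resolve the reference back to the stored payload.
resolved = store.get(ref)
```

The payload is held once in the store, while the 64 task messages together stay in the low kilobytes.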
Notes
Pass the ObjectReference directly as a positional argument to submit; do not nest it inside lists, dicts, or other containers.
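To catch a nested reference before it reaches the scheduler, a small pre-submit check can walk the arguments and flag any reference found inside a container. This is a hypothetical helper, not part of Scaler. It takes the reference type as a parameter, so with a live client you could pass `type(payload_ref)` rather than relying on an import path; `FakeRef` below is a stand-in used only to exercise it.

```python
from typing import Any, Iterable, List, Type


def find_nested_refs(args: Iterable[Any], ref_type: Type[Any]) -> List[str]:
    """Return paths of ref_type instances nested inside container arguments.

    References passed directly as top-level arguments are allowed; anything
    found inside a list, tuple, set, frozenset, or dict is reported.
    """
    problems: List[str] = []

    def walk(value: Any, path: str, top_level: bool) -> None:
        if isinstance(value, ref_type):
            if not top_level:
                problems.append(path)
            return
        if isinstance(value, dict):
            for key, item in value.items():
                walk(key, f"{path}[key {key!r}]", False)
                walk(item, f"{path}[{key!r}]", False)
        elif isinstance(value, (list, tuple, set, frozenset)):
            for i, item in enumerate(value):
                walk(item, f"{path}[{i}]", False)

    for position, arg in enumerate(args):
        walk(arg, f"arg{position}", True)
    return problems


class FakeRef:
    """Hypothetical stand-in for a reference type, used only for the demo."""


print(find_nested_refs([FakeRef(), 3], FakeRef))  # []
print(find_nested_refs([[FakeRef()], {"k": FakeRef()}], FakeRef))  # ['arg0[0]', "arg1['k']"]
```

For example, calling `find_nested_refs((payload_ref, i), type(payload_ref))` before `client.submit(lookup, payload_ref, i)` should return an empty list, since the reference is passed at the top level.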