Client Submit Tasks
Use submit() for one task (function + arguments).
Call result() to retrieve the value.
"""Basic Client.submit example with a local cluster."""
import math

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo


def main():
    # Start a local scheduler with 10 workers.
    cluster = SchedulerClusterCombo(n_workers=10)

    with Client(address=cluster.get_address()) as client:
        # One submit() call per input; each call returns a future.
        futures = [client.submit(math.sqrt, value) for value in range(100)]
        # result() waits for the task and returns its value.
        total = sum(future.result() for future in futures)
        print(total)

    cluster.shutdown()


if __name__ == "__main__":
    main()
What the example does:
Starts a local scheduler + workers with SchedulerClusterCombo.
Connects a client to that scheduler address.
Calls submit() once per task input.
Resolves each returned future with result() and aggregates the values.
Use submit() when you need one-off calls or when the arguments differ from task to task.
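The submit-then-result pattern can also be tried without a cluster. The sketch below is only a local stand-in: it assumes the standard library's ThreadPoolExecutor in place of Scaler's Client, and scaled_root is a hypothetical task invented for illustration. It shows the case submit() is meant for, where each task gets its own arguments.

```python
import math
from concurrent.futures import ThreadPoolExecutor


def scaled_root(value: float, scale: float = 1.0) -> float:
    """Hypothetical task: every call may use a different scale."""
    return scale * math.sqrt(value)


def run_tasks() -> float:
    # ThreadPoolExecutor stands in for Client here; it is not Scaler.
    with ThreadPoolExecutor(max_workers=4) as pool:
        # One submit() per task, each with its own positional and keyword arguments.
        futures = [
            pool.submit(scaled_root, value, scale=1 + value % 2)
            for value in range(100)
        ]
        # result() blocks until that particular task has finished.
        return sum(future.result() for future in futures)


if __name__ == "__main__":
    print(run_tasks())
```

Collecting the futures first and resolving them afterwards lets all tasks run concurrently before the first result() call blocks.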
Reusing the same object across tasks
When the same Python object is passed to several tasks, Scaler serializes and uploads it only once
and reuses that upload for every task that references it – whether the tasks come from a single
map() / get() call or from many separate
submit() calls. This happens automatically; no code change is required.
Two layers cooperate here. A client-side cache keyed by object identity (id(obj)) avoids
re-serializing the same object, and object IDs are content-addressed (derived from the serialized
bytes), so the upload itself is skipped whenever the server already holds identical content.
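The content-addressed layer can be pictured with a few lines of standard-library Python. This is a minimal sketch, not Scaler's implementation: server_store, upload_if_missing, pickle as the serializer, and SHA-256 as the hash are all assumptions made for illustration.

```python
import hashlib
import pickle
from typing import Tuple

# Hypothetical stand-in for the server-side object storage: object_id -> bytes.
server_store = {}


def upload_if_missing(obj) -> Tuple[str, bool]:
    """Return (object_id, uploaded); the upload is skipped for known content."""
    payload = pickle.dumps(obj)
    # Assumption: the ID is a SHA-256 digest of the serialized bytes.
    object_id = hashlib.sha256(payload).hexdigest()
    uploaded = object_id not in server_store
    if uploaded:
        server_store[object_id] = payload
    return object_id, uploaded


weights = list(range(1000))
copy_of_weights = list(range(1000))  # distinct object, identical content

first_id, first_uploaded = upload_if_missing(weights)
second_id, second_uploaded = upload_if_missing(copy_of_weights)
```

Both calls yield the same ID, and only the first one stores any bytes. The identity cache described above sits in front of this step and would avoid even the pickle.dumps() call when the very same object is passed again.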
There is one caveat: if you mutate an object in place and submit it again, the identity cache
returns the pre-mutation snapshot and the task receives the stale bytes. Pass reserialize=True to
re-serialize that call’s arguments and refresh the cache:
data = load_dataframe()
client.submit_verbose(train, (data,), {}) # serialized, uploaded, cached
data.drop(columns=["unused"], inplace=True) # mutated in place
client.submit_verbose(train, (data,), {}, reserialize=True) # re-serialized and re-uploaded
Because IDs are content-addressed, reserialize re-serializes the call’s arguments but only
re-uploads the ones whose content actually changed – passing it for an object that turned out not to
have changed costs a re-serialization, not a re-upload.
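The interplay between the identity cache, in-place mutation, and reserialize can be sketched locally. Everything here is hypothetical (resolve, identity_cache, server_store, pickle, SHA-256) and only mimics the behavior described above; it is not Scaler's code.

```python
import hashlib
import pickle

identity_cache = {}  # id(obj) -> (obj, object_id); holding obj keeps id() stable
server_store = {}    # object_id -> serialized bytes ("the server")
stats = {"serialized": 0, "uploaded": 0}


def resolve(obj, reserialize: bool = False) -> str:
    """Return the content-addressed ID a task would reference for obj."""
    cached = identity_cache.get(id(obj))
    if not reserialize and cached is not None and cached[0] is obj:
        return cached[1]  # identity hit: may point at a pre-mutation snapshot
    payload = pickle.dumps(obj)
    stats["serialized"] += 1
    object_id = hashlib.sha256(payload).hexdigest()
    if object_id not in server_store:  # upload only content the server lacks
        server_store[object_id] = payload
        stats["uploaded"] += 1
    identity_cache[id(obj)] = (obj, object_id)
    return object_id


data = [1, 2, 3]
original_id = resolve(data)                     # serialized and uploaded
data.append(4)                                  # mutated in place
stale_id = resolve(data)                        # identity hit: stale snapshot
fresh_id = resolve(data, reserialize=True)      # re-serialized and re-uploaded
unchanged_id = resolve(data, reserialize=True)  # re-serialized, upload skipped
```

The last call costs one more serialization but no upload, because the bytes hash to an ID the store already holds.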
reserialize is available on submit_verbose(), map(),
starmap() and get(). submit() forwards its
keyword arguments to your function, so use submit_verbose() when you need the
flag. It affects only the objects in that one call; every other cached object is untouched.
Sending a heavy object explicitly
Because reused objects are deduplicated automatically (see above), you rarely need to send objects by hand.
send_object() still helps in one case: it serializes a large payload once and
returns a lightweight reference, avoiding the per-call re-serialization that the automatic cache
cannot skip for non-weakref-able built-ins (bytes, str, list, dict, tuple) reused
across many separate submit() calls. It is also an explicit handle you can pass
wherever a positional argument is expected.
import random

from scaler import Client, SchedulerClusterCombo


def lookup(heavy_map_ref, index: int):
    return heavy_map_ref[index]


def main():
    address = "tcp://127.0.0.1:2345"
    cluster = SchedulerClusterCombo(address=address, n_workers=3)

    heavy_map = b"1" * 10_000_000
    arguments = [random.randint(0, 100) for _ in range(100)]

    with Client(address=address) as client:
        heavy_map_ref = client.send_object(heavy_map, name="heavy_map")
        futures = [client.submit(lookup, heavy_map_ref, i) for i in arguments]
        print([future.result() for future in futures])

    cluster.shutdown()


if __name__ == "__main__":
    main()
Notes for send_object():
The payload is serialized and uploaded once; each task then carries only a small reference.
Unlike passing the object directly, the reference is not re-serialized per task.
The returned reference must be passed as a positional function argument.
Do not nest object references inside other containers (for example lists or dicts).
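To see why a reference is cheaper than passing the payload with every task, the following sketch compares the serialized size of a task's arguments in both cases. It is a rough illustration under stated assumptions: object_store, send_object_sketch, and the dict-shaped handle are hypothetical, and Scaler's real reference type and wire format may differ.

```python
import pickle

# Hypothetical stand-in for the object storage that send_object() uploads to.
object_store = {}


def send_object_sketch(obj, name: str) -> dict:
    """Serialize and store the payload once; return a small handle."""
    object_store[name] = pickle.dumps(obj)
    # Assumption: a plain dict plays the role of the lightweight reference.
    return {"object_id": name}


def task_message_size(*args) -> int:
    """Size in bytes of the arguments a single task would have to carry."""
    return len(pickle.dumps(args))


heavy_map = b"1" * 1_000_000
heavy_ref = send_object_sketch(heavy_map, name="heavy_map")

inline_size = task_message_size(heavy_map, 7)  # payload is serialized with the task
ref_size = task_message_size(heavy_ref, 7)     # only the small handle is serialized

print(inline_size, ref_size)
```

With 100 tasks, the inline variant serializes the megabyte-sized payload 100 times, while the reference variant serializes it once and each task carries only a few dozen bytes.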
Task profiling
To measure per-task runtime and memory, enable profiling when submitting the task. Task profiling values are available after the task completes.
from scaler import Client


def calculate(sec: int):
    return sec * 1


client = Client(address="tcp://127.0.0.1:2345")
fut = client.submit(calculate, 1, profiling=True)

# Ensure task execution is complete
fut.result()

# Runtime in microseconds
fut.profiling_info().duration_us

# Peak task memory usage in bytes (sampled periodically)
fut.profiling_info().peak_memory
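To get a feel for the two quantities and their units without a cluster, a local approximation can be built from the standard library. This is a sketch only: profile_call and build_list are hypothetical helpers, and tracemalloc tracks Python allocations exactly rather than sampling memory periodically, so the numbers will not match what Scaler reports.

```python
import time
import tracemalloc


def profile_call(fn, *args):
    """Run fn locally and return (result, duration_us, peak_bytes)."""
    tracemalloc.start()
    started = time.perf_counter_ns()
    try:
        result = fn(*args)
    finally:
        elapsed_ns = time.perf_counter_ns() - started
        # get_traced_memory() returns (current, peak) in bytes.
        _, peak_bytes = tracemalloc.get_traced_memory()
        tracemalloc.stop()
    # Convert nanoseconds to whole microseconds.
    return result, elapsed_ns // 1_000, peak_bytes


def build_list(n: int) -> int:
    """Hypothetical task that allocates a list of n elements."""
    return len([0] * n)


result, duration_us, peak_bytes = profile_call(build_list, 100_000)
print(f"{duration_us} us, peak {peak_bytes} bytes")
```

As in the Scaler example, the measurements are only meaningful once the call has returned.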