Capability Allocation Example
Shows how to use capabilities for task routing.
"""Route tasks to workers by capability with submit_verbose."""
import math
import multiprocessing
from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo
from scaler.config.common.logging import LoggingConfig
from scaler.config.common.worker import WorkerConfig
from scaler.config.common.worker_manager import WorkerManagerConfig
from scaler.config.section.native_worker_manager import NativeWorkerManagerConfig, NativeWorkerManagerMode
from scaler.config.section.scheduler import PolicyConfig
from scaler.config.types.worker import WorkerCapabilities
from scaler.worker_manager_adapter.baremetal.native import NativeWorkerManager
def gpu_task(x: float) -> float:
return math.sqrt(x) * 2
def cpu_task(x: float) -> float:
return x * 2
def main():
cluster = SchedulerClusterCombo(
n_workers=2, scaler_policy=PolicyConfig(policy_content="allocate=capability; scaling=no")
)
base_manager = cluster._worker_manager
gpu_manager = NativeWorkerManager(
NativeWorkerManagerConfig(
worker_manager_config=WorkerManagerConfig(
scheduler_address=base_manager._address,
object_storage_address=base_manager._object_storage_address,
max_task_concurrency=1,
),
worker_manager_id="test_manager",
mode=NativeWorkerManagerMode.FIXED,
worker_config=WorkerConfig(
per_worker_capabilities=WorkerCapabilities({"gpu": -1}),
per_worker_task_queue_size=base_manager._task_queue_size,
heartbeat_interval_seconds=base_manager._heartbeat_interval_seconds,
task_timeout_seconds=base_manager._task_timeout_seconds,
death_timeout_seconds=base_manager._death_timeout_seconds,
garbage_collect_interval_seconds=base_manager._garbage_collect_interval_seconds,
trim_memory_threshold_bytes=base_manager._trim_memory_threshold_bytes,
hard_processor_suspend=base_manager._hard_processor_suspend,
io_threads=base_manager._io_threads,
event_loop=base_manager._event_loop,
),
logging_config=LoggingConfig(
paths=base_manager._logging_paths,
level=base_manager._logging_level,
config_file=base_manager._logging_config_file,
),
)
)
gpu_manager_process = multiprocessing.Process(target=gpu_manager.run)
gpu_manager_process.start()
with Client(address=cluster.get_address()) as client:
gpu_future = client.submit_verbose(gpu_task, args=(16.0,), kwargs={}, capabilities={"gpu": 1})
cpu_future = client.submit_verbose(cpu_task, args=(16.0,), kwargs={}, capabilities={})
gpu_future.result()
cpu_future.result()
if gpu_manager_process.is_alive():
gpu_manager_process.terminate()
gpu_manager_process.join()
cluster.shutdown()
if __name__ == "__main__":
main()
What the example does:
Starts a scheduler with a capability-aware allocation policy.
Adds a worker manager that advertises the gpu capability (the worker/task pairing is sketched after this list).
Submits one task with capabilities={"gpu": 1} and another with no capability requirement.
Why this matters:
Capability constraints route tasks only to workers that advertise a matching capability.
Tasks without capability requirements can run on general workers, improving overall cluster utilization.
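As a sketch of that utilization point, a client can freely mix constrained and unconstrained submissions against a running cluster: only the gpu-tagged tasks are held back for gpu-capable workers. The address below is a placeholder (in the example above it comes from cluster.get_address()), and gpu_task / cpu_task are the functions from the listing.

from scaler import Client

# Placeholder address; replace with the real scheduler address.
with Client(address="tcp://127.0.0.1:2345") as client:
    # Constrained: may only run on workers advertising "gpu".
    gpu_futures = [
        client.submit_verbose(gpu_task, args=(float(i),), kwargs={}, capabilities={"gpu": 1})
        for i in range(4)
    ]
    # Unconstrained: no capability requirement, so the general workers can take these.
    cpu_futures = [
        client.submit_verbose(cpu_task, args=(float(i),), kwargs={}, capabilities={})
        for i in range(4)
    ]
    results = [future.result() for future in gpu_futures + cpu_futures]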