Simple Engine

The simple policy engine requires a semicolon-delimited policy_content string with exactly two keys:

  • allocate: even_load or capability

  • scaling: no, vanilla, or capability

Quick Start (copy/paste)

The example below starts a scheduler and one native worker manager using vanilla scaling.

[scheduler]
scheduler_address = "tcp://127.0.0.1:8516"
# object_storage_address:
# - if omitted, an object storage instance is auto-started at scheduler port + 1
# - if specified, the scheduler connects to that address instead of starting one
# object_storage_address = "tcp://127.0.0.1:8517"
policy_engine_type = "simple"
policy_content = "allocate=even_load; scaling=vanilla"

[[worker_manager]]
type = "baremetal_native"
scheduler_address = "tcp://127.0.0.1:8516"
object_storage_address = "tcp://127.0.0.1:8517"
worker_manager_id = "NAT|default"
max_task_concurrency = 8

Run command:

$ scaler config.toml

Other quick policy strings for simple:

# No autoscaling
policy_engine_type = "simple"
policy_content = "allocate=even_load; scaling=no"

# Capability-aware autoscaling (recommended pair)
policy_engine_type = "simple"
policy_content = "allocate=capability; scaling=capability"

Allocation

The allocate option controls how tasks are assigned to available workers.

  • allocate=even_load

    • Spreads tasks across workers evenly.

    • Best for homogeneous workers where any worker can run any task.

    • Commonly paired with scaling=vanilla.

  • allocate=capability

    • Routes tasks to workers whose capabilities match task requirements.

    • Best for heterogeneous clusters (for example CPU-only + GPU workers).

    • Should be paired with scaling=capability so scale-up requests are also capability-aware.
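
To make the two strategies concrete, here is a minimal, hypothetical sketch of the selection logic. The function names and data structures are illustrative only; they are not Scaler's internal API:

```python
from typing import Dict, Optional


def pick_even_load(queued: Dict[str, int]) -> str:
    """even_load: choose the worker with the fewest queued tasks."""
    return min(queued, key=queued.get)


def pick_capability(
    queued: Dict[str, int],
    worker_caps: Dict[str, Dict[str, int]],
    required: Dict[str, int],
) -> Optional[str]:
    """capability: restrict to workers whose capabilities cover the task's
    requirements, then balance load among the survivors."""
    capable = [
        w for w, caps in worker_caps.items()
        if all(key in caps for key in required)
    ]
    if not capable:
        return None  # no worker can run this task
    return min(capable, key=lambda w: queued[w])


queued = {"w1": 3, "w2": 1, "w3": 2}
caps = {"w1": {}, "w2": {}, "w3": {"gpu": -1}}

pick_even_load(queued)                     # least-loaded worker: "w2"
pick_capability(queued, caps, {"gpu": 1})  # only "w3" advertises gpu: "w3"
```

The key design difference: even_load considers every worker interchangeable, while capability first filters by what each worker advertises and only then balances load.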

Capability routing example:

"""Route tasks to workers by capability with submit_verbose."""

import math
import multiprocessing

from scaler import Client
from scaler.cluster.combo import SchedulerClusterCombo
from scaler.config.common.logging import LoggingConfig
from scaler.config.common.worker import WorkerConfig
from scaler.config.common.worker_manager import WorkerManagerConfig
from scaler.config.section.native_worker_manager import NativeWorkerManagerConfig, NativeWorkerManagerMode
from scaler.config.section.scheduler import PolicyConfig
from scaler.config.types.worker import WorkerCapabilities
from scaler.worker_manager_adapter.baremetal.native import NativeWorkerManager


def gpu_task(x: float) -> float:
    return math.sqrt(x) * 2


def cpu_task(x: float) -> float:
    return x * 2


def main():
    cluster = SchedulerClusterCombo(
        n_workers=2, scaler_policy=PolicyConfig(policy_content="allocate=capability; scaling=no")
    )

    # reuse the combo's built-in manager settings for a second, GPU-capable manager
    base_manager = cluster._worker_manager
    gpu_manager = NativeWorkerManager(
        NativeWorkerManagerConfig(
            worker_manager_config=WorkerManagerConfig(
                scheduler_address=base_manager._address,
                object_storage_address=base_manager._object_storage_address,
                max_task_concurrency=1,
            ),
            worker_manager_id="test_manager",
            mode=NativeWorkerManagerMode.FIXED,
            worker_config=WorkerConfig(
                per_worker_capabilities=WorkerCapabilities({"gpu": -1}),
                per_worker_task_queue_size=base_manager._task_queue_size,
                heartbeat_interval_seconds=base_manager._heartbeat_interval_seconds,
                task_timeout_seconds=base_manager._task_timeout_seconds,
                death_timeout_seconds=base_manager._death_timeout_seconds,
                garbage_collect_interval_seconds=base_manager._garbage_collect_interval_seconds,
                trim_memory_threshold_bytes=base_manager._trim_memory_threshold_bytes,
                hard_processor_suspend=base_manager._hard_processor_suspend,
                io_threads=base_manager._io_threads,
                event_loop=base_manager._event_loop,
            ),
            logging_config=LoggingConfig(
                paths=base_manager._logging_paths,
                level=base_manager._logging_level,
                config_file=base_manager._logging_config_file,
            ),
        )
    )
    # run the GPU-capable manager in its own process alongside the combo
    gpu_manager_process = multiprocessing.Process(target=gpu_manager.run)
    gpu_manager_process.start()

    with Client(address=cluster.get_address()) as client:
        # gpu_task is routed to the GPU manager's worker; cpu_task can run anywhere
        gpu_future = client.submit_verbose(gpu_task, args=(16.0,), kwargs={}, capabilities={"gpu": 1})
        cpu_future = client.submit_verbose(cpu_task, args=(16.0,), kwargs={}, capabilities={})
        gpu_future.result()
        cpu_future.result()

    if gpu_manager_process.is_alive():
        gpu_manager_process.terminate()
    gpu_manager_process.join()
    cluster.shutdown()


if __name__ == "__main__":
    main()

Scaling

The scaling option controls how worker capacity grows or shrinks.

  • scaling=no

    • Disables scheduler-driven scaling commands.

    • Use for static capacity or external orchestrators.

  • scaling=vanilla

    • General autoscaling for homogeneous clusters.

    • Scale up when tasks / workers > 10.

    • Scale down when tasks / workers < 1.

  • scaling=capability

    • Capability-aware autoscaling for heterogeneous clusters.

    • Groups demand by capability and scales per capability group.

    • Scale up when tasks / capable_workers > 5.

    • Scale down when tasks / capable_workers < 0.5.
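
The thresholds above can be illustrated with a small sketch. The function names are illustrative; the real decision logic lives inside the scheduler:

```python
def vanilla_decision(tasks: int, workers: int) -> str:
    """vanilla scaling: compare the tasks-per-worker ratio
    against the fixed 10 / 1 thresholds."""
    ratio = tasks / workers
    if ratio > 10:
        return "scale_up"
    if ratio < 1:
        return "scale_down"
    return "hold"


def capability_decision(tasks: int, capable_workers: int) -> str:
    """capability scaling: the same idea, computed per capability
    group with tighter 5 / 0.5 thresholds."""
    ratio = tasks / capable_workers
    if ratio > 5:
        return "scale_up"
    if ratio < 0.5:
        return "scale_down"
    return "hold"


vanilla_decision(50, 4)     # 12.5 > 10  -> "scale_up"
vanilla_decision(2, 4)      # 0.5  < 1   -> "scale_down"
capability_decision(12, 2)  # 6.0  > 5   -> "scale_up"
```

Note that capability scaling evaluates each capability group separately, so a backlog of GPU tasks can trigger a GPU scale-up even while CPU-only workers sit idle.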

Notes:

  • policy_content must contain exactly the two keys allocate and scaling.

  • Scale-up is capped by the max_task_concurrency reported in each manager's heartbeat.

  • Threshold values are currently fixed in code.
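
A sketch of the parsing rules implied by these notes. This is illustrative only; Scaler's actual parser may differ in error messages and edge-case handling:

```python
def parse_policy(policy_content: str) -> dict:
    """Parse a 'key=value; key=value' policy string and enforce the
    simple engine's rules: exactly the allocate and scaling keys,
    each with an allowed value."""
    allowed = {
        "allocate": {"even_load", "capability"},
        "scaling": {"no", "vanilla", "capability"},
    }
    pairs = {}
    for part in policy_content.split(";"):
        key, _, value = part.strip().partition("=")
        pairs[key.strip()] = value.strip()
    if set(pairs) != set(allowed):
        raise ValueError("policy_content must contain exactly allocate and scaling")
    for key, value in pairs.items():
        if value not in allowed[key]:
            raise ValueError(f"invalid value {value!r} for {key}")
    return pairs


parse_policy("allocate=even_load; scaling=vanilla")
# -> {"allocate": "even_load", "scaling": "vanilla"}
```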