Scaling Policies
Scaler provides an experimental auto-scaling feature that dynamically adjusts the number of workers to match the workload. Scaling policies determine when to add or remove workers, while Worker Adapters handle the actual provisioning of resources.
Overview
The scaling system consists of two main components:
Scaling Controller: A policy that monitors task queues and worker availability to make scaling decisions.
Worker Adapter: A component that handles the actual creation and destruction of worker groups (e.g., starting containers, launching processes).
The Scaling Controller runs within the Scheduler and communicates with Worker Adapters via Cap’n Proto messages. Worker Adapters connect to the Scheduler and receive scaling commands directly.
The scaling policy is configured via the policy_content setting in the scheduler configuration:
scaler_scheduler tcp://127.0.0.1:8516 --policy-content "allocate=capability; scaling=vanilla"
Or in a TOML configuration file:
[scheduler]
policy_content = "allocate=capability; scaling=capability"
Available Scaling Policies
Scaler provides several built-in scaling policies:
| Policy | Description |
|---|---|
| no | No automatic scaling. Workers are managed manually or statically provisioned. |
| vanilla | Basic task-to-worker ratio scaling. Scales up when the task ratio exceeds a threshold, scales down when idle. |
| capability | Capability-aware scaling. Scales worker groups based on task-required capabilities (e.g., GPU, memory). |
| fixed_elastic | Hybrid scaling using primary and secondary worker adapters with configurable limits. |
No Scaling (no)
The simplest policy that performs no automatic scaling. Use this when:
Workers are statically provisioned
External orchestration handles scaling (e.g., Kubernetes HPA)
You want full manual control over worker count
scaler_scheduler tcp://127.0.0.1:8516 --policy-content "allocate=even_load; scaling=no"
Vanilla Scaling (vanilla)
The vanilla scaling controller uses a simple task-to-worker ratio to make scaling decisions:
Scale up: when tasks / workers > upper_task_ratio (default: 10)
Scale down: when tasks / workers < lower_task_ratio (default: 1)
This policy is straightforward and works well for homogeneous workloads where all workers can handle all tasks.
scaler_scheduler tcp://127.0.0.1:8516 \
--policy-content "allocate=even_load; scaling=vanilla"
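As a rough illustration of the ratio check described above (a sketch of the decision rule only, not the controller’s actual code):

def vanilla_decision(num_tasks: int, num_workers: int,
                     upper_task_ratio: float = 10.0,
                     lower_task_ratio: float = 1.0) -> str:
    # Illustrative only: with no workers, any pending work triggers a scale-up.
    if num_workers == 0:
        return "scale_up" if num_tasks > 0 else "hold"
    ratio = num_tasks / num_workers
    if ratio > upper_task_ratio:
        return "scale_up"    # queue is deep relative to the worker count
    if ratio < lower_task_ratio:
        return "scale_down"  # workers are mostly idle
    return "hold"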
Capability Scaling (capability)
The capability scaling controller is designed for heterogeneous workloads where tasks require specific capabilities (e.g., GPU, high memory, specialized hardware).
Key Features:
Groups tasks by their required capability sets
Groups workers by their provided capability sets
Scales worker groups per capability set independently
Ensures tasks are matched to workers that can handle them
Prevents scaling down the last worker group capable of handling pending tasks
Prevents thrashing by checking if scale-down would immediately trigger scale-up
How It Works:
Task Grouping: Tasks are grouped by their required capability keys (e.g., {"gpu"}, {"gpu", "high_memory"}).
Worker Matching: Workers are grouped by their provided capabilities. A worker can handle a task if the task’s required capabilities are a subset of the worker’s capabilities (see the sketch after this list).
Per-Capability Scaling: The controller applies the task-to-worker ratio logic independently for each capability set:
Scale up: when tasks / capable_workers > upper_task_ratio (default: 5)
Scale down: when tasks / capable_workers < lower_task_ratio (default: 0.5)
Capability Request: When scaling up, the controller requests worker groups with specific capabilities from the worker adapter.
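The matching rule from the steps above can be sketched as a simple subset check on capability keys (illustrative, not the controller’s implementation):

def worker_can_handle(task_capabilities: dict, worker_capabilities: dict) -> bool:
    # Capabilities are given as dicts (e.g., {"gpu": 1}); the match is on their keys:
    # the task's required keys must be a subset of the worker's provided keys.
    return set(task_capabilities) <= set(worker_capabilities)

# A worker providing {"gpu", "high_memory"} can run {"gpu"} tasks, but not the other way round.
assert worker_can_handle({"gpu": 1}, {"gpu": 1, "high_memory": 1})
assert not worker_can_handle({"gpu": 1, "high_memory": 1}, {"gpu": 1})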
Configuration:
scaler_scheduler tcp://127.0.0.1:8516 \
--policy-content "allocate=capability; scaling=capability"
Example Scenario:
Consider a workload with both CPU-only and GPU tasks:
from scaler import Client

with Client(address="tcp://127.0.0.1:8516") as client:
    # Submit CPU tasks (no special capabilities required)
    cpu_futures = [
        client.submit_verbose(cpu_task, args=(i,), capabilities={})
        for i in range(100)
    ]

    # Submit GPU tasks (require GPU capability)
    gpu_futures = [
        client.submit_verbose(gpu_task, args=(i,), capabilities={"gpu": 1})
        for i in range(50)
    ]
With the capability scaling policy:
If no GPU workers exist, the controller requests a worker group with {"gpu": 1} from the adapter.
CPU and GPU worker groups are scaled independently based on their respective task queues.
Idle GPU workers can be shut down without affecting CPU task processing.
Fixed Elastic Scaling (fixed_elastic)
The fixed elastic scaling controller supports hybrid scaling with multiple worker adapters:
Primary Adapter: A single worker group (identified by max_worker_groups == 1) that starts once and never shuts down
Secondary Adapter: Elastic capacity (max_worker_groups > 1) that scales based on demand
This is useful for scenarios where you have a fixed pool of dedicated resources but want to burst to additional resources during peak demand.
scaler_scheduler tcp://127.0.0.1:8516 \
--policy-content "allocate=even_load; scaling=fixed_elastic"
Behavior:
The primary adapter’s worker group is started once and never shut down
Secondary adapter groups are created when demand exceeds primary capacity
When scaling down, only secondary adapter groups are shut down
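A minimal sketch of this split, assuming adapters are distinguished by the max_worker_groups value they report in their heartbeats (see the protocol section below); this is illustrative, not the actual controller code:

def adapter_role(max_worker_groups: int) -> str:
    # An adapter advertising exactly one worker group acts as the fixed primary;
    # adapters with more groups provide elastic secondary capacity.
    return "primary" if max_worker_groups == 1 else "secondary"

def may_shut_down(role: str) -> bool:
    # Only secondary (elastic) worker groups are ever shut down.
    return role == "secondary"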
Worker Adapter Protocol
Scaling controllers run inside the scheduler process and communicate with worker adapters using Cap’n Proto messages, sent over the connection each worker adapter maintains to the scheduler. The protocol uses the following message types:
WorkerAdapterHeartbeat (Adapter -> Scheduler):
Worker adapters periodically send heartbeats to the scheduler containing their capacity information:
max_worker_groups: Maximum number of worker groups this adapter can manage
workers_per_group: Number of workers in each group
capabilities: Default capabilities for workers from this adapter
WorkerAdapterCommand (Scheduler -> Adapter):
The scheduler sends commands to worker adapters:
StartWorkerGroup: Request to start a new worker group
worker_group_id: Empty for new groups (adapter assigns ID)
capabilities: Required capabilities for the worker group
ShutdownWorkerGroup: Request to shut down an existing worker group
worker_group_id: ID of the group to shut down
WorkerAdapterCommandResponse (Adapter -> Scheduler):
Worker adapters respond to commands with status and details:
worker_group_id: ID of the affected worker group
command: The command type this response is for
status: Result status (Success, WorkerGroupTooMuch, WorkerGroupIDNotFound)
worker_ids: List of worker IDs in the group (for start commands)
capabilities: Actual capabilities of the started workers
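The full adapter example below shows these messages in context; as a minimal sketch, the two adapter-to-scheduler messages can be constructed with the same new_msg helpers used there (the values here are placeholders):

from scaler.protocol.python.message import (
    WorkerAdapterCommandResponse,
    WorkerAdapterCommandType,
    WorkerAdapterHeartbeat,
)

# Periodic capacity report sent by the adapter.
heartbeat = WorkerAdapterHeartbeat.new_msg(
    max_worker_groups=4,      # placeholder: groups this adapter can manage
    workers_per_group=8,      # placeholder: workers started per group
    capabilities={"gpu": 1},  # placeholder: default worker capabilities
)

# Reply to a StartWorkerGroup command once the group is up.
response = WorkerAdapterCommandResponse.new_msg(
    worker_group_id=b"example-group",  # placeholder ID assigned by the adapter
    command=WorkerAdapterCommandType.StartWorkerGroup,
    status=WorkerAdapterCommandResponse.Status.Success,
    worker_ids=[b"worker-1", b"worker-2"],  # placeholder worker IDs
    capabilities={"gpu": 1},
)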
Example Worker Adapter
Here is an example of a worker adapter using the ECS (Amazon Elastic Container Service) integration:
import asyncio
import logging
import signal
import uuid
from dataclasses import dataclass
from typing import Dict, List, Set, Tuple
import boto3
import zmq
from scaler.config.section.ecs_worker_adapter import ECSWorkerAdapterConfig
from scaler.io.utility import create_async_connector, create_async_simple_context
from scaler.io.ymq import ymq
from scaler.protocol.python.message import (
Message,
WorkerAdapterCommand,
WorkerAdapterCommandResponse,
WorkerAdapterCommandType,
WorkerAdapterHeartbeat,
WorkerAdapterHeartbeatEcho,
)
from scaler.utility.event_loop import create_async_loop_routine, register_event_loop
from scaler.utility.identifiers import WorkerID
from scaler.utility.logging.utility import setup_logger
from scaler.worker_adapter.common import WorkerGroupID, format_capabilities
Status = WorkerAdapterCommandResponse.Status
@dataclass
class WorkerGroupInfo:
worker_ids: Set[WorkerID]
task_arn: str
class ECSWorkerAdapter:
def __init__(self, config: ECSWorkerAdapterConfig):
self._address = config.worker_adapter_config.scheduler_address
self._object_storage_address = config.worker_adapter_config.object_storage_address
self._capabilities = config.worker_config.per_worker_capabilities.capabilities
self._io_threads = config.worker_io_threads
self._per_worker_task_queue_size = config.worker_config.per_worker_task_queue_size
self._max_instances = config.worker_adapter_config.max_workers
self._heartbeat_interval_seconds = config.worker_config.heartbeat_interval_seconds
self._task_timeout_seconds = config.worker_config.task_timeout_seconds
self._death_timeout_seconds = config.worker_config.death_timeout_seconds
self._garbage_collect_interval_seconds = config.worker_config.garbage_collect_interval_seconds
self._trim_memory_threshold_bytes = config.worker_config.trim_memory_threshold_bytes
self._hard_processor_suspend = config.worker_config.hard_processor_suspend
self._event_loop = config.event_loop
self._logging_paths = config.logging_config.paths
self._logging_level = config.logging_config.level
self._logging_config_file = config.logging_config.config_file
self._aws_access_key_id = config.aws_access_key_id
self._aws_secret_access_key = config.aws_secret_access_key
self._aws_region = config.aws_region
self._ecs_cluster = config.ecs_cluster
self._ecs_task_image = config.ecs_task_image
self._ecs_python_requirements = config.ecs_python_requirements
self._ecs_python_version = config.ecs_python_version
self._ecs_task_definition = config.ecs_task_definition
self._ecs_task_cpu = config.ecs_task_cpu
self._ecs_task_memory = config.ecs_task_memory
self._ecs_subnets = config.ecs_subnets
aws_session = boto3.Session(
aws_access_key_id=self._aws_access_key_id,
aws_secret_access_key=self._aws_secret_access_key,
region_name=self._aws_region,
)
self._ecs_client = aws_session.client("ecs")
resp = self._ecs_client.describe_clusters(clusters=[self._ecs_cluster])
clusters = resp.get("clusters") or []
if not clusters or clusters[0]["status"] != "ACTIVE":
logging.info(f"ECS cluster '{self._ecs_cluster}' missing, creating it.")
self._ecs_client.create_cluster(clusterName=self._ecs_cluster)
self._worker_groups: Dict[WorkerGroupID, WorkerGroupInfo] = {}
try:
resp = self._ecs_client.describe_task_definition(taskDefinition=self._ecs_task_definition)
except self._ecs_client.exceptions.ClientException:
logging.info(f"ECS task definition '{self._ecs_task_definition}' missing, creating it.")
iam_client = aws_session.client("iam")
try:
resp = iam_client.get_role(RoleName="ecsTaskExecutionRole")
execution_role_arn = resp["Role"]["Arn"]
except iam_client.exceptions.NoSuchEntityException:
resp = iam_client.create_role(
RoleName="ecsTaskExecutionRole",
AssumeRolePolicyDocument=(
'{"Version": "2012-10-17", '
'"Statement": [{"Effect": "Allow", '
'"Principal": {"Service": "ecs-tasks.amazonaws.com"}, "Action": "sts:AssumeRole"}]}'
),
)
execution_role_arn = resp["Role"]["Arn"]
iam_client.attach_role_policy(
RoleName="ecsTaskExecutionRole",
PolicyArn="arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy",
)
resp = self._ecs_client.register_task_definition(
family=self._ecs_task_definition,
cpu=str(self._ecs_task_cpu * 1024),
memory=str(self._ecs_task_memory * 1024),
runtimePlatform={"cpuArchitecture": "X86_64", "operatingSystemFamily": "LINUX"},
networkMode="awsvpc",
containerDefinitions=[{"name": "scaler-container", "image": self._ecs_task_image, "essential": True}],
requiresCompatibilities=["FARGATE"],
executionRoleArn=execution_role_arn,
)
self._ecs_task_definition = resp["taskDefinition"]["taskDefinitionArn"]
self._context = create_async_simple_context()
self._name = "worker_adapter_ecs"
self._ident = f"{self._name}|{uuid.uuid4().bytes.hex()}".encode()
self._connector_external = create_async_connector(
self._context,
name=self._name,
socket_type=zmq.DEALER,
address=self._address,
bind_or_connect="connect",
callback=self.__on_receive_external,
identity=self._ident,
)
async def __on_receive_external(self, message: Message):
if isinstance(message, WorkerAdapterCommand):
await self._handle_command(message)
elif isinstance(message, WorkerAdapterHeartbeatEcho):
pass
else:
logging.warning(f"Received unknown message type: {type(message)}")
async def _handle_command(self, command: WorkerAdapterCommand):
cmd_type = command.command
worker_group_id = command.worker_group_id
response_status = Status.Success
worker_ids: List[bytes] = []
capabilities: Dict[str, int] = {}
cmd_res = WorkerAdapterCommandType.StartWorkerGroup
if cmd_type == WorkerAdapterCommandType.StartWorkerGroup:
cmd_res = WorkerAdapterCommandType.StartWorkerGroup
worker_group_id, response_status = await self.start_worker_group()
if response_status == Status.Success:
worker_ids = [bytes(wid) for wid in self._worker_groups[worker_group_id].worker_ids]
capabilities = self._capabilities
elif cmd_type == WorkerAdapterCommandType.ShutdownWorkerGroup:
cmd_res = WorkerAdapterCommandType.ShutdownWorkerGroup
response_status = await self.shutdown_worker_group(worker_group_id)
else:
raise ValueError("Unknown Command")
await self._connector_external.send(
WorkerAdapterCommandResponse.new_msg(
worker_group_id=worker_group_id,
command=cmd_res,
status=response_status,
worker_ids=worker_ids,
capabilities=capabilities,
)
)
async def __send_heartbeat(self):
await self._connector_external.send(
WorkerAdapterHeartbeat.new_msg(
max_worker_groups=self._max_instances,
workers_per_group=self._ecs_task_cpu,
capabilities=self._capabilities,
)
)
def run(self) -> None:
self.__register_signal()
self._loop = asyncio.new_event_loop()
self._loop.run_until_complete(self._run())
def __destroy(self, *args):
print(f"Worker adapter {self._ident!r} received signal, shutting down")
self._task.cancel()
def __register_signal(self):
signal.signal(signal.SIGINT, self.__destroy)
signal.signal(signal.SIGTERM, self.__destroy)
async def _run(self) -> None:
register_event_loop(self._event_loop)
setup_logger(self._logging_paths, self._logging_config_file, self._logging_level)
self._task = self._loop.create_task(self.__get_loops())
await self._task
async def __get_loops(self):
loops = [
create_async_loop_routine(self._connector_external.routine, 0),
create_async_loop_routine(self.__send_heartbeat, self._heartbeat_interval_seconds),
]
try:
await asyncio.gather(*loops)
except asyncio.CancelledError:
pass
except ymq.YMQException as e:
if e.code == ymq.ErrorCode.ConnectorSocketClosedByRemoteEnd:
pass
else:
logging.exception(f"{self._ident!r}: failed with unhandled exception:\n{e}")
async def start_worker_group(self) -> Tuple[WorkerGroupID, Status]:
if len(self._worker_groups) >= self._max_instances != -1:
return b"", Status.WorkerGroupTooMuch
worker_names = [f"ECS|{uuid.uuid4().hex}" for _ in range(self._ecs_task_cpu)]
command = (
f"scaler_cluster {self._address.to_address()} "
f"--num-of-workers {self._ecs_task_cpu} "
f"--worker-names \"{','.join(worker_names)}\" "
f"--per-worker-task-queue-size {self._per_worker_task_queue_size} "
f"--heartbeat-interval-seconds {self._heartbeat_interval_seconds} "
f"--task-timeout-seconds {self._task_timeout_seconds} "
f"--garbage-collect-interval-seconds {self._garbage_collect_interval_seconds} "
f"--death-timeout-seconds {self._death_timeout_seconds} "
f"--trim-memory-threshold-bytes {self._trim_memory_threshold_bytes} "
f"--event-loop {self._event_loop} "
f"--worker-io-threads {self._io_threads}"
)
if self._hard_processor_suspend:
command += " --hard-processor-suspend"
if self._object_storage_address:
command += f" --object-storage-address {self._object_storage_address.to_string()}"
if format_capabilities(self._capabilities).strip():
command += f" --per-worker-capabilities {format_capabilities(self._capabilities)}"
resp = self._ecs_client.run_task(
cluster=self._ecs_cluster,
taskDefinition=self._ecs_task_definition,
launchType="FARGATE",
overrides={
"containerOverrides": [
{
"name": "scaler-container",
"environment": [
{"name": "COMMAND", "value": command},
{"name": "PYTHON_REQUIREMENTS", "value": self._ecs_python_requirements},
{"name": "PYTHON_VERSION", "value": self._ecs_python_version},
],
}
]
},
networkConfiguration={"awsvpcConfiguration": {"subnets": self._ecs_subnets, "assignPublicIp": "ENABLED"}},
)
failures = resp.get("failures") or []
if failures:
raise RuntimeError(f"ECS run task failed: {failures}")
tasks = resp.get("tasks") or []
if not tasks:
raise RuntimeError("ECS run task returned no tasks")
if len(tasks) > 1:
raise RuntimeError("ECS run task returned multiple tasks, expected only one")
task_arn = tasks[0]["taskArn"]
worker_group_id = f"ecs-{uuid.uuid4().hex}".encode()
self._worker_groups[worker_group_id] = WorkerGroupInfo(
worker_ids={WorkerID.generate_worker_id(worker_name) for worker_name in worker_names}, task_arn=task_arn
)
return worker_group_id, Status.Success
async def shutdown_worker_group(self, worker_group_id: WorkerGroupID) -> Status:
if not worker_group_id:
return Status.WorkerGroupIDNotSpecified
if worker_group_id not in self._worker_groups:
logging.warning(f"Worker group with ID {bytes(worker_group_id).decode()} does not exist.")
return Status.WorkerGroupIDNotFound
resp = self._ecs_client.stop_task(
cluster=self._ecs_cluster,
task=self._worker_groups[worker_group_id].task_arn,
reason="Shutdown requested by ecs adapter",
)
failures = resp.get("failures") or []
if failures:
logging.error(f"ECS stop task failed: {failures}")
return Status.UnknownAction
self._worker_groups.pop(worker_group_id)
return Status.Success
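Assuming an ECSWorkerAdapterConfig has been populated with the fields read in __init__ above (its exact constructor is not shown here), the adapter is driven by run(), which blocks until it receives SIGINT or SIGTERM. A hypothetical driver might look like:

from scaler.config.section.ecs_worker_adapter import ECSWorkerAdapterConfig

config = ECSWorkerAdapterConfig(...)  # placeholder: fill in the fields used in __init__ above
adapter = ECSWorkerAdapter(config)
adapter.run()  # sends heartbeats and handles scaling commands until signaled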
Tips
Match allocation and scaling policies: Use allocate=capability with scaling=capability to ensure tasks are routed to workers with the right capabilities.
Set appropriate thresholds: Adjust task ratio thresholds based on your task duration and scaling latency:
Short tasks: Use a higher upper_task_ratio to avoid thrashing
Long worker startup time: Use a lower lower_task_ratio to avoid premature scale-down
Monitor scaling events: Use Scaler’s monitoring tools (scaler_top) to observe scaling behavior and tune policies.
Worker Adapter Placement: Run worker adapters on machines that can provision the required resources (e.g., run the ECS adapter where it has AWS credentials, run the native adapter on the target machine).