Image batch filter
Ship a batch of synthetic RGB images to workers that each apply a CPU-heavy Sobel edge filter (numpy only); the client renders an input-vs-output grid.
[ ]:
# Connection settings -- edit these to point at your running scheduler.
# The scheduler address supports tcp:// or ws://; only ws:// works from
# JupyterLite (browser).
SCHEDULER_ADDRESS: str = "ws://127.0.0.1:2345"
# Leave as None to use whatever the scheduler advertises.
OBJECT_STORAGE_ADDRESS = None

# Workload defaults: 64 large RGB images, each filtered FILTER_PASSES times so
# per-task work is measured in seconds. On 16 workers expect roughly a minute
# of wall-clock time.
# Thumbnails are displayed as a GRID_SIDE x GRID_SIDE grid of input/output pairs.
GRID_SIDE: int = 8
# Each image is IMAGE_SIZE x IMAGE_SIZE x 3 uint8 (~768 KiB).
IMAGE_SIZE: int = 512
# Number of Sobel passes per image (the kernel feeds itself).
FILTER_PASSES: int = 40
[ ]:
import time
import numpy as np
from scaler import Client
def _synthetic_image(seed: int, size: int) -> np.ndarray:
    """Client-side: build a textured RGB test image; cheap so we never block the browser.

    Uniform random noise drawn from a generator seeded with ``seed`` is halved
    and overlaid with a sinusoidal gradient: along the columns for channel 0
    and along the rows for channel 1. Channel 2 is left as pure noise.

    Args:
        seed: Seeds the random generator and phase-shifts the gradient.
        size: Side length of the square image in pixels.

    Returns:
        A ``(size, size, 3)`` uint8 array.
    """
    rng = np.random.default_rng(seed)
    base = rng.integers(0, 256, size=(size, size, 3), dtype=np.uint8)
    xs = np.linspace(0, 4 * np.pi, size)
    # Keep the gradient in int16: adding two uint8 arrays wraps modulo 256, which
    # would make the clip below a no-op and turn the brightest pixels dark.
    grad = (128 + 127 * np.sin(xs + seed)).astype(np.int16)
    # noise // 2 is at most 127 and grad at most 255, so the int16 sum cannot
    # overflow; the clip saturates it back into the uint8 range before storing.
    base[..., 0] = np.clip(base[..., 0] // 2 + grad[np.newaxis, :], 0, 255)
    base[..., 1] = np.clip(base[..., 1] // 2 + grad[:, np.newaxis], 0, 255)
    return base
def sobel_edges(image: np.ndarray, passes: int) -> tuple[np.ndarray, float]:
    """Worker-side: run a Sobel edge filter ``passes`` times, each pass consuming the last.

    The image is first reduced to a float32 luminance plane (weights 0.299,
    0.587, 0.114 on channels 0, 1, 2). Every pass accumulates the horizontal
    and vertical Sobel responses from the nine shifted copies of the plane,
    takes their magnitude and rescales it so the peak maps to 255 (the divisor
    is floored at 1.0, so an all-zero response stays zero). Neighbours are
    fetched with ``np.roll``, so the image wraps around at its borders.

    Args:
        image: Array whose last axis holds at least three colour channels.
        passes: Number of filter iterations; 0 returns the luminance plane.

    Returns:
        ``(edges, mean)``: the final plane cast to uint8 and its mean as a float.
    """
    kernel_x = np.array(
        [[-1.0, 0.0, 1.0], [-2.0, 0.0, 2.0], [-1.0, 0.0, 1.0]],
        dtype=np.float32,
    )
    kernel_y = kernel_x.T
    offsets = (-1, 0, 1)

    red, green, blue = image[..., 0], image[..., 1], image[..., 2]
    plane = (0.299 * red + 0.587 * green + 0.114 * blue).astype(np.float32)

    for _ in range(passes):
        grad_x = np.zeros_like(plane)
        grad_y = np.zeros_like(plane)
        for row_shift in offsets:
            # The row roll is shared by the three column shifts below.
            rows_rolled = np.roll(plane, row_shift, axis=0)
            for col_shift in offsets:
                neighbour = np.roll(rows_rolled, col_shift, axis=1)
                grad_x += kernel_x[row_shift + 1, col_shift + 1] * neighbour
                grad_y += kernel_y[row_shift + 1, col_shift + 1] * neighbour
        strength = np.hypot(grad_x, grad_y)
        strength *= 255.0 / max(strength.max(), 1.0)
        plane = strength

    edges = plane.astype(np.uint8)
    return edges, float(edges.mean())
# Build one synthetic input per grid cell on the client before shipping any work.
n_images = GRID_SIDE * GRID_SIDE
inputs = [_synthetic_image(image_seed, IMAGE_SIZE) for image_seed in range(n_images)]

with Client(
    address=SCHEDULER_ADDRESS,
    object_storage_address=OBJECT_STORAGE_ADDRESS,
) as client:
    started = time.perf_counter()
    # Submit every task before waiting on any result, then gather in
    # submission order so results line up with `inputs`.
    futures = [client.submit(sobel_edges, picture, FILTER_PASSES) for picture in inputs]
    results = [pending.result() for pending in futures]
    elapsed = time.perf_counter() - started

# Each result is an (edge image, mean intensity) pair; split into parallel lists.
edges = [pair[0] for pair in results]
mean_intensities = [pair[1] for pair in results]

print(f"filtered {n_images} {IMAGE_SIZE}x{IMAGE_SIZE} images ({FILTER_PASSES} passes each) in {elapsed:.2f}s")
print(f"mean edge intensity per image (first 4): {[round(v, 1) for v in mean_intensities[:4]]}")
[ ]:
# Display the input/output grid. Client-side only and intentionally cheap.
import matplotlib.pyplot as plt

# squeeze=False keeps `axes` two-dimensional even when GRID_SIDE == 1; with the
# default squeezing a single-row grid comes back 1-D and axes[row, col] fails.
fig, axes = plt.subplots(
    GRID_SIDE,
    2 * GRID_SIDE,
    figsize=(2 * GRID_SIDE * 1.3, GRID_SIDE * 1.3),
    squeeze=False,
)
for idx in range(n_images):
    row, col = divmod(idx, GRID_SIDE)
    # Each image takes two adjacent columns: input on the left, edges on the right.
    input_ax = axes[row, 2 * col]
    edge_ax = axes[row, 2 * col + 1]
    input_ax.imshow(inputs[idx])
    input_ax.set_axis_off()
    edge_ax.imshow(edges[idx], cmap="gray")
    edge_ax.set_axis_off()
fig.suptitle("input (left) vs. Sobel edges (right)")
fig.tight_layout()
plt.show()