Skip to main content

Kubernetes Scheduling

Kubernetes schedules pods independently by default (see Kubernetes Scheduler). When an experiment is submitted, each pod is placed on a node as soon as the resources it requests are available. There is no default mechanism to hold pods until the full set can start together (although this can be enabled through Gang scheduling). Whether this matters depends on the workload.

Asynchronous scheduling

For embarrassingly parallel and sequential workloads, pods starting at different times causes no problems.

Embarrassingly parallel: each job is independent. If only 4 of 16 GPUs are free, 4 trial pods start immediately and the rest queue. Each pod does useful work from the moment it starts.

Sequential batch processing: a single job with completions > 1 and parallelisms < completions. Pods run one after another anyway. Staggered scheduling simply means the queue drains as resources free up rather than all at once.

See the JobSet examples for concrete manifests of both patterns.

Synchronous workloads need all pods ready

Some workloads require the full set of pods to be present before any useful work can begin:

  • Custom distributed workloads that coordinate via barriers or rendezvous points
  • Multi-node jobs where all workers must connect to each other or to a coordinator before starting

When pods start at different times in these cases, early pods block waiting for their peers and hold their resource allocation (including GPUs) while idle.

Gang scheduling

Gang scheduling solves the synchronous scheduling problem by treating a group of pods as an atomic unit: either all pods are scheduled together, or none are. Pods wait in a queue until all required resources are simultaneously available, then start together.

Gang scheduling requires a cluster-level scheduler plugin.

Gang scheduling is not available on AIchor-managed engines. For imported engines, availability depends on whether the underlying cluster has a compatible scheduler plugin installed. Contact the cluster administrator to confirm.

Handling synchronization without gang scheduling (e.g. Jobset runtime)

If gang scheduling is not available, pods can probe peer hostnames via DNS and wait until all are reachable before starting distributed work. Early pods spin idle (holding their resource allocation) for the duration of the wait.

JobSet creates one headless Service per JobSet and sets each pod's hostname to {jobset-name}-{group}-{job-index}-{completion-index} with the subdomain defaulting to the JobSet name. This makes every peer's DNS address predictable at runtime from the injected environment variables.

Two coordination patterns are available:

Decentralized (every pod waits for all others)

Suitable for homogeneous groups with no designated coordinator. Each pod independently probes all its peers.

import os
import socket
import time


def wait_for_peers(timeout: int = 300) -> None:
hostname = socket.gethostname()
job_index = int(os.environ["JOB_INDEX"])
completion_index = int(os.environ["JOB_COMPLETION_INDEX"])
group_name = os.environ["REPLICATED_JOB_NAME"]
replicas = int(os.environ["REPLICATED_JOB_REPLICAS"])

# hostname = "{jobset-name}-{group}-{job-index}-{completion-index}"
base = hostname.rsplit("-", 2)[0] # "{jobset-name}-{group}"
jobset_name = base[: -(len(group_name) + 1)] # "{jobset-name}"

peers = [
f"{base}-{i}-{completion_index}.{jobset_name}"
for i in range(replicas)
if i != job_index
]

print(f"[peers] I am {hostname} (job_index={job_index}); waiting for {len(peers)} peers", flush=True)

deadline = time.time() + timeout
remaining = set(peers)
while remaining and time.time() < deadline:
for host in list(remaining):
try:
socket.getaddrinfo(host, None)
remaining.discard(host)
print(f"[peers] {host} ready", flush=True)
except socket.gaierror:
pass
if remaining:
time.sleep(5)

if remaining:
raise RuntimeError(f"Timed out waiting for peers: {remaining}")


wait_for_peers()
# all peers are up, proceed with distributed work
print("[peers] all peers up, starting work", flush=True)
time.sleep(5) # stand-in for the real distributed workload
print("[peers] done", flush=True)

Coordinator (workers wait for coordinator, coordinator waits for all workers)

Suitable for heterogeneous groups with a dedicated coordinator or master. Workers check a single hostname; the coordinator checks all workers.

Both roles can ship in a single script and one manifest command: the pod reads REPLICATED_JOB_NAME and dispatches to the worker or coordinator path.

import os
import socket
import time

COORDINATOR_GROUP = "coordinator"
WORKER_GROUP = "worker"


def wait_for_coordinator(coordinator_group: str = COORDINATOR_GROUP, timeout: int = 300) -> None:
hostname = socket.gethostname()
job_index = int(os.environ["JOB_INDEX"])
completion_index = int(os.environ["JOB_COMPLETION_INDEX"])
group_name = os.environ["REPLICATED_JOB_NAME"]

base = hostname.rsplit("-", 2)[0]
jobset_name = base[: -(len(group_name) + 1)]

# coordinator is always job 0, pod 0 of its group
coordinator_host = f"{jobset_name}-{coordinator_group}-0-0.{jobset_name}"

print(f"[worker] I am {hostname}; waiting for coordinator {coordinator_host}", flush=True)

deadline = time.time() + timeout
while time.time() < deadline:
try:
socket.getaddrinfo(coordinator_host, None)
print(f"[worker] coordinator ready", flush=True)
return
except socket.gaierror:
time.sleep(5)

raise RuntimeError(f"Timed out waiting for coordinator: {coordinator_host}")


def wait_for_workers(worker_group: str = WORKER_GROUP, timeout: int = 300) -> None:
hostname = socket.gethostname()
job_index = int(os.environ["JOB_INDEX"])
completion_index = int(os.environ["JOB_COMPLETION_INDEX"])
group_name = os.environ["REPLICATED_JOB_NAME"]
global_replicas = int(os.environ["GLOBAL_REPLICAS"])

base = hostname.rsplit("-", 2)[0]
jobset_name = base[: -(len(group_name) + 1)]

# GLOBAL_REPLICAS counts all jobs across all groups; subtract the coordinator
worker_replicas = global_replicas - 1

workers = [
f"{jobset_name}-{worker_group}-{i}-0.{jobset_name}"
for i in range(worker_replicas)
]

print(f"[coordinator] I am {hostname}; waiting for {len(workers)} workers", flush=True)

deadline = time.time() + timeout
remaining = set(workers)
while remaining and time.time() < deadline:
for host in list(remaining):
try:
socket.getaddrinfo(host, None)
remaining.discard(host)
print(f"[coordinator] {host} ready", flush=True)
except socket.gaierror:
pass
if remaining:
time.sleep(5)

if remaining:
raise RuntimeError(f"Timed out waiting for workers: {remaining}")


# Dispatch on the JobSet group this pod belongs to. Both roles run from the
# same image and command; REPLICATED_JOB_NAME tells each pod which it is.
group = os.environ["REPLICATED_JOB_NAME"]
if group == COORDINATOR_GROUP:
wait_for_workers()
print("[coordinator] all workers up, begin coordination", flush=True)
else:
wait_for_coordinator()
print("[worker] coordinator is up, proceed", flush=True)

The worker_replicas = global_replicas - 1 calculation assumes one coordinator job. For setups with multiple groups, pass the exact worker count explicitly rather than deriving it from GLOBAL_REPLICAS.

Recovering from eviction

A pod is evicted when the platform terminates it before the workload finishes, most commonly when a spot instance is reclaimed, but also when a node runs out of memory or disk, becomes unhealthy, or is drained for maintenance. The jobset and kuberay operators support automatic recovery when this happens. The number of allowed restarts is set via spec.restartPolicy.backoffLimit:

spec:
operator: "jobset" # or kuberay
restartPolicy:
backoffLimit: 5

In the snippet above, spec.restartPolicy.backoffLimit is the number of allowed restarts: this experiment can handle 5 failures (including evictions) before being marked as failed.