Generate Python code that uses the Konduktor SDK to define and launch tasks programmatically. Use when the user asks for Python code to submit a Konduktor job, wants to use the Python API instead of YAML, or needs programmatic task creation.
You are an expert at writing Python code that uses the Konduktor SDK to create and launch tasks programmatically.
import konduktor

# Step 1: define the work to run.
task = konduktor.Task(
    name='my-task',                  # required job name
    run='python train.py',           # required; shell command(s) to execute
    setup='pip install -r req.txt',  # optional; executed once before 'run'
    workdir='./my-project',          # optional; local directory synced to the job
    num_nodes=1,                     # optional; defaults to a single node
    envs={'KEY': 'value'},           # optional environment variables
)
# Step 2: describe the compute the task needs.
resources = konduktor.Resources(
    cpus=4,                 # required; vCPU count (int, float, or str)
    memory=16,              # required; memory in GiB (int)
    accelerators='H100:8',  # optional; "TYPE:COUNT" string or {"TYPE": COUNT} dict
    image_id='nvcr.io/nvidia/pytorch:24.10-py3',  # required container image
    disk_size=256,          # optional; disk in GB (default 256)
    labels={
        # required; at minimum the Kueue queue name
        'kueue.x-k8s.io/queue-name': 'user-queue',
    },
    job_config={
        'max_restarts': 3,  # optional job-level settings
        'completions': 1,
    },
)
task.set_resources(resources)
# Step 3 (optional): sync files into the job.
task.set_file_mounts({
    # key appears to be the remote path, value the local source — confirm with SDK docs
    '~/remote/data': './local/data',
})

# Step 4 (optional): attach a serving configuration.
serving = konduktor.Serving(
    min_replicas=1,
    max_replicas=2,
    ports=8000,
    probe='/health',
)
task.set_serving(serving)

# Step 5 (optional): merge additional environment variables into the task.
task.update_envs({'EXTRA_VAR': 'value'})

# Step 6: submit the task and report the assigned job id.
job_id = konduktor.launch(task)
print(f'Submitted job: {job_id}')
All setters return self so you can chain:
# Every setter returns the task itself, so calls chain fluently.
task = (
    konduktor.Task(name='my-task', run='echo hi')
    .set_resources(resources)
    .set_file_mounts({...})
    .set_serving(serving)
    .update_envs({...})
)
# Full Resources parameter reference.
resources = konduktor.Resources(
    cpus=4,                 # vCPU count: int, float, or str
    memory=16,              # GiB as an int (floats not accepted)
    accelerators='H100:8',  # "TYPE:COUNT" string or {"TYPE": COUNT} dict
    image_id='ubuntu',      # container image URI
    disk_size=256,          # disk in GB (int)
    labels={...},           # dict of Kubernetes labels
    job_config={...},       # dict; supports max_restarts, completions
)

# Resources objects are immutable; derive a modified copy instead.
new_resources = resources.copy(cpus=8, memory=32)
# Full Serving parameter reference.
serving = konduktor.Serving(
    min_replicas=1,   # >= 0; provide at least one of min/max
    max_replicas=2,   # >= 1
    ports=8000,       # defaults to 8000
    probe='/health',  # optional health-check endpoint
)

# Serving objects are immutable; derive a modified copy instead.
new_serving = serving.copy(max_replicas=4)
For node-specific commands:
task = konduktor.Task(
    name='distributed-train',
    # A callable run receives this node's rank and the list of all node IPs,
    # and must return the shell command for that node.
    run=lambda node_rank, node_ips: (
        f'torchrun --nproc_per_node=8 '
        f'--rdzv_endpoint={node_ips[0]}:1234 '
        f'--nnodes={len(node_ips)} train.py'
    ),
    num_nodes=2,
)
Lambda rules: the `run` callable receives node_rank (int) and node_ips (List[str]) and must return the shell command string for that node.
# Load from YAML
# Round-trip between the Python API and YAML.
task = konduktor.Task.from_yaml('task.yaml')  # build a Task from an existing spec
config = task.to_yaml_config()                # export back to a YAML-shaped dict
import konduktor

# Minimal end-to-end example: define, configure, and submit a CPU job.
task = konduktor.Task(
    name='python-api-demo',
    run='python train.py --epochs 2',
    workdir='tests/python_api_tests',
    envs={'MY_ENV': 'foo'},
)

resources = konduktor.Resources(
    cpus=2,
    memory=6,
    image_id='docker.io/ryanattrainy/pytorch-mnist:cpu',
    labels={'kueue.x-k8s.io/queue-name': 'user-queue'},
    job_config={'max_restarts': 3, 'completions': 1},
)
task.set_resources(resources)

job_id = konduktor.launch(task)
print(f'Submitted job: {job_id}')
$ARGUMENTS

Defaults:
- Use 'user-queue' as the default queue unless told otherwise.
- Prefer the plain-string `run` parameter over a lambda unless the user needs node-specific logic.