PyTorch Guide

How to run PyTorch on Kubernetes with Kubeflow Trainer

This guide describes how to use TrainJob to train or fine-tune AI models with PyTorch.

Prerequisites

Before exploring this guide, make sure to follow the Getting Started guide to understand the basics of Kubeflow Trainer.

PyTorch Distributed Overview

PyTorch has the built-in torch.distributed package to perform distributed training, including both data and model parallelism. You can use the Kubeflow Python SDK to create your TrainJobs with PyTorch Distributed Data Parallel (DDP), Fully Sharded Data Parallel (FSDP), FSDP2, or any other parallelism algorithm supported by PyTorch.

In DDP training, the dataset is sharded across multiple GPUs, with each GPU holding one partition of the dataset and a full copy of the model. The gradients are calculated locally on each GPU and then synchronized globally to update the model parameters.
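
As a minimal illustrative sketch (MyModel is a placeholder for your own torch.nn.Module, and torchrun is assumed to have set environment variables such as LOCAL_RANK), DDP wrapping looks like this:

import os

import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

dist.init_process_group(backend="nccl")
local_rank = int(os.environ["LOCAL_RANK"])

# Each rank keeps a full replica of the model on its own GPU.
model = MyModel().to(local_rank)

# DDP all-reduces gradients across ranks during backward().
model = DDP(model, device_ids=[local_rank])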

FSDP training extends DDP by also sharding the model itself: the parameters are split into shards, each hosted on a different GPU. Gradients and parameter updates are computed locally and synchronized globally. FSDP is particularly useful for training very large models that cannot fit into the memory of a single GPU.
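
By contrast, a minimal FSDP sketch, under the same assumptions as the DDP example above:

from torch.distributed.fsdp import FullyShardedDataParallel as FSDP

# FSDP shards parameters, gradients, and optimizer state across ranks,
# gathering full parameters on demand during forward and backward.
model = FSDP(MyModel().to(local_rank))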

Get PyTorch Runtime Packages

Kubeflow Trainer includes a PyTorch runtime called torch-distributed, which comes with several pre-installed Python packages.

Run the following code to create a TrainJob that runs the runtime's default command, which lists the installed packages:

from kubeflow.trainer import TrainerClient
import time

job_id = TrainerClient().train(
    runtime=TrainerClient().get_runtime("torch-distributed"),
)

while True:
    if TrainerClient().get_job(name=job_id).status == "Succeeded":
        break
    time.sleep(1)

print(TrainerClient().get_job_logs(name=job_id)["node-0"])

You should see the installed packages, for example:

Torch Distributed Runtime
--------------------------------------
Torch Default Runtime Env
Package                   Version
------------------------- ------------
...
torch                     2.7.1+cu128
torchaudio                2.7.1+cu128
torchelastic              0.2.2
torchvision               0.22.1+cu128

PyTorch Distributed Environment

Kubeflow Trainer uses the torchrun utility to run your PyTorch script on every training node. It automatically configures the appropriate distributed environment for the PyTorch nodes:

  • dist.get_world_size() - Total number of processes (e.g., GPUs) across all PyTorch nodes.
  • dist.get_rank() - Rank of the current process across all PyTorch nodes.
  • os.environ["LOCAL_RANK"] - Rank of the current process within a single PyTorch training node.

You can use these values to, for example, download the dataset only in the process with local_rank=0 (once per node), or export your fine-tuned LLM only in the process with rank=0 (i.e., on the master node), as shown in the sketch below.
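
A minimal sketch of this pattern, assuming download_dataset and export_model are hypothetical helpers you define yourself:

import os

import torch.distributed as dist

if int(os.environ["LOCAL_RANK"]) == 0:
    download_dataset()  # hypothetical helper: runs once per node
dist.barrier()  # remaining ranks wait until the download finishes

if dist.get_rank() == 0:
    export_model()  # hypothetical helper: runs once per TrainJob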

You can access the distributed environment as follows:

import time

from kubeflow.trainer import TrainerClient, CustomTrainer

def get_torch_dist():
    import os
    import torch
    import torch.distributed as dist

    device, backend = ("cuda", "nccl") if torch.cuda.is_available() else ("cpu", "gloo")
    dist.init_process_group(backend=backend)

    print("PyTorch Distributed Environment")
    print(f"Using device: {device}")
    print(f"WORLD_SIZE: {dist.get_world_size()}")
    print(f"RANK: {dist.get_rank()}")
    print(f"LOCAL_RANK: {os.environ['LOCAL_RANK']}")


job_id = TrainerClient().train(
    runtime=TrainerClient().get_runtime("torch-distributed"),
    trainer=CustomTrainer(
        func=get_torch_dist,
        num_nodes=3,
        resources_per_node={
            "cpu": 2,
        },
    ),
)


while True:
    if TrainerClient().get_job(name=job_id).status == "Succeeded":
        break
    time.sleep(1)


print("Distributed PyTorch env on node-0")
print(TrainerClient().get_job_logs(name=job_id, node_rank=0)["node-0"])

print("Distributed PyTorch env on node-1")
print(TrainerClient().get_job_logs(name=job_id, node_rank=1)["node-1"])

You should see the distributed environment on the first two of the three training nodes, with two processes per node:

Distributed PyTorch env on node-0
PyTorch Distributed Environment
Using device: cpu
WORLD_SIZE: 6
RANK: 0
LOCAL_RANK: 0
PyTorch Distributed Environment
Using device: cpu
WORLD_SIZE: 6
RANK: 1
LOCAL_RANK: 1

Distributed PyTorch env on node-1
PyTorch Distributed Environment
Using device: cpu
WORLD_SIZE: 6
RANK: 2
LOCAL_RANK: 0
PyTorch Distributed Environment
Using device: cpu
WORLD_SIZE: 6
RANK: 3
LOCAL_RANK: 1

Create TrainJob with PyTorch Training

Configure PyTorch Training Function

You can use CustomTrainer() to wrap your PyTorch code inside a function and create a TrainJob. This function should handle end-to-end model training, or fine-tuning of a pre-trained model.

Your training function might look like this:

def fine_tune_qwen():
    import os

    import boto3
    import torch
    import torch.distributed as dist
    from torch.nn.parallel import DistributedDataParallel
    from torch.utils.data import DataLoader, DistributedSampler
    from transformers import AutoTokenizer, AutoModelForCausalLM

    # Set up PyTorch distributed.
    device, backend = ("cuda", "nccl") if torch.cuda.is_available() else ("cpu", "gloo")
    dist.init_process_group(backend=backend)
    if device == "cuda":
        # Pin each process to its own GPU on the node.
        torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))

    # Configure the dataset and dataloader.
    dataset = ...
    train_loader = DataLoader(
        dataset, batch_size=128, sampler=DistributedSampler(dataset)
    )

    # Configure the pre-trained model and tokenizer.
    tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen3-32B")
    model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen3-32B").to(device)
    model = DistributedDataParallel(model)
    optimizer = torch.optim.AdamW(model.parameters())

    # Configure the PyTorch training loop.
    for epoch in range(10):
        # Reshuffle the sharded data differently on every epoch.
        train_loader.sampler.set_epoch(epoch)
        for batch_idx, batch in enumerate(train_loader):
            optimizer.zero_grad()
            output = model(...)
            output.loss.backward()
            optimizer.step()
            ...

    if dist.get_rank() == 0:
        # Export your model to object storage (e.g., S3).
        boto3.client("s3").upload_file(...)

Create a TrainJob

After configuring the PyTorch training function, use the train() API to create a TrainJob:

job_id = TrainerClient().train(
    runtime=TrainerClient().get_runtime("torch-distributed"),
    trainer=CustomTrainer(
        func=fine_tune_qwen,
        num_nodes=2,
        resources_per_node={
            "gpu": 4
        },
        # These packages will be installed on every training node.
        packages_to_install=["transformers>=4.53.0", "boto3"],
    )
)

Get the TrainJob Results

You can use the get_job_logs() API to see your TrainJob logs:

print(TrainerClient().get_job_logs(name=job_id)["node-0"])
