Distributed Data Cache

How to use distributed data cache with Kubeflow Trainer

This guide describes how to use the distributed data cache to stream data into distributed TrainJobs.

Overview

The data cache feature enables efficient data streaming for distributed training workloads by:

  • Pre-processing and caching data in a distributed Arrow cache cluster
  • Streaming data directly to training nodes without redundant preprocessing
  • Enabling scalable data access across multiple training nodes

The data cache automatically fetches data from the object store and partitions it across data nodes:

(Figure: Data Cache Batch)

Multiple TrainJobs can access data from the cache using the Apache Arrow Flight protocol:

(Figure: Data Cache TrainJob)

The data cache is powered by Apache Arrow and Apache DataFusion to efficiently store data in memory with zero-copy transfer to GPU nodes.
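To illustrate what happens under the hood, here is a minimal sketch of streaming RecordBatches from an Arrow Flight endpoint with pyarrow; the service address, port, and ticket payload are assumptions for illustration, and in practice this connection is handled for you by DataCacheDataset (described below).

import pyarrow.flight as flight

# Connect to a cache data node (hypothetical address and port).
client = flight.connect("grpc://data-cache-node-0:50051")

# Request a data shard; the ticket payload shown here is an assumption.
reader = client.do_get(flight.Ticket(b"schema_name/table_name/shard-0"))

# Stream Arrow RecordBatches from the cache.
for chunk in reader:
    record_batch = chunk.data
    print(record_batch.num_rows, record_batch.schema.names)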

Architecture

The training workflow consists of two stages:

  1. Dataset Initializer: Sets up a distributed cache cluster that preprocesses and serves the training data
  2. Training Nodes: Stream data from the cache and perform model training

(Figure: Data Cache Init)

Prerequisites

Follow these steps to install the data cache control plane.

Install Data Cache Control Plane

You need to install the following resources to use data cache:

  • Kubeflow Trainer controller manager
  • LeaderWorkerSet controller manager
  • ClusterTrainingRuntime with cache support: torch-distributed-with-cache
  • RBAC resources needed for the initializer to bootstrap the cache

Run the following command to install the required resources:

export VERSION=v2.1.0
kubectl apply --server-side -k "https://github.com/kubeflow/trainer.git/manifests/overlays/data-cache?ref=${VERSION}"

To install the latest changes, run:

kubectl apply --server-side -k "https://github.com/kubeflow/trainer.git/manifests/overlays/data-cache?ref=master"

Verify Installation

Check that the runtime is installed:

$ kubectl get clustertrainingruntime

NAME                           AGE
torch-distributed-with-cache   14h

Check that the RBAC resources are installed in your namespace:

$ kubectl get sa,rolebinding -n default | grep cache-initializer

serviceaccount/kubeflow-trainer-cache-initializer
rolebinding.rbac.authorization.k8s.io/kubeflow-trainer-cache-initializer

Prepare Your Dataset

  • Your data should be in Iceberg table format stored in S3
  • You’ll need the metadata location (S3 path to metadata.json)
  • Define a storage URI for the cache

You can use the PyIceberg library or a distributed processing engine like Apache Spark to prepare your Iceberg table in S3.
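For example, a minimal PyIceberg sketch for writing an Arrow table to an Iceberg table in S3 might look like the following; the catalog endpoint, warehouse path, bucket, and table names are assumptions that you should replace with your own configuration.

import pyarrow as pa
from pyiceberg.catalog import load_catalog

# Load an Iceberg catalog; the REST endpoint and warehouse below are assumptions.
catalog = load_catalog(
    "default",
    **{
        "type": "rest",
        "uri": "http://iceberg-rest-catalog:8181",
        "warehouse": "s3a://my-bucket/warehouse",
        "s3.region": "us-west-2",
    },
)

# Example training data as an Arrow table.
data = pa.table({"text": ["hello", "world"], "label": [0, 1]})

catalog.create_namespace("schema_name")
table = catalog.create_table("schema_name.table_name", schema=data.schema)
table.append(data)

# The metadata.json location to pass to DataCacheInitializer as metadata_loc.
print(table.metadata_location)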

Running the Example

Open the fine-tune-with-cache.ipynb Notebook and follow the steps (a condensed sketch of the core flow appears after the list):

  1. Install the Kubeflow Trainer SDK
  2. List available runtimes and verify torch-distributed-with-cache is available
  3. Define your training function with DataCacheDataset
  4. Create a TrainJob with DataCacheInitializer configuration
  5. Monitor the training progress and view logs
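For orientation, steps 3-5 boil down to roughly the sketch below. It assumes the Kubeflow SDK exposes TrainerClient, CustomTrainer, Initializer, and DataCacheInitializer under kubeflow.trainer and that train() returns the TrainJob name; argument names may differ between SDK versions, so treat the Notebook as the source of truth.

from kubeflow.trainer import (
    CustomTrainer,
    DataCacheInitializer,
    Initializer,
    TrainerClient,
)

def train_func():
    # Your training loop: build a DataCacheDataset, DataLoader, and model here.
    ...

client = TrainerClient()

# Create a TrainJob that bootstraps the cache and streams data to training nodes.
job_name = client.train(
    runtime=client.get_runtime("torch-distributed-with-cache"),
    initializer=Initializer(
        dataset=DataCacheInitializer(
            storage_uri="cache://schema_name/table_name",
            metadata_loc="s3a://bucket/path/to/metadata.json",
            iam_role="arn:aws:iam::123456:role/test-role",
            num_data_nodes=4,
        ),
    ),
    trainer=CustomTrainer(func=train_func, num_nodes=2),
)

# Monitor progress and view logs with TrainerClient, as shown in the Notebook.
print(f"Created TrainJob: {job_name}")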

Configuration

Runtime Configuration

The torch-distributed-with-cache runtime includes:

  • Dataset Initializer Job: Deploys a cache cluster with configurable settings:

    • CACHE_IMAGE: Docker image for the Arrow cache server

  • Training Job: Connects to the cache service to stream data during distributed training.

Initializer Parameters

The example uses the DataCacheInitializer to bootstrap the cache cluster. You can adjust its settings for your storage configuration:

DataCacheInitializer(
    storage_uri="cache://schema_name/table_name",      # Cache storage URI
    metadata_loc="s3a://bucket/path/to/metadata.json", # S3 path to Iceberg metadata
    iam_role="arn:aws:iam::123456:role/test-role",     # IAM role to access the Iceberg table
    num_data_nodes=4,                                  # Number of data cache nodes
)

You can find all available configurations for DataCacheInitializer in the Kubeflow SDK.

PyTorch Iterable Dataset

The example uses DataCacheDataset, which is a subclass of PyTorch's IterableDataset.

This dataset:

  • Connects to the cache service via Arrow Flight protocol
  • Distributes data shards across training workers and nodes
  • Streams RecordBatches and converts them to PyTorch tensors
  • Supports custom preprocessing for your specific use case

You can extend DataCacheDataset and override from_arrow_rb_to_tensor() to customize data preprocessing for your model.
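For example, a custom subclass might look like the sketch below; the import path and the exact signature of from_arrow_rb_to_tensor() are assumptions, so check the Kubeflow SDK for the authoritative definitions.

import torch
from kubeflow.trainer import DataCacheDataset  # import path is an assumption

class TokenizedCacheDataset(DataCacheDataset):
    def from_arrow_rb_to_tensor(self, record_batch):
        # Convert one Arrow RecordBatch into the tensors your model expects.
        texts = record_batch.column("text").to_pylist()
        labels = torch.tensor(record_batch.column("label").to_pylist())
        # Tokenize or otherwise preprocess texts here before returning.
        return texts, labels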
