Build Reusable Components

A detailed tutorial on creating components that you can use in various pipelines

This page describes how to author a reusable component that you can load and run in Kubeflow Pipelines. A reusable component is a pre-implemented standalone component that is easy to add as a step in any pipeline.

If you’re new to pipelines, see the conceptual guides to pipelines and components.

Summary

Below is a summary of the steps involved in creating and using a component:

  1. Write the program that contains your component’s logic. The program must use specific methods to pass data to and from the component.
  2. Containerize the program.
  3. Write a component specification in YAML format that describes the component for the Kubeflow Pipelines system.
  4. Use the Kubeflow Pipelines SDK to load and run the component in your pipeline.

The rest of this page explains how components handle input and output data, then describes each of the above steps in detail.

Passing the data to and from the containerized program

When planning to write a component you need to think about how the component communicates with upstream and downstream components. That is, how it consumes input data and produces output data.

Summary

For small pieces of data (smaller than 512 kibibytes (KiB)):

  • Inputs: Read the value from a command-line argument.
  • Outputs: Write the value to a local file, using a path provided as a command-line argument.

For bigger pieces of data (larger than 512 KiB) or for a storage-specific component:

  • Inputs: Read the data URI from a file provided as a command-line argument. Then read the data from that URI.
  • Outputs: Upload the data to the URI provided as a command-line argument. Then write that URI to a local file, using a path provided as a command-line argument.
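As a minimal sketch of the small-data convention summarized above, the following hypothetical program reads a value from a command-line argument and writes its result to a file whose path is also supplied on the command line. The flag names `--param` and `--result-path`, the helper names, and the doubling "work" are illustrative assumptions, not part of Kubeflow Pipelines.

```python
import argparse
from pathlib import Path


def write_small_output(value, output_path):
    """Write a small output value to the local path supplied by the caller."""
    path = Path(output_path)
    # The parent directory may not exist yet inside the container.
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(str(value))


def main(argv=None):
    parser = argparse.ArgumentParser(description='Small-data example (illustrative).')
    # Input: the value itself arrives on the command line.
    parser.add_argument('--param', type=int, required=True)
    # Output: only the destination path arrives on the command line.
    parser.add_argument('--result-path', type=str, required=True)
    args = parser.parse_args(argv)
    # Placeholder "work": double the input value.
    write_small_output(args.param * 2, args.result_path)
```

Calling `main(['--param', '21', '--result-path', 'out/data'])` writes the result to `out/data`; the program never chooses the output location itself.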

More about input data

There are several ways to make input data available to a program running inside a container:

  • Small pieces of data (smaller than 512 kibibytes (KiB)): Pass the data content as a command-line argument:
  program.py --param 100
  • Bigger data (larger than 512 KiB): Kubeflow Pipelines doesn’t provide a way of transferring larger pieces of data to the container running the program. Instead, the program (or the wrapper script) should receive data URIs rather than the data itself and then read the data from those URIs. For example:
  program.py --train-uri https://server.edu/datasets/1/train.tsv \
             --eval-uri https://server.edu/datasets/1/eval.tsv
  program.py --train-gcs-uri gs://bucket/datasets/1/train.tsv  
  program.py --big-query-table my_table
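To illustrate the URI-based approach, here is a minimal sketch of a helper that receives a URI (or a local path) and reads the data behind it using only the Python standard library. The function name `read_text_from_uri` is hypothetical. Storage-specific schemes such as gs:// need a storage client and are deliberately rejected in this sketch.

```python
import urllib.request
from pathlib import Path
from urllib.parse import urlsplit


def read_text_from_uri(uri):
    """Return the text behind an http(s)://, file:// URI or a plain local path."""
    scheme = urlsplit(uri).scheme
    if scheme in ('http', 'https', 'file'):
        with urllib.request.urlopen(uri) as response:
            return response.read().decode('utf-8')
    if scheme == '':
        # No scheme: treat the argument as a local file path.
        return Path(uri).read_text(encoding='utf-8')
    # Other schemes (for example gs://) are out of scope for this sketch.
    raise ValueError('Unsupported URI scheme: %r' % scheme)
```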

More about output data

The program must write the output data to some location and inform the system about that location so that the system can pass the data between steps. You should provide the paths to your output data as command-line arguments. That is, you should not hardcode the paths.

You can choose a suitable storage solution for your output data. Options include the following:

  • Google Cloud Storage is the recommended default storage solution for writing output.
  • For structured data you can use BigQuery. You must provide the specific URI/path or table name to which to write the results.

The program should do the following:

  • Upload the data to your chosen storage system.
  • Pass out a URI pointing to the data, by writing that URI to a file and instructing the system to pick it up and treat it as the value of a particular component output.

Note that the example below accepts both a URI to upload the data to and a file path to write that URI to.

program.py --out-model-uri gs://bucket/163/output_model \
           --out-model-uri-file /outputs/output_model_uri/data

Why should the program output the URI it has just received as an input argument? The reason is that the URIs specified in the pipeline are usually not the real URIs, but rather URI templates containing UIDs. The system resolves the URIs at runtime when the containerized program starts. Only the containerized program sees the fully-resolved URI.

Below is an example of such a URI:

gs://my-bucket/{{workflow.uid}}/{{pod.name}}/data

In cases where the program cannot control the URI/ID of the created object (for example, where the URI is generated by the outside system), the program should just accept the file path to write the resulting URI/ID:

program.py --out-model-uri-file /outputs/output_model_uri/data
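The two output cases above can be sketched with one small helper. This is an illustration under stated assumptions: `publish_output` is a hypothetical name, and `upload` stands in for whatever storage-specific call you use. It receives the data and the requested URI (or `None` when the outside system picks the location) and returns the final URI, which the helper then writes to the URI file.

```python
from pathlib import Path


def publish_output(data, out_uri_file, upload, out_uri=None):
    """Upload data, then record the resulting URI in out_uri_file.

    upload(data, out_uri) is a caller-supplied, storage-specific function.
    It must return the URI/ID under which the data was actually stored.
    """
    final_uri = upload(data, out_uri)
    uri_file = Path(out_uri_file)
    # Create the parent directory; it may not exist in a fresh container.
    uri_file.parent.mkdir(parents=True, exist_ok=True)
    uri_file.write_text(final_uri)
    return final_uri
```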

Future-proofing your code

The following guidelines help you avoid the need to modify the program code in the near future or have different versions for different storage systems.

If the program has access to the TensorFlow package, you can use tf.gfile to read and write files. The tf.gfile module supports both local and Cloud Storage paths.

If you cannot use tf.gfile, a solution is to read inputs from and write outputs to local files, then add a storage-specific wrapper that downloads and uploads the data from/to a specific storage solution such as Cloud Storage or Amazon S3. For example, create a wrapper script that uses the gsutil cp command to download the input data before running the main program and to upload the output data after the program finishes.
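A storage-specific wrapper of this kind might look like the following sketch, written in Python rather than shell. It assumes that gsutil is installed in the container and that the main program accepts hypothetical `--input-path` and `--output-path` flags for local files. The function name, the local staging paths, and the injectable `run` parameter are illustrative choices.

```python
import subprocess


def run_with_gcs_staging(input_uri, output_uri, program_cmd,
                         local_input='/tmp/input', local_output='/tmp/output',
                         run=subprocess.check_call):
    """Download the input, run the main program on local files, upload the output."""
    # 1. Stage the input data locally.
    run(['gsutil', 'cp', input_uri, local_input])
    # 2. Run the storage-agnostic main program against local paths.
    run(list(program_cmd) + ['--input-path', local_input,
                             '--output-path', local_output])
    # 3. Upload the result to the requested location.
    run(['gsutil', 'cp', local_output, output_uri])
```

Because `run` defaults to `subprocess.check_call`, a failing download, program run, or upload raises an exception and stops the wrapper. Passing a different `run` callable makes the wrapper easy to test without touching Cloud Storage.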

Writing the program code

This section describes an example program that has two inputs (for small and large pieces of data) and one output. The programming language in this example is Python 3.

program.py

#!/usr/bin/env python3
import argparse
import os
from pathlib import Path
from tensorflow import gfile # Supports both local paths and Cloud Storage (GCS) or S3

# Function doing the actual work
def do_work(input1_file, output1_file, param1):
  for x in range(param1):
    line = input1_file.readline()  # Returns '' at end of file; next() would raise StopIteration
    if not line:
      break
    _ = output1_file.write(line)

# Defining and parsing the command-line arguments
parser = argparse.ArgumentParser(description='My program description')
parser.add_argument('--input1-path', type=str, help='Path of the local file or GCS blob containing the Input 1 data.')
parser.add_argument('--param1', type=int, default=100, help='Parameter 1.')
parser.add_argument('--output1-path', type=str, help='Path of the local file or GCS blob where the Output 1 data should be written.')
parser.add_argument('--output1-path-file', type=str, help='Path of the local file where the Output 1 URI data should be written.')
args = parser.parse_args()

gfile.MakeDirs(os.path.dirname(args.output1_path))
# Opening the input/output files and performing the actual work
with gfile.Open(args.input1_path, 'r') as input1_file, gfile.Open(args.output1_path, 'w') as output1_file:
    do_work(input1_file, output1_file, args.param1)

# Writing args.output1_path to a file so that it will be passed to downstream tasks
Path(args.output1_path_file).parent.mkdir(parents=True, exist_ok=True)
Path(args.output1_path_file).write_text(args.output1_path)

The command line invocation of this program is:

python3 program.py --input1-path <URI to Input 1 data> \
                   --param1 <value of Param1 input> --output1-path <URI for Output 1 data> \
                   --output1-path-file <local file path for the Output 1 URI>

You need to pass the URI for Output 1 data forward so that the downstream steps can access the URI. The program writes the URI to a local file and tells the system to grab it and expose it as an output. You should avoid hard-coding any paths, so the program receives the path to the local file through the --output1-path-file command-line argument.

Writing a Dockerfile to containerize your application

You need a Docker container image that packages your program.

The instructions on creating container images are not specific to Kubeflow Pipelines. To make things easier for you, this section provides some guidelines on standard container creation. You can use any procedure of your choice to create the Docker containers.

Your Dockerfile must add all program code, including the wrapper, and the dependencies (operating system packages, Python packages, etc.) to the container image.

Ensure you have write access to a container registry where you can push the container image. Examples include Google Container Registry and Docker Hub.

Think of a name for your container image. This guide uses the name `gcr.io/my-org/my-image`.

Example Dockerfile

ARG BASE_IMAGE_TAG=1.12.0-py3
FROM tensorflow/tensorflow:$BASE_IMAGE_TAG
RUN python3 -m pip install keras
COPY ./src /pipelines/component/src

Create a build_image.sh script (see the example below) that builds the container image from the Dockerfile and pushes it to your chosen container registry.

Run the build_image.sh script to build and push the image.

Best practice: After pushing the image, get the strict image name with digest, and use the strict image name for reproducibility.

Example build_image.sh:

#!/bin/bash -e
image_name=gcr.io/my-org/my-image # Specify the image name here
image_tag=latest
full_image_name=${image_name}:${image_tag}
base_image_tag=1.12.0-py3

cd "$(dirname "$0")" 
docker build --build-arg BASE_IMAGE_TAG=${base_image_tag} -t "${full_image_name}" .
docker push "$full_image_name"

# Output the strict image name (which contains the sha256 image digest)
docker inspect --format="{{index .RepoDigests 0}}" "${full_image_name}"

Make your script executable:

chmod +x build_image.sh
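A strict image name has the form name@sha256:<digest>, where the digest is 64 hexadecimal characters. As a small sketch, the hypothetical helper below checks whether an image reference is pinned by digest, for example before you paste it into a component definition. The function name and the regular expression are illustrative, and the check is intentionally simple.

```python
import re

# A repository name followed by "@sha256:" and a 64-character hex digest.
_DIGEST_PINNED = re.compile(r'^[^@\s]+@sha256:[0-9a-f]{64}$')


def is_digest_pinned(image_ref):
    """Return True if image_ref names an image by sha256 digest rather than by tag."""
    return bool(_DIGEST_PINNED.match(image_ref))
```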

Writing your component definition file

You need a component specification in YAML format that describes the component for the Kubeflow Pipelines system.

For the complete definition of a Kubeflow Pipelines component, see the component specification. However, for this tutorial you don’t need to know the full schema of the component specification. The tutorial provides enough information about the relevant parts.

Start writing the component definition (component.yaml) by specifying your container image in the component’s implementation section:

implementation:
  container:
    image: gcr.io/my-org/my-image@sha256:a172..752f # Name of a container image that you've pushed to a container repo.

Complete the component’s implementation section based on your (wrapper) program:

implementation:
  container:
    image: gcr.io/my-org/my-image@sha256:a172..752f
    # command is a list of strings (command-line arguments). 
    # The YAML language has two syntaxes for lists and you can use either of them. 
    # Here we use the "flow syntax" - comma-separated strings inside square brackets.
    command: [
      python3, /pipelines/component/src/program.py, # Path of the program inside the container
      --input1-path, <URI to Input 1 data>,
      --param1, <value of Param1 input>,
      --output1-path, <URI template for Output 1 data>,
      --output1-path-file, <local file path for the Output 1 URI>,
    ]

The command section still contains some dummy placeholders (in angle brackets). Let’s replace them with real placeholders. A placeholder represents a command-line argument that is replaced with some value or path before the program is executed. In component.yaml, you specify the placeholders using YAML’s mapping syntax to distinguish them from the verbatim strings. This tutorial uses the following two placeholders:

  • {inputValue: Some input name}
    This placeholder is replaced by the value of the argument to the specified input. This is useful for small pieces of input data.
  • {outputPath: Some output name}
    This placeholder is replaced by the auto-generated local path where the program should write its output data. This instructs the system to read the content of the file and store it as the value of the specified output.
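To make the substitution concrete, the sketch below simulates how a command list containing placeholders could be turned into plain command-line arguments. It is an illustration only, not the Kubeflow Pipelines implementation: the function name `resolve_command` and the example output path are assumptions.

```python
def resolve_command(command, input_values, output_paths):
    """Replace placeholder mappings in a command list with concrete strings."""
    resolved = []
    for item in command:
        if isinstance(item, dict):
            # A placeholder is a one-entry mapping such as {'inputValue': 'Parameter 1'}.
            (kind, name), = item.items()
            if kind == 'inputValue':
                resolved.append(str(input_values[name]))
            elif kind == 'outputPath':
                resolved.append(output_paths[name])
            else:
                raise ValueError('Unsupported placeholder: %s' % kind)
        else:
            # Verbatim strings are passed through unchanged.
            resolved.append(item)
    return resolved


command = ['program.py', '--param1', {'inputValue': 'Parameter 1'},
           '--output1-path-file', {'outputPath': 'Output 1 URI'}]
args = resolve_command(
    command,
    input_values={'Parameter 1': 100},
    # Hypothetical auto-generated path; the real system chooses its own.
    output_paths={'Output 1 URI': '/tmp/outputs/output_1_uri/data'},
)
```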

As well as putting real placeholders in the command line, you need to add corresponding input and output specifications to the inputs and outputs sections. The input/output specification contains the input name, type, description and default value. Only the name is required. The input and output names are free-form strings, but be careful with the YAML syntax and use quotes if necessary. The input/output names do not need to be the same as the command-line flags which are usually quite short.

Replace the placeholders as follows:

  • Replace <URI to Input 1 data> with {inputValue: Input 1 URI} and add Input 1 URI to the inputs section. URLs are small, so we’re passing them in as command-line arguments.
  • Replace <value of Param1 input> with {inputValue: Parameter 1} and add Parameter 1 to the inputs section. Integers are small, so we’re passing them in as command-line arguments.
  • Replace <URI template for Output 1 data> with {inputValue: Output 1 URI template} and add Output 1 URI template to the inputs section. This looks confusing: you’re adding an output URI to the inputs section. The reason is that currently you must pass in output URIs manually, so from the component’s point of view this is an input, not an output.
  • Replace <local file path for the Output 1 URI> with {outputPath: Output 1 URI} and add Output 1 URI to the outputs section. Again, this looks confusing: you now have an input called Output 1 URI template and an output called Output 1 URI. (Note that you can use different names.) The reason is that the URI is passed through: it is passed to the task as an input and is then output from the task, so that downstream tasks have access to it.

After replacing the placeholders and adding inputs/outputs, your component.yaml looks like this:

inputs: #List of input specs. Each input spec is a map.
- {name: Input 1 URI}
- {name: Parameter 1}
- {name: Output 1 URI template}
outputs:
- {name: Output 1 URI}
implementation:
  container:
    image: gcr.io/my-org/my-image@sha256:a172..752f
    command: [
      python3, /pipelines/component/src/program.py,
      --input1-path,
      {inputValue: Input 1 URI}, # Refers to the "Input 1 URI" input
      --param1,
      {inputValue: Parameter 1}, # Refers to the "Parameter 1" input
      --output1-path,
      {inputValue: Output 1 URI template}, # Refers to "Output 1 URI template" input
      --output1-path-file,
      {outputPath: Output 1 URI}, # Refers to the "Output 1 URI" output
    ]

The above component specification is sufficient, but you should add more metadata to make it more useful. The example below includes the following additions:

  • Component name and description.
  • For each input and output: description, default value, and type.

Final version of component.yaml:

name: Do dummy work
description: Performs some dummy work.
inputs:
- {name: Input 1 URI, type: GCSPath, description: 'GCS path to Input 1'}
- {name: Parameter 1, type: Integer, default: '100', description: 'Parameter 1 description'} # The default values must be specified as YAML strings.
- {name: Output 1 URI template, type: GCSPath, description: 'GCS path template for Output 1'}
outputs:
- {name: Output 1 URI, type: GCSPath, description: 'GCS path for Output 1'}
implementation:
  container:
    image: gcr.io/my-org/my-image@sha256:a172..752f
    command: [
      python3, /pipelines/component/src/program.py,
      --input1-path,       {inputValue: Input 1 URI},
      --param1,            {inputValue: Parameter 1},
      --output1-path,      {inputValue: Output 1 URI template},
      --output1-path-file, {outputPath: Output 1 URI},
    ]

Build your component into a pipeline with the Kubeflow Pipelines SDK

Here is a sample pipeline that shows how to load a component and use it to compose a pipeline:

import os

import kfp
import kfp.gcp  # Provides use_gcp_secret, which is used below

# Hypothetical location of the directory that contains component.yaml; adjust as needed.
component_root = '.'

# Load the component by calling load_component_from_file or load_component_from_url
# To load the component, the pipeline author only needs to have access to the component.yaml file.
# The Kubernetes cluster executing the pipeline needs access to the container image specified in the component.
dummy_op = kfp.components.load_component_from_file(os.path.join(component_root, 'component.yaml'))
# dummy_op = kfp.components.load_component_from_url('http://....../component.yaml')

# dummy_op is now a "factory function" that accepts the arguments for the component's inputs
# and produces a task object (e.g. ContainerOp instance).
# Inspect the dummy_op function in Jupyter Notebook by typing "dummy_op(" and pressing Shift+Tab
# You can also get help by writing help(dummy_op) or dummy_op? or dummy_op??
# The signature of the dummy_op function corresponds to the inputs section of the component.
# Some tweaks are performed to make the signature valid and pythonic:
# 1) All inputs with default values will come after the inputs without default values
# 2) The input names are converted to pythonic names (spaces and symbols replaced
#    with underscores and letters lowercased).

# Define a pipeline and create a task from a component:
@kfp.dsl.pipeline(name='My pipeline', description='')
def my_pipeline():
    dummy1_task = dummy_op(
        # Input name "Input 1 URI" is converted to pythonic parameter name "input_1_uri"
        input_1_uri='gs://my-bucket/datasets/train.tsv',
        parameter_1='100',
        # You must use Argo placeholders ("{{workflow.uid}}" and "{{pod.name}}")
        # to guarantee that the outputs from different pipeline runs and tasks write
        # to unique locations and do not overwrite each other.
        output_1_uri_template='gs://my-bucket/{{workflow.uid}}/{{pod.name}}/output_1/data',
    ).apply(kfp.gcp.use_gcp_secret('user-gcp-sa')) 
    # To access GCS, you must configure the container to have access to a
    # GCS secret that grants required access to the bucket.
    # The outputs of the dummy1_task can be referenced using the
    # dummy1_task.outputs dictionary.
    # ! The output names are converted to lowercased dashed names.

    # Pass the outputs of the dummy1_task to some other component
    dummy2_task = dummy_op(
        input_1_uri=dummy1_task.outputs['output-1-uri'],
        parameter_1='200',
        output_1_uri_template='gs://my-bucket/{{workflow.uid}}/{{pod.name}}/output_1/data',
    ).apply(kfp.gcp.use_gcp_secret('user-gcp-sa')) 
    # To access GCS, you must configure the container to have access to a
    # GCS secret that grants required access to the bucket.
# This pipeline can be compiled, uploaded and submitted for execution.
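
The name conversions mentioned in the comments above can be approximated as follows. This is a sketch for building intuition, not the SDK's own code: the function names are hypothetical, and the real sanitization rules may differ in edge cases.

```python
import re


def to_pythonic_name(name):
    """Approximate the parameter name derived from an input name."""
    # Lowercase, then collapse every run of non-alphanumeric characters to "_".
    return re.sub(r'[^0-9a-z]+', '_', name.lower()).strip('_')


def to_dashed_name(name):
    """Approximate the key used for an output in the task's outputs dictionary."""
    # Lowercase, then collapse every run of non-alphanumeric characters to "-".
    return re.sub(r'[^0-9a-z]+', '-', name.lower()).strip('-')
```

Under these assumptions, the input name "Input 1 URI" maps to the parameter `input_1_uri`, and the output name "Output 1 URI" maps to the key `output-1-uri`.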

Organizing the component files

This section provides a recommended way to organize the component files. There is no requirement that you must organize the files in this way. However, using the standard organization makes it possible to reuse the same scripts for testing, image building and component versioning.
See this sample component for a real-life component example.

components/<component group>/<component name>/

    src/*            #Component source code files
    tests/*          #Unit tests
    run_tests.sh     #Small script that runs the tests
    README.md        #Documentation. Move to docs/ if multiple files needed

    Dockerfile       #Dockerfile to build the component container image
    build_image.sh   #Small script that runs docker build and docker push

    component.yaml   #Component definition in YAML format

Next steps