Component I/O

Use parameter/artifact inputs and outputs

Components may accept inputs and create outputs. Inputs and outputs can be one of two types: parameters or artifacts. The following matrix describes possible component inputs and outputs:

           Parameter            Artifact
Input      Input Parameter      Input Artifact
Output     Output Parameter     Output Artifact

Throughout the remainder of this section, we will use the following example dataset creation pipeline to understand the behavior and usage of input and output parameters and artifacts:

from kfp import dsl
from kfp.dsl import Input, Output, Dataset


@dsl.container_component
def create_dataset(
    initial_text: str,
    output_dataset: Output[Dataset],
):
    """Create a dataset containing the string `initial_text`.""" 
    return dsl.ContainerSpec(
        image='alpine',
        command=['sh', '-c', 'mkdir --parents $(dirname "$1") && echo "$0" > "$1"',],
        args=[initial_text, output_dataset.path])


@dsl.component
def augment_dataset(
    existing_dataset: Input[Dataset],
    resulting_dataset: Output[Dataset],
    text: str,
    num: int = 10,
) -> int:
    """Append `text` `num` times to an existing dataset, then write it as a new dataset."""
    additional_data = ' '.join(text for _ in range(num))

    with open(existing_dataset.path, 'r') as f:
        existing_dataset_text = f.read()

    resulting_dataset_text = existing_dataset_text + ' ' + additional_data

    with open(resulting_dataset.path, 'w') as f:
        f.write(resulting_dataset_text)

    return len(resulting_dataset_text)


@dsl.pipeline()
def my_pipeline(initial_text: str = 'initial dataset text'):
    create_task = create_dataset(initial_text=initial_text)
    augment_dataset(
        existing_dataset=create_task.outputs['output_dataset'],
        text='additional text')

This pipeline uses a custom container component create_dataset to construct an initial Dataset artifact containing initial_text. Then, the downstream lightweight Python component augment_dataset appends text repeated num times to the dataset and saves it as a new dataset.

Inputs

Component inputs are specified by the component function’s signature. This applies for all authoring approaches: lightweight Python components, containerized Python components, and custom container components.

Ultimately, each authoring style creates a component defined by an image, command, and args. When you use an input, it is represented as a placeholder in the command or args and is interpolated at component runtime.

There is one additional type of input, the special type PipelineTaskFinalStatus, which allows access to the metadata of one task from within another via a system-provided value at runtime. This input is a special case, as it is neither a typical parameter nor an artifact, and it is only usable in dsl.ExitHandler exit tasks. Use of this input is covered in Authoring: Pipelines.
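
For reference, a minimal sketch of the pattern, assuming the create_dataset component above and the standard PipelineTaskFinalStatus fields:

from kfp import dsl
from kfp.dsl import PipelineTaskFinalStatus


@dsl.component
def exit_op(status: PipelineTaskFinalStatus):
    """Print the final status of the tasks inside the ExitHandler."""
    # `status` is populated by the backend at runtime; do not pass it yourself
    print('Pipeline task state:', status.state)
    print('Error message:', status.error_message)


@dsl.pipeline()
def my_pipeline(initial_text: str = 'initial dataset text'):
    exit_task = exit_op()
    with dsl.ExitHandler(exit_task):
        create_dataset(initial_text=initial_text)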

Input parameters

Input parameters are declared when you use a str, int, float, bool, dict, or list type annotation. The data passed to parameters typed with dict or list may only contain JSON-serializable Python primitives. Union types are not permitted.

In the example create_dataset component, initial_text is an input parameter. In augment_dataset, text and num are input parameters.

Input parameters may have default values. For example, augment_dataset’s num parameter has a default value of 10.

Within a component function body, use input parameters just as you would in a normal Python function.
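
For instance, a small illustrative component (with hypothetical names) that receives a dict and a list, each containing only JSON-serializable values:

from kfp import dsl


@dsl.component
def summarize(config: dict, tags: list) -> str:
    """Combine a dict and a list input into a short summary string."""
    # dict and list parameters arrive as ordinary Python objects
    name = config.get('name', 'unnamed')
    return f"{name}: {', '.join(tags)}"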

Input artifacts

Input artifacts are defined when you use an Input[<ArtifactClass>] annotation. For more information about artifacts, see Component I/O.

At component runtime, input artifacts are copied to the local filesystem by the executing backend. This abstracts away the need for the component author to know where artifacts are stored in remote storage and allows component authors to only interact with the local filesystem when implementing a component that uses an artifact. All artifacts provide a .path attribute, which can be used to access the local path where the artifact file has been copied.

Let’s see how this works in practice. In our example pipeline, augment_dataset specifies the input existing_dataset: Input[Dataset]. In the pipeline definition, we pass the output dataset from create_dataset to this parameter. When the augment_dataset component runs, the executing backend copies the output_dataset artifact file to the container filesystem and passes in an instance of Dataset as an argument to existing_dataset. The Dataset instance has a .path handle to its location in the container filesystem, allowing the component to read it:

with open(existing_dataset.path, 'r') as f:
    existing_dataset_text = f.read()

Outputs

Like inputs, component outputs are also specified by the component function’s signature. Depending on the component authoring approach and the type of output (parameter or artifact), outputs may be specified by the function return type annotation (e.g., -> int), the type annotation generic Output[], or the type annotation class OutputPath. Uses for each are explained in the sections to follow.

For all output types and authoring styles, outputs from a component are persisted to a remote file store, such as MinIO, Google Cloud Storage, or AWS S3, so that they outlast the ephemeral container that creates them and can be picked up for use by a downstream task.

Output parameters

Output parameters are declared in different ways depending on the authoring approach.

Python components

For lightweight Python components and containerized Python components, output parameters are declared by the Python component function return type annotation (e.g., -> int). Like parameter inputs, return type annotations may be str, int, float, bool, dict or list.

In our example, augment_dataset has a single int output.

You may also specify multiple output parameters by using these annotations within a typing.NamedTuple as follows:

from typing import NamedTuple
from kfp import dsl

@dsl.component
def my_component() -> NamedTuple('Outputs', [('name', str), ('id', int)]):
    from typing import NamedTuple

    output = NamedTuple('Outputs', [('name', str), ('id', int)])
    return output('my_dataset', 123)

Custom container components

For custom container components, output parameters are declared via an OutputPath annotation, which is a class that takes a type as its only argument (e.g., OutputPath(int)). At runtime, the backend will pass a filepath string to parameters with this annotation. This string indicates where in the container filesystem the component should write this parameter output. The backend will copy the file specified by this path to remote storage after component execution.

While the lightweight component executor handles writing the output parameters to the correct local filepath, custom container component authors must implement this in the container logic.

For example, the following very simple create_text_output_parameter component creates the output parameter string "some text" by using an OutputPath(str) annotation and writing the parameter to the path in the variable output_string_path:

from kfp import dsl
from kfp.dsl import OutputPath

@dsl.container_component
def create_text_output_parameter(output_string_path: OutputPath(str)):
    return dsl.ContainerSpec(
        image='alpine',
        command=[
            'sh', '-c',
            'mkdir --parents $(dirname "$0") && echo "some text" > "$0"'
        ],
        args=[output_string_path])

Output artifacts

Output artifacts are declared when you use an Output[<ArtifactClass>] annotation. For more information about artifacts, see Artifacts.

Output artifacts are treated inversely to input artifacts at component runtime: instead of being copied to the container from remote storage, they are copied to remote storage from the .path location in the container’s filesystem after the component executes. This abstracts away the need for the component author to know where artifacts are stored in remote storage and allows component authors to only interact with the local filesystem when implementing a component that creates an artifact. As with using an artifact input, component authors should write artifacts to .path:

with open(resulting_dataset.path, 'w') as f:
    f.write(resulting_dataset_text)

Pipeline I/O

A pipeline may be used like a component by instantiating it as a task within another pipeline.
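
As a rough sketch, assuming two hypothetical components double and print_result, an inner pipeline can be instantiated inside an outer pipeline just like any other component:

from kfp import dsl


@dsl.component
def double(a: int) -> int:
    return 2 * a


@dsl.component
def print_result(x: int):
    print(x)


@dsl.pipeline
def inner_pipeline(x: int) -> int:
    return double(a=x).output


@dsl.pipeline
def outer_pipeline(x: int = 1):
    # the inner pipeline behaves like a component; its single return
    # value is available via .output
    inner_task = inner_pipeline(x=x)
    print_result(x=inner_task.output)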

Inputs

All pipeline inputs must include type annotations. Valid input parameter annotations include str, int, float, bool, dict, list. Input parameters may also have defaults. The only valid input artifact annotation is Input[<Artifact>] (where <Artifact> is any KFP-compatible artifact class). Input artifacts may not have defaults.

The following simple pipeline has a str parameter text and an int parameter number. number has a default value of 10.

from kfp import dsl

@dsl.pipeline
def my_pipeline(text: str, number: int = 10):
    ...

Ultimately, all inputs must be passed to an inner “primitive” component in order to perform computation on the input. See Passing data between tasks: From a pipeline input for information about how to pass data from a pipeline input to a component within the pipeline.

Outputs

Pipelines may also have outputs. All outputs are specified by a normal Python function return type annotation indicated by the -> token (e.g., -> int). Valid parameter type annotations include str, int, float, bool, dict, and list. Valid artifact return annotations include <Artifact> (where <Artifact> is a KFP-compatible artifact class). You may specify multiple outputs using a typing.NamedTuple return annotation (see Python Components for more information on how to use named tuple return types).

Ultimately, all outputs must be created by an inner “primitive” component. A pipeline may then return such an output as its own output.

For example, the following double pipeline returns the single int output of the multiply component:

from kfp import dsl

@dsl.component
def multiply(a: int, b: int) -> int:
    return a * b

@dsl.pipeline
def double(number: int) -> int:
    return multiply(a=number, b=2).output

In the following different example, the training_workflow pipeline returns a Model from the inner train_model component:

from kfp import dsl
from kfp.dsl import Dataset, Input, Model, Output

@dsl.component
def train_model(dataset: Input[Dataset], model: Output[Model]):
    # do training
    trained_model = ...
    trained_model.save(model.path)

@dsl.pipeline
def training_workflow() -> Model:
    # assumes a get_dataset component that produces a Dataset artifact is defined elsewhere
    get_dataset_op = get_dataset()
    train_model_op = train_model(dataset=get_dataset_op.outputs['dataset'])
    return train_model_op.outputs['model']
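
Multiple pipeline outputs follow the same NamedTuple pattern as component outputs. A minimal sketch, reusing the multiply component above:

from typing import NamedTuple
from kfp import dsl


@dsl.pipeline
def multiply_twice(number: int) -> NamedTuple('Outputs', [('doubled', int), ('tripled', int)]):
    outputs = NamedTuple('Outputs', [('doubled', int), ('tripled', int)])
    return outputs(
        multiply(a=number, b=2).output,
        multiply(a=number, b=3).output,
    )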

Passing data between tasks

To instantiate a component as a task, you must pass to it any required inputs. Required inputs include all input parameters without default values and all input artifacts.

Output parameters (e.g., OutputPath) and output artifacts (e.g., Output[<ArtifactClass>]) should not be passed explicitly by the pipeline author; they will be passed at component runtime by the executing backend. This allows component internals to know where output parameters and artifacts should be written in the container filesystem in order to be copied to remote storage by the backend.

Task inputs may come from one of three different places: a static variable, a pipeline parameter, or an upstream task output. Let’s walk through each, using the following identity component to help illustrate each approach:

@dsl.component
def identity(x: int) -> int:
    return x

From a static variable

To provide static data as an input to a component, simply pass it as you would when using a normal function:

@dsl.pipeline()
def my_pipeline():
    task = identity(x=10)

Note: Input artifacts cannot be passed as static variables; they must always be passed from an upstream task or an importer component.
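
As a sketch of the importer approach (the URI below is hypothetical), dsl.importer brings an artifact that already exists in remote storage into the pipeline, after which it can be consumed like any other task output:

from kfp import dsl
from kfp.dsl import Dataset


@dsl.pipeline()
def my_pipeline():
    # import an existing artifact from remote storage (hypothetical URI)
    importer_task = dsl.importer(
        artifact_uri='gs://my-bucket/existing-dataset.txt',
        artifact_class=Dataset,
        reimport=False)
    augment_dataset(
        existing_dataset=importer_task.output,
        text='additional text')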

From a pipeline input

To pass data from a pipeline input to an inner task, simply pass the variable name as you normally would when calling one function within another:

@dsl.pipeline()
def my_pipeline(pipeline_var_x: int):
    task = identity(x=pipeline_var_x)

From a task output

Tasks provide references to their outputs in order to support passing data between tasks in a pipeline.

In nearly all cases, outputs are accessed via .outputs['<parameter>'], where '<parameter>' is the parameter name or named tuple field name from the task that produced the output which you wish to access. The .outputs['<parameter>'] access pattern is used to access Output[] artifacts, OutputPath output parameters, and NamedTuple output parameters.

The only exception to this access pattern is when you wish to access a single return value from a lightweight Python component, which can be accessed through the task’s .output attribute.

The following two subsections demonstrate this for parameters then artifacts.

Passing parameters from task to task

Let’s introduce two more components for the sake of demonstrating passing parameters between components:

from typing import NamedTuple

@dsl.component
def named_tuple(an_id: int) -> NamedTuple('Outputs', [('name', str), ('id', int)]):
    """Lightweight Python component with a NamedTuple output."""
    from typing import NamedTuple
    outputs = NamedTuple('Outputs', [('name', str), ('id', int)])
    return outputs('my_dataset', an_id)

@dsl.container_component
def identity_container(integer: int, output_int: OutputPath(int)):
    """Custom container component that creates an integer output parameter."""
    return dsl.ContainerSpec(
        image='alpine',
        command=[
            'sh', '-c',
            'mkdir --parents $(dirname "$0") && echo "$1" > "$0"'
        ],
        args=[output_int, integer])

Using the new named_tuple and identity_container components with our original identity component, the following pipeline shows the full range of task-to-task data passing styles:

@dsl.pipeline()
def my_pipeline(pipeline_parameter_id: int):
    named_tuple_task = named_tuple(an_id=pipeline_parameter_id)
    
    # access a named tuple parameter output via .outputs['<parameter>']
    identity_container_task = identity_container(integer=named_tuple_task.outputs['id'])
    
    # access an OutputPath parameter output via .outputs['<parameter>']
    identity_task_1 = identity(x=identity_container_task.outputs['output_int'])
    
    # access a lightweight component return value via .output
    identity_task_2 = identity(x=identity_task_1.output)

Passing artifacts from task to task

Artifacts may only be annotated via Input[<ArtifactClass>]/Output[<ArtifactClass>] annotations and may only be accessed via the .outputs['<parameter>'] syntax. This makes passing them between tasks somewhat simpler than for parameters.

The pipeline below demonstrates passing an artifact between tasks using an artifact producer and an artifact consumer:

from kfp import dsl
from kfp.dsl import Artifact, Input, Output

@dsl.component
def producer(output_artifact: Output[Artifact]):
    with open(output_artifact.path, 'w') as f:
        f.write('my artifact')

@dsl.component
def consumer(input_artifact: Input[Artifact]):
    with open(input_artifact.path, 'r') as f:
        print(f.read())

@dsl.pipeline()
def my_pipeline():
    producer_task = producer()
    consumer(input_artifact=producer_task.outputs['output_artifact'])

Special input values

There are a few special input values that may be used to access pipeline or task metadata within a component. These values can be passed to input parameters typed with str. For example, the following print_op component can obtain the pipeline job name at component runtime by using the dsl.PIPELINE_JOB_NAME_PLACEHOLDER:

from kfp import dsl

@dsl.component
def print_op(text: str):
    print(text)

@dsl.pipeline()
def my_pipeline():
    print_op(text=dsl.PIPELINE_JOB_NAME_PLACEHOLDER)

There are several placeholders that may be used in this style, including:

  • dsl.PIPELINE_JOB_NAME_PLACEHOLDER
  • dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER
  • dsl.PIPELINE_JOB_ID_PLACEHOLDER
  • dsl.PIPELINE_TASK_NAME_PLACEHOLDER
  • dsl.PIPELINE_TASK_ID_PLACEHOLDER

Placeholders

In general, each of the three component authoring styles handles the injection of placeholders into your container command and args, so the component author does not have to worry about them. However, there are two types of placeholders you may wish to use directly: ConcatPlaceholder and IfPresentPlaceholder. These placeholders may only be used when authoring custom container components via the @dsl.container_component decorator.

ConcatPlaceholder

When you provide a container command or container args as a list of strings, each element in the list is concatenated using a space separator, then issued to the container. Concatenating one input to another string without a space separator requires special handling provided by ConcatPlaceholder.

ConcatPlaceholder takes one argument, items, which may be a list of any combination of static strings, parameter inputs, or other instances of ConcatPlaceholder or IfPresentPlaceholder. At runtime, these strings will be concatenated together without a separator.

For example, you can use ConcatPlaceholder to concatenate a file path prefix, suffix, and extension:

from kfp import dsl

@dsl.container_component
def concatenator(prefix: str, suffix: str):
    return dsl.ContainerSpec(
        image='alpine',
        command=[
            'my_program.sh'
        ],
        args=['--input', dsl.ConcatPlaceholder([prefix, suffix, '.txt'])]
    )
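
At runtime, the placeholder resolves to a single argument with no separators between its items: for example, prefix='report' and suffix='_final' yield the argument report_final.txt after the --input flag.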

IfPresentPlaceholder

IfPresentPlaceholder is used to conditionally provide command line arguments. The IfPresentPlaceholder takes three arguments: input_name, then, and optionally else_. This placeholder is easiest to understand through an example:

@dsl.container_component
def hello_someone(optional_name: str = None):
    return dsl.ContainerSpec(
        image='python:3.7',
        command=[
            'echo', 'hello',
            dsl.IfPresentPlaceholder(
                input_name='optional_name', then=[optional_name])
        ])

If the hello_someone component is passed 'world' as an argument for optional_name, the component will print hello world. If not, it will only print hello.

The third parameter else_ can be used to provide a default value to fall back to if input_name is not provided.

Arguments to then and else_ may be a list of any combination of static strings, parameter inputs, or other instances of ConcatPlaceholder or IfPresentPlaceholder.
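
For instance, a minimal variation of the example above that falls back to 'world' when optional_name is not provided:

from kfp import dsl

@dsl.container_component
def hello_someone_or_world(optional_name: str = None):
    return dsl.ContainerSpec(
        image='python:3.7',
        command=[
            'echo', 'hello',
            dsl.IfPresentPlaceholder(
                input_name='optional_name',
                then=[optional_name],
                # fall back to 'world' when optional_name is not provided
                else_=['world'])
        ])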

Component interfaces and type checking

The KFP SDK compiler uses the type annotations you provide to type check your pipeline definition for mismatches between input and output types. The type checking logic is simple yet handy, particularly for complex pipelines:

  • Parameter outputs may only be passed to parameter inputs. Artifact outputs may only be passed to artifact inputs.
  • A parameter output type (int, str, etc.) must match the annotation of the parameter input to which it is passed.
  • An artifact output type (Dataset, Model, etc.) must match the annotation of the artifact input to which it is passed, unless either of the two artifact annotations uses the generic KFP Artifact class.
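
For illustration, a minimal sketch of a pipeline the compiler would reject, because a Dataset output is passed to a Model input and neither annotation uses the generic Artifact class:

from kfp import dsl
from kfp.dsl import Dataset, Input, Model, Output


@dsl.component
def produce_dataset(dataset: Output[Dataset]):
    with open(dataset.path, 'w') as f:
        f.write('some data')


@dsl.component
def consume_model(model: Input[Model]):
    print(model.path)


@dsl.pipeline()
def mismatched_pipeline():
    produce_task = produce_dataset()
    # Dataset output passed to a Model input: rejected by type checking
    consume_model(model=produce_task.outputs['dataset'])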
