While components have three authoring approaches, pipelines have one authoring approach: they are defined with a pipeline function decorated with the `@dsl.pipeline` decorator. Take the following pipeline, `pythagorean`, which implements the Pythagorean theorem as a pipeline via simple arithmetic components:
```python
from kfp import dsl

@dsl.component
def square(x: float) -> float:
    return x ** 2

@dsl.component
def add(x: float, y: float) -> float:
    return x + y

@dsl.component
def square_root(x: float) -> float:
    return x ** .5

@dsl.pipeline
def pythagorean(a: float, b: float) -> float:
    a_sq_task = square(x=a)
    b_sq_task = square(x=b)
    sum_task = add(x=a_sq_task.output, y=b_sq_task.output)
    return square_root(x=sum_task.output).output
```
Although a KFP pipeline decorated with the `@dsl.pipeline` decorator looks like a normal Python function, it is actually an expression of pipeline topology and control flow semantics, constructed using the KFP domain-specific language (DSL).
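Because the function is an expression of topology rather than ordinary Python, you don't execute a pipeline by calling the function directly; you compile it to an intermediate representation and submit that to a backend. A minimal sketch using the KFP compiler (the output filename is arbitrary):

```python
from kfp import compiler

# Compile the pipeline function to IR YAML, which can then be
# submitted to a KFP-conformant backend for execution.
compiler.Compiler().compile(
    pipeline_func=pythagorean,
    package_path='pythagorean.yaml',
)
```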
A pipeline definition has five parts:
- The pipeline decorator
- Inputs and outputs declared in the function signature
- Data passing and task dependencies
- Task configurations
- Pipeline control flow
This section covers the first four parts. Control flow is covered in the next section.
The pipeline decorator
KFP pipelines are defined inside functions decorated with the `@dsl.pipeline` decorator. The decorator takes four optional arguments:
- `name` is the name of your pipeline. If not provided, the name defaults to a sanitized version of the pipeline function name.
- `description` is a description of the pipeline.
- `pipeline_root` is the root path of the remote storage destination within which the tasks in your pipeline will create outputs. `pipeline_root` may also be set or overridden by pipeline submission clients.
- `display_name` is a human-readable name for your pipeline.
You can modify the definition of `pythagorean` to use these arguments:

```python
@dsl.pipeline(name='pythagorean-theorem-pipeline',
              description='Solve for the length of a hypotenuse of a triangle with sides length `a` and `b`.',
              pipeline_root='gs://my-pipelines-bucket',
              display_name='Pythagorean pipeline.')
def pythagorean(a: float, b: float) -> float:
    ...
```
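As noted above, a submission client may override `pipeline_root` at run time. A hedged sketch using the KFP SDK client (the host URL is a placeholder for your own KFP deployment):

```python
from kfp.client import Client

# Hypothetical endpoint; point this at your own KFP deployment.
client = Client(host='http://localhost:8080')

client.create_run_from_pipeline_func(
    pythagorean,
    arguments={'a': 3.0, 'b': 4.0},
    # Takes precedence over the pipeline_root set in the decorator.
    pipeline_root='gs://my-other-pipelines-bucket',
)
```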
Also see Additional Functionality: Component docstring format for information on how to provide pipeline metadata via docstrings.
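As a sketch of that docstring approach, a Google-style docstring like the following can supply the pipeline description and input descriptions instead of decorator arguments:

```python
@dsl.pipeline(display_name='Pythagorean pipeline')
def pythagorean(a: float, b: float) -> float:
    """Solves for the length of the hypotenuse of a right triangle.

    Args:
        a: Length of one leg of the triangle.
        b: Length of the other leg of the triangle.

    Returns:
        Length of the hypotenuse.
    """
    ...
```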
Pipeline inputs and outputs
Like components, pipeline inputs and outputs are defined by the parameters and annotations in the pipeline function signature.
In the preceding example, `pythagorean` accepts inputs `a` and `b`, each typed `float`, and creates one `float` output.

Pipeline inputs are declared via function input parameters/annotations and pipeline outputs are declared via function output annotations. Pipeline outputs will never be declared via pipeline function input parameters, unlike for components that use output artifacts, or Container Components that use `dsl.OutputPath`.
For more information on how to declare pipeline function inputs and outputs, see Data Types.
Data passing and task dependencies
When you call a component in a pipeline definition, it constructs a `PipelineTask` instance. You can pass data between tasks using the `PipelineTask`'s `.output` and `.outputs` attributes.
For a task with a single unnamed output indicated by a single return annotation, access the output using `PipelineTask.output`. This is the case for the components `square`, `add`, and `square_root`, which each have one unnamed output.
For tasks with multiple outputs or named outputs, access the output using `PipelineTask.outputs['<output-key>']`. Using named output parameters is described in more detail in Data Types: Parameters.
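For illustration, here is a hypothetical component (not part of the `pythagorean` example) whose two named outputs are declared with a `NamedTuple` return annotation; the keys used with `.outputs` are the field names:

```python
from typing import NamedTuple
from kfp import dsl

@dsl.component
def quotient_and_remainder(x: int, y: int) -> NamedTuple('Outputs', [('quotient', int), ('remainder', int)]):
    from typing import NamedTuple
    # Return an instance of the NamedTuple so each field becomes a named output.
    Outputs = NamedTuple('Outputs', [('quotient', int), ('remainder', int)])
    return Outputs(x // y, x % y)

@dsl.component
def print_int(value: int):
    print(value)

@dsl.pipeline
def divmod_pipeline(x: int = 7, y: int = 2):
    task = quotient_and_remainder(x=x, y=y)
    # Named outputs are accessed by key instead of .output.
    print_int(value=task.outputs['quotient'])
    print_int(value=task.outputs['remainder'])
```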
In the absence of data exchange, tasks will run in parallel for efficient pipeline executions. This is the case for `a_sq_task` and `b_sq_task`, which do not exchange data.
When tasks exchange data, an execution ordering is established between those tasks. This is to ensure that upstream tasks create their outputs before downstream tasks attempt to consume those outputs. For example, in `pythagorean`, the backend will execute `a_sq_task` and `b_sq_task` before it executes `sum_task`. Similarly, it will execute `sum_task` before it executes the final task created from the `square_root` component.
In some cases, you may wish to establish execution ordering in the absence of data exchange. In these cases, you can call one task's `.after()` method on another task. For example, while `a_sq_task` and `b_sq_task` do not exchange data, we can specify `a_sq_task` to run before `b_sq_task`:

```python
@dsl.pipeline
def pythagorean(a: float, b: float) -> float:
    a_sq_task = square(x=a)
    b_sq_task = square(x=b)
    b_sq_task.after(a_sq_task)
    ...
```
Special input types
There are a few special input values that you can pass to a component within your pipeline definition to give the component access to some metadata about itself. These values can be passed to input parameters typed `str`.

For example, the following `print_op` component prints the pipeline job name at component runtime using `dsl.PIPELINE_JOB_NAME_PLACEHOLDER`:

```python
from kfp import dsl

# print_op is defined here for completeness; any component
# accepting a str input would work the same way.
@dsl.component
def print_op(text: str):
    print(text)

@dsl.pipeline
def my_pipeline():
    print_op(text=dsl.PIPELINE_JOB_NAME_PLACEHOLDER)
```
There are several special values that may be used in this style, including:

- `dsl.PIPELINE_JOB_NAME_PLACEHOLDER`
- `dsl.PIPELINE_JOB_RESOURCE_NAME_PLACEHOLDER`
- `dsl.PIPELINE_JOB_ID_PLACEHOLDER`
- `dsl.PIPELINE_TASK_NAME_PLACEHOLDER`
- `dsl.PIPELINE_TASK_ID_PLACEHOLDER`
- `dsl.PIPELINE_ROOT_PLACEHOLDER`
Not yet supported
`dsl.PIPELINE_ROOT_PLACEHOLDER` is not yet supported by the KFP orchestration backend, but may be supported by other orchestration backends. You can track support for this feature via the GitHub issue.
See the KFP SDK DSL reference docs for more information about the data provided by each special input.
Task configurations

The KFP SDK exposes several platform-agnostic task-level configurations via task methods. Platform-agnostic configurations are those that are expected to exhibit similar execution behavior on all KFP-conformant backends, such as the open source KFP backend or Google Cloud Vertex AI Pipelines.
All platform-agnostic task-level configurations are set using `PipelineTask` methods. Take the following environment variable example:
```python
from kfp import dsl

@dsl.component
def print_env_var():
    import os
    print(os.environ.get('MY_ENV_VAR'))

@dsl.pipeline()
def my_pipeline():
    task = print_env_var()
    task.set_env_variable('MY_ENV_VAR', 'hello')
```
When executed, the `print_env_var` component should print `'hello'`.
Task-level configuration methods can also be chained:

```python
print_env_var().set_env_variable('MY_ENV_VAR', 'hello').set_env_variable('OTHER_VAR', 'world')
```
The KFP SDK provides the following task methods for setting task-level configurations:

- `.set_caching_options()`
- `.set_env_variable()`
- `.set_accelerator_limit()`
- `.set_cpu_limit()`
- `.set_memory_limit()`
- `.set_accelerator_type()`
- `.set_display_name()`
- `.set_retry()`
- `.ignore_upstream_failure()`
Not yet supported
`.ignore_upstream_failure()` is not yet supported by the KFP orchestration backend, but may be supported by other orchestration backends. You can track support for this feature via the GitHub issue.
See the `PipelineTask` reference documentation for more information about these methods.
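As a brief sketch of a few of these methods together (using a hypothetical `train_op` component; the values are illustrative only):

```python
from kfp import dsl

@dsl.component
def train_op() -> str:
    return 'model trained'

@dsl.pipeline
def configured_pipeline():
    task = train_op()
    task.set_display_name('Train model')  # human-readable name shown in the UI
    task.set_retry(num_retries=3)         # retry the task on failure
    task.set_caching_options(False)       # disable result caching for this task
    task.set_cpu_limit('1')               # cap CPU at 1 core
    task.set_memory_limit('2G')           # cap memory at 2 GB
```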
Pipelines as components
Pipelines can themselves be used as components in other pipelines, just as you would use any other single-step component in a pipeline. For example, we could easily recompose the preceding
pythagorean pipeline to use an inner helper pipeline
```python
from kfp import dsl

@dsl.component
def square(x: float) -> float:
    return x ** 2

@dsl.component
def add(x: float, y: float) -> float:
    return x + y

@dsl.component
def square_root(x: float) -> float:
    return x ** .5

@dsl.pipeline
def square_and_sum(a: float, b: float) -> float:
    a_sq_task = square(x=a)
    b_sq_task = square(x=b)
    return add(x=a_sq_task.output, y=b_sq_task.output).output

@dsl.pipeline
def pythagorean(a: float = 1.2, b: float = 1.2) -> float:
    sq_and_sum_task = square_and_sum(a=a, b=b)
    return square_root(x=sq_and_sum_task.output).output
```