Special Case: Importer Components

Import artifacts from outside your pipeline

Importer Component Usage

Unlike the other three authoring approaches, an importer component is not a general authoring style but a pre-baked component for a specific use case: loading a machine learning artifact from a URI into the current pipeline and, as a result, into ML Metadata. This section assumes basic familiarity with KFP artifacts.

As described in Pipeline Basics, inputs to a task are typically outputs of an upstream task. When this is the case, artifacts are easily accessed on the upstream task using my_task.outputs['<output-key>']. The artifact is also registered in ML Metadata when it is created by the upstream task.
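
For illustration, here is a minimal sketch of that pattern, using hypothetical produce_dataset and consume_dataset components, where the consumer accesses the upstream artifact by its output key:

from kfp import dsl

@dsl.component
def produce_dataset(out_ds: dsl.Output[dsl.Dataset]):
    import os
    # Write the artifact to the path KFP provisions for it.
    os.makedirs(os.path.dirname(out_ds.path), exist_ok=True)
    with open(out_ds.path, 'w') as f:
        f.write('some data\n')

@dsl.component
def consume_dataset(ds: dsl.Input[dsl.Dataset]):
    with open(ds.path) as f:
        print(f.read())

@dsl.pipeline
def producer_consumer_pipeline():
    producer_task = produce_dataset()
    # The artifact is accessed by its output key on the upstream task.
    consume_dataset(ds=producer_task.outputs['out_ds'])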

If you wish to use an existing artifact that was not generated by a task in the current pipeline, you can use a dsl.importer component to load the artifact from its URI.

You do not need to write an importer component; it can be imported from the dsl module and used directly:

from kfp import dsl

@dsl.pipeline
def my_pipeline():
    # get_date_string and other_component are placeholder components defined elsewhere.
    task = get_date_string()
    importer_task = dsl.importer(
        artifact_uri='gs://ml-pipeline-playground/shakespeare1.txt',
        artifact_class=dsl.Dataset,
        reimport=True,
        metadata={'date': task.output})
    other_component(dataset=importer_task.output)

In addition to an artifact_uri argument, you must provide an artifact_class argument to specify the type of the artifact.

Importing Model Artifacts from Container Images

Starting in Kubeflow Pipelines 2.5, you can import model artifacts that are packaged as container images using the modelcar format. Here’s how it works (a short example follows the list):

  1. Specifying the URI:

    • Use an OCI URI in the artifact_uri argument
    • Example: For a container image quay.io/my-org/my-model:v1, use artifact_uri='oci://quay.io/my-org/my-model:v1'
  2. Runtime Behavior:

    • When a component uses the imported model, the modelcar runs as a sidecar container in the pod
    • The model’s path property points to the /models directory in the running modelcar
  3. Handling Same User Requirements:

    • The component container and modelcar container must run with the same user/UID
    • If the users/UIDs don’t match, accessing the model’s path property will fail with a permission error
    • No action is needed for Kubernetes distributions with random user assignment to pods (such as OpenShift)
    • For other cases, you have two options:
      • Build the modelcar container with the same UID as your component container image
      • Set the PIPELINE_RUN_AS_USER environment variable to a non-root UID on the Kubeflow Pipelines API server deployment, which ensures consistent UIDs across all pods created by Kubeflow Pipelines
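
The following is a minimal sketch of importing a modelcar image, assuming a hypothetical image quay.io/my-org/my-model:v1 and a hypothetical consuming component:

from kfp import dsl

@dsl.component
def use_model(model: dsl.Input[dsl.Model]):
    import os
    # model.path points to the /models directory served by the modelcar sidecar.
    print(os.listdir(model.path))

@dsl.pipeline
def modelcar_pipeline():
    model_importer = dsl.importer(
        # OCI URI of the container image that packages the model (hypothetical).
        artifact_uri='oci://quay.io/my-org/my-model:v1',
        artifact_class=dsl.Model,
    )
    use_model(model=model_importer.output)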

Setting Metadata and Controlling Imports

The importer component permits setting artifact metadata via the metadata argument. Metadata can be constructed with outputs from upstream tasks, as is done for the 'date' value in the example pipeline.

You may also specify a boolean reimport argument. If reimport is False, KFP will check to see if the artifact has already been imported to ML Metadata and, if so, use it. This is useful for avoiding duplicative artifact entries in ML Metadata when multiple pipeline runs import the same artifact. If reimport is True, KFP will reimport the artifact as a new artifact in ML Metadata regardless of whether it was previously imported.
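
For example, a minimal sketch of avoiding duplicate entries with reimport=False, reusing the placeholder other_component from the earlier example:

from kfp import dsl

@dsl.pipeline
def dedup_import_pipeline():
    importer_task = dsl.importer(
        artifact_uri='gs://ml-pipeline-playground/shakespeare1.txt',
        artifact_class=dsl.Dataset,
        # Reuse the existing ML Metadata entry if this URI was imported before.
        reimport=False,
    )
    other_component(dataset=importer_task.output)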

Download Imported Artifacts to a Pipeline Run Workspace

⚠️ Version Requirement: The Pipeline Run Workspace feature is available starting from Kubeflow Pipelines version 2.15.0.

Set the download_to_workspace=True parameter to download an imported artifact directly to the Pipeline Run Workspace, as shown below.

from kfp import dsl
from kfp.dsl import Output, importer

@dsl.component()
def train(dataset: dsl.Input[dsl.Dataset]):
    """Dummy Training step."""
    with open(dataset.path, encoding="utf-8") as f:
        data = f.read()

@dsl.component()
def write_file_artifact(out_ds: Output[dsl.Dataset]):
    import os
    os.makedirs(os.path.dirname(out_ds.path), exist_ok=True)
    with open(out_ds.path, "w", encoding="utf-8") as f:
        f.write("Hello from producer file\n")

@dsl.pipeline()
def import_stage(file_uri: str):
    """Nested stage that imports by URI and runs consumers."""
    importer1 = importer(
        artifact_uri=file_uri,
        artifact_class=dsl.Dataset,
        download_to_workspace=True,
    )

    train(dataset=importer1.output)

@dsl.pipeline(
    pipeline_config=dsl.PipelineConfig(
        workspace=dsl.WorkspaceConfig(
            size='1Gi',
            kubernetes=dsl.KubernetesWorkspaceConfig(
                pvcSpecPatch={'storageClassName': 'standard', 'accessModes': ['ReadWriteOnce']}
            ),
        ),
    ),
)
def pipeline_with_importer_workspace():
    # Produce a file artifact and compute its runtime URI
    file_writer = write_file_artifact()

    # Import and consume inside a nested sub-pipeline
    stage = import_stage(file_uri=file_writer.outputs["out_ds"].uri)
