One of the benefits of KFP is cross-platform portability. The KFP SDK compiles pipeline definitions to IR YAML which can be read and executed by different backends, including the Kubeflow Pipelines open source backend and Vertex AI Pipelines.
For cases where features are not portable across platforms, users may author pipelines with platform-specific functionality via KFP SDK platform-specific plugin libraries.
In general, platform-specific plugin libraries provide functions that act on tasks, similar to the task-level configuration methods provided by the KFP SDK directly. Platform-specific plugin libraries may also provide pre-baked components.
Example: Read/write to a Kubernetes PVC using kfp-kubernetes
Currently, the only KFP SDK platform-specific plugin library is kfp-kubernetes, which is supported by the Kubeflow Pipelines open source backend and enables direct access to some Kubernetes resources and functionality.
The following uses kfp-kubernetes to demonstrate typical usage of a plugin library. Specifically, we will use kfp-kubernetes to create a PersistentVolumeClaim (PVC), use the PVC to pass data between tasks, and delete the PVC after using it. See the kfp-kubernetes documentation for more information.
Step 1: Install the platform-specific plugin library with the KFP SDK
```shell
pip install kfp[kubernetes]
```
Step 2: Create components that read/write to the mount path
Create two simple components that read and write to a file. In a later step, we will mount the associated volume into each task's container at the corresponding mount path.
```python
from kfp import dsl


@dsl.component
def producer() -> str:
    with open('/data/file.txt', 'w') as file:
        file.write('Hello world')
    with open('/data/file.txt', 'r') as file:
        content = file.read()
    print(content)
    return content


@dsl.component
def consumer() -> str:
    with open('/reused_data/file.txt', 'r') as file:
        content = file.read()
    print(content)
    return content
```
Step 3: Dynamically provision a PVC using CreatePVC
Now that we have our components, we can begin constructing a pipeline. First, we need a PVC to mount. We’ll use the kubernetes.CreatePVC pre-baked component to dynamically provision a PVC.
```python
from kfp import dsl
from kfp import kubernetes


@dsl.pipeline
def my_pipeline():
    pvc1 = kubernetes.CreatePVC(
        # can also use pvc_name instead of pvc_name_suffix to use a pre-existing PVC
        pvc_name_suffix='-my-pvc',
        access_modes=['ReadWriteMany'],
        size='5Gi',
        storage_class_name='standard',
    )
```
This component provisions a 5Gi PVC from the StorageClass 'standard' with the ReadWriteMany access mode. The PVC will be named after the underlying Argo workflow that creates it, concatenated with the suffix '-my-pvc'. The CreatePVC component returns this name as the output 'name'.
Step 4: Read and write data to the PVC
Next, we’ll use the kubernetes.mount_pvc task modifier with both the producer and consumer components. We’ll also schedule task2 to run after task1 to prevent the components from writing to and reading from the PVC at the same time.
```python
    # write to the PVC
    task1 = producer()
    kubernetes.mount_pvc(
        task1,
        pvc_name=pvc1.outputs['name'],
        mount_path='/data',
    )

    # read from the PVC
    task2 = consumer()
    kubernetes.mount_pvc(
        task2,
        pvc_name=pvc1.outputs['name'],
        mount_path='/reused_data',
    )
    task2.after(task1)
```
Step 5: Delete the PVC
Finally, we can schedule deletion of the PVC after task2 finishes to clean up the Kubernetes resources we created.
```python
    delete_pvc1 = kubernetes.DeletePVC(
        pvc_name=pvc1.outputs['name'],
    ).after(task2)
```