Connect the SDK to the API

Learn how to connect the Kubeflow Pipelines SDK to the API.

Overview

The Kubeflow Pipelines SDK provides a Python interface to the Kubeflow Pipelines API. This guide shows how to connect the SDK to the Pipelines API when Kubeflow Pipelines runs as part of Kubeflow Platform or in standalone mode, from inside or outside the cluster.

Kubeflow Platform

When Kubeflow Pipelines runs as part of a multi-user Kubeflow Platform, how you authenticate the Pipelines SDK depends on whether your code runs inside or outside the cluster.

Kubeflow Platform - Inside the Cluster

A ServiceAccount token volume can be mounted into a Pod running in the same cluster as Kubeflow Pipelines. The Kubeflow Pipelines SDK can use this token to authenticate itself with the Kubeflow Pipelines API.

The following Python code will create a kfp.Client() using a ServiceAccount token for authentication:

import kfp

# by default, when run from inside a Kubernetes cluster:
#  - the token is read from the `KF_PIPELINES_SA_TOKEN_PATH` path
#  - the host is set to `http://ml-pipeline-ui.kubeflow.svc.cluster.local`
kfp_client = kfp.Client()

# test the client by listing experiments
experiments = kfp_client.list_experiments(namespace="my-profile")
print(experiments)

ServiceAccount Token Volume

To use the preceding code, you will need to run it from a Pod that has a ServiceAccount token volume mounted. You may manually add a volume and volumeMount to your PodSpec or use Kubeflow’s PodDefaults to inject the required volume.

Option 1 - manually add a volume to your PodSpec:

apiVersion: v1
kind: Pod
metadata:
  name: access-kfp-example
spec:
  containers:
  - image: hello-world:latest
    name: hello-world
    env:
      - ## this environment variable is automatically read by `kfp.Client()`
        ## this is the default value, but we show it here for clarity
        name: KF_PIPELINES_SA_TOKEN_PATH
        value: /var/run/secrets/kubeflow/pipelines/token
    volumeMounts:
      - mountPath: /var/run/secrets/kubeflow/pipelines
        name: volume-kf-pipeline-token
        readOnly: true
  volumes:
    - name: volume-kf-pipeline-token
      projected:
        sources:
          - serviceAccountToken:
              path: token
              expirationSeconds: 7200
              ## defined by the `TOKEN_REVIEW_AUDIENCE` environment variable on the `ml-pipeline` deployment
              audience: pipelines.kubeflow.org      

Option 2 - use a PodDefault to inject the volume:

apiVersion: kubeflow.org/v1alpha1
kind: PodDefault
metadata:
  name: access-ml-pipeline
  namespace: "<YOUR_USER_PROFILE_NAMESPACE>"
spec:
  desc: Allow access to Kubeflow Pipelines
  selector:
    matchLabels:
      access-ml-pipeline: "true"
  env:
    - ## this environment variable is automatically read by `kfp.Client()`
      ## this is the default value, but we show it here for clarity
      name: KF_PIPELINES_SA_TOKEN_PATH
      value: /var/run/secrets/kubeflow/pipelines/token
  volumes:
    - name: volume-kf-pipeline-token
      projected:
        sources:
          - serviceAccountToken:
              path: token
              expirationSeconds: 7200
              ## defined by the `TOKEN_REVIEW_AUDIENCE` environment variable on the `ml-pipeline` deployment
              audience: pipelines.kubeflow.org      
  volumeMounts:
    - mountPath: /var/run/secrets/kubeflow/pipelines
      name: volume-kf-pipeline-token
      readOnly: true
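
Whichever option you choose, it can help to confirm from inside the Pod that the token file is actually mounted before constructing kfp.Client(). The following is a minimal sketch, not part of the SDK: the helper name `token_status` is hypothetical, and the default path is simply the one used in the manifests above.

```python
import os
from pathlib import Path

# Default token location, copied from the manifests in this guide.
DEFAULT_TOKEN_PATH = "/var/run/secrets/kubeflow/pipelines/token"


def token_status(env=None) -> str:
    """Describe whether the ServiceAccount token file looks usable (hypothetical helper)."""
    env = os.environ if env is None else env
    token_path = Path(env.get("KF_PIPELINES_SA_TOKEN_PATH", DEFAULT_TOKEN_PATH))
    if not token_path.is_file():
        return f"missing: {token_path}"
    if token_path.stat().st_size == 0:
        return f"empty: {token_path}"
    return f"ok: {token_path}"


# Run inside the Pod; anything other than "ok: ..." suggests the volume is not mounted.
print(token_status())
```

If the result is not "ok", check that the Pod carries the label selected by the PodDefault, or that the volume and volumeMount are present in the PodSpec.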

RBAC Authorization

The Kubeflow Pipelines API respects Kubernetes RBAC: it checks the RoleBindings assigned to the ServiceAccount before allowing it to perform Pipelines API actions.

For example, this RoleBinding allows Pods with the default-editor ServiceAccount in namespace-2 to manage Kubeflow Pipelines in namespace-1:

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: allow-namespace-2-kubeflow-edit
  ## this RoleBinding is in `namespace-1`, because it grants access to `namespace-1`
  namespace: namespace-1
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: kubeflow-edit
subjects:
  - kind: ServiceAccount
    name: default-editor
    ## the ServiceAccount lives in `namespace-2`
    namespace: namespace-2
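
If you need the same kind of binding for several namespaces, generating the manifest can be less error-prone than copying it by hand. The sketch below is illustrative only: `build_role_binding` is a hypothetical helper, and it emits JSON, which `kubectl apply -f` accepts in place of YAML.

```python
import json


def build_role_binding(
    target_namespace: str,
    source_namespace: str,
    service_account: str = "default-editor",
    cluster_role: str = "kubeflow-edit",
) -> dict:
    """Build a RoleBinding granting a ServiceAccount in `source_namespace`
    the given ClusterRole within `target_namespace` (hypothetical helper)."""
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {
            "name": f"allow-{source_namespace}-{cluster_role}",
            # the RoleBinding lives in the namespace it grants access to
            "namespace": target_namespace,
        },
        "roleRef": {
            "apiGroup": "rbac.authorization.k8s.io",
            "kind": "ClusterRole",
            "name": cluster_role,
        },
        "subjects": [
            {
                "kind": "ServiceAccount",
                "name": service_account,
                # the ServiceAccount lives in the source namespace
                "namespace": source_namespace,
            }
        ],
    }


# reproduces the example above as JSON
print(json.dumps(build_role_binding("namespace-1", "namespace-2"), indent=2))
```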

Kubeflow Platform - Outside the Cluster

The precise method to authenticate from outside the cluster will depend on how you deployed Kubeflow Platform. Because most distributions use Dex as their identity provider, this example will show you how to authenticate with Dex using a Python script.

You will need to make the Kubeflow Pipelines API reachable from the remote machine. If your Kubeflow Istio gateway is already exposed, skip this step and use that URL directly.

The following command will expose the istio-ingressgateway service on localhost:8080:

# TIP: svc/istio-ingressgateway may be called something else, 
#      or use different ports in your distribution
kubectl port-forward --namespace istio-system svc/istio-ingressgateway 8080:80
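
Before running any SDK code, you may want to confirm that the port-forward is actually listening. This is a small sketch using only the Python standard library; `is_port_open` is a hypothetical helper, and port 8080 matches the command above.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds (hypothetical helper)."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # covers refused connections, timeouts, and name-resolution failures
        return False


# True only while `kubectl port-forward` (or another listener) is running
print(is_port_open("localhost", 8080))
```

A successful TCP connection only shows that something is listening on the port; it does not prove that the gateway behind it is healthy.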

The following Python code defines a KFPClientManager() class that creates an authenticated kfp.Client() by interacting with Dex:

import re
from urllib.parse import urlsplit, urlencode

import kfp
import requests
import urllib3


class KFPClientManager:
    """
    A class that creates `kfp.Client` instances with Dex authentication.
    """

    def __init__(
        self,
        api_url: str,
        dex_username: str,
        dex_password: str,
        dex_auth_type: str = "local",
        skip_tls_verify: bool = False,
    ):
        """
        Initialize the KFPClientManager

        :param api_url: the Kubeflow Pipelines API URL
        :param dex_username: the Dex username
        :param dex_password: the Dex password
        :param dex_auth_type: the auth type to use if Dex has multiple enabled, one of: ['ldap', 'local']
        :param skip_tls_verify: if True, skip TLS verification
        """
        self._api_url = api_url
        self._skip_tls_verify = skip_tls_verify
        self._dex_username = dex_username
        self._dex_password = dex_password
        self._dex_auth_type = dex_auth_type
        self._client = None

        # suppress urllib3's insecure-request warnings when TLS verification is skipped
        if self._skip_tls_verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        # ensure `dex_auth_type` is valid
        if self._dex_auth_type not in ["ldap", "local"]:
            raise ValueError(
                f"Invalid `dex_auth_type` '{self._dex_auth_type}', must be one of: ['ldap', 'local']"
            )

    def _get_session_cookies(self) -> str:
        """
        Get the session cookies by authenticating against Dex
        :return: a string of session cookies in the form "key1=value1; key2=value2"
        """

        # use a persistent session (for cookies)
        s = requests.Session()

        # GET the api_url, which should redirect to Dex
        resp = s.get(
            self._api_url, allow_redirects=True, verify=not self._skip_tls_verify
        )
        if resp.status_code == 200:
            pass
        elif resp.status_code == 403:
            # if we get 403, we might be at the oauth2-proxy sign-in page
            # the default path to start the sign-in flow is `/oauth2/start?rd=<url>`
            url_obj = urlsplit(resp.url)
            url_obj = url_obj._replace(
                path="/oauth2/start", query=urlencode({"rd": url_obj.path})
            )
            resp = s.get(
                url_obj.geturl(), allow_redirects=True, verify=not self._skip_tls_verify
            )
        else:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for GET against: {self._api_url}"
            )

        # if we were NOT redirected, then the endpoint is unsecured
        if len(resp.history) == 0:
            # no cookies are needed
            return ""

        # if we are at `../auth` path, we need to select an auth type
        url_obj = urlsplit(resp.url)
        if re.search(r"/auth$", url_obj.path):
            url_obj = url_obj._replace(
                path=re.sub(r"/auth$", f"/auth/{self._dex_auth_type}", url_obj.path)
            )

        # if we are at `../auth/xxxx/login` path, then we are at the login page
        if re.search(r"/auth/.*/login$", url_obj.path):
            dex_login_url = url_obj.geturl()
        else:
            # otherwise, we need to follow a redirect to the login page
            resp = s.get(
                url_obj.geturl(), allow_redirects=True, verify=not self._skip_tls_verify
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for GET against: {url_obj.geturl()}"
                )
            dex_login_url = resp.url

        # attempt Dex login
        resp = s.post(
            dex_login_url,
            data={"login": self._dex_username, "password": self._dex_password},
            allow_redirects=True,
            verify=not self._skip_tls_verify,
        )
        if resp.status_code != 200:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for POST against: {dex_login_url}"
            )

        # if we were NOT redirected, then the login credentials were probably invalid
        if len(resp.history) == 0:
            raise RuntimeError(
                f"Login credentials are probably invalid - "
                f"No redirect after POST to: {dex_login_url}"
            )

        # if we are at `../approval` path, we need to approve the login
        url_obj = urlsplit(resp.url)
        if re.search(r"/approval$", url_obj.path):
            dex_approval_url = url_obj.geturl()

            # approve the login
            resp = s.post(
                dex_approval_url,
                data={"approval": "approve"},
                allow_redirects=True,
                verify=not self._skip_tls_verify,
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for POST against: {url_obj.geturl()}"
                )

        return "; ".join([f"{c.name}={c.value}" for c in s.cookies])

    def _create_kfp_client(self) -> kfp.Client:
        try:
            session_cookies = self._get_session_cookies()
        except Exception as ex:
            raise RuntimeError("Failed to get Dex session cookies") from ex

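        # monkey patch the kfp.Client to support disabling SSL verification
        # kfp only added support in v2: https://github.com/kubeflow/pipelines/pull/7174
        # NOTE: this patches the `kfp.Client` class itself, so it also affects
        #       any other `kfp.Client` created later in the same process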
        original_load_config = kfp.Client._load_config

        def patched_load_config(client_self, *args, **kwargs):
            config = original_load_config(client_self, *args, **kwargs)
            config.verify_ssl = not self._skip_tls_verify
            return config

        patched_kfp_client = kfp.Client
        patched_kfp_client._load_config = patched_load_config

        return patched_kfp_client(
            host=self._api_url,
            cookies=session_cookies,
        )

    def create_kfp_client(self) -> kfp.Client:
        """Get a newly authenticated Kubeflow Pipelines client."""
        return self._create_kfp_client()
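
The two URL rewrites inside `_get_session_cookies` can be easier to follow in isolation. The sketch below repeats them with only the standard library; the function names and the `/dex/auth` example path are assumptions made for illustration, and real redirect URLs depend on your deployment.

```python
import re
from urllib.parse import urlsplit, urlencode


def oauth2_start_url(current_url: str) -> str:
    """Rewrite a URL to the oauth2-proxy sign-in entry point,
    passing the original path as the `rd` (redirect) parameter."""
    url_obj = urlsplit(current_url)
    url_obj = url_obj._replace(
        path="/oauth2/start", query=urlencode({"rd": url_obj.path})
    )
    return url_obj.geturl()


def select_auth_type(current_url: str, auth_type: str) -> str:
    """If the path ends in `/auth`, append the chosen Dex auth type."""
    url_obj = urlsplit(current_url)
    url_obj = url_obj._replace(
        path=re.sub(r"/auth$", f"/auth/{auth_type}", url_obj.path)
    )
    return url_obj.geturl()


print(oauth2_start_url("http://localhost:8080/pipeline"))
# http://localhost:8080/oauth2/start?rd=%2Fpipeline

print(select_auth_type("http://localhost:8080/dex/auth?client_id=x", "local"))
# http://localhost:8080/dex/auth/local?client_id=x
```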

The following Python code shows how to use the KFPClientManager() class to create a kfp.Client():

# initialize a KFPClientManager
kfp_client_manager = KFPClientManager(
    api_url="http://localhost:8080/pipeline",
    skip_tls_verify=True,

    dex_username="user@example.com",
    dex_password="12341234",

    # can be 'ldap' or 'local' depending on your Dex configuration
    dex_auth_type="local",
)

# get a newly authenticated KFP client
# TIP: long-lived sessions might need to get a new client when their session expires
kfp_client = kfp_client_manager.create_kfp_client()

# test the client by listing experiments
experiments = kfp_client.list_experiments(namespace="my-profile")
print(experiments)
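
For long-running processes, one way to act on the tip about expiring sessions is to recreate the client after a fixed age. The following is a sketch under stated assumptions: `RefreshingClient` is a hypothetical wrapper, and the 30-minute default is a placeholder, because the real session lifetime depends on how Dex and the gateway are configured.

```python
import time
from typing import Any, Callable, Optional


class RefreshingClient:
    """Recreate a client once it is older than `max_age_seconds` (hypothetical helper)."""

    def __init__(
        self,
        factory: Callable[[], Any],
        max_age_seconds: float = 1800.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._factory = factory
        self._max_age = max_age_seconds
        self._clock = clock
        self._client: Optional[Any] = None
        self._created_at = 0.0

    def get(self) -> Any:
        """Return the cached client, building a new one if it is missing or too old."""
        now = self._clock()
        if self._client is None or now - self._created_at >= self._max_age:
            self._client = self._factory()
            self._created_at = now
        return self._client


# stand-in factory so the sketch runs without a cluster;
# in practice you would pass `kfp_client_manager.create_kfp_client`
clients = RefreshingClient(factory=lambda: object(), max_age_seconds=1800.0)
first = clients.get()
print(first is clients.get())
```

Call `clients.get()` each time you need a client rather than holding on to the returned object, so that a refresh takes effect.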

Standalone Kubeflow Pipelines

When Kubeflow Pipelines runs in standalone mode, there is no multi-user authentication or RBAC. How you connect depends on whether your code runs inside or outside the cluster.

Standalone KFP - Inside the Cluster

When running inside the Kubernetes cluster, you can connect the Pipelines SDK directly to the ml-pipeline-ui service through cluster-internal service DNS.

When running in the same namespace as Kubeflow Pipelines:

import kfp

client = kfp.Client(host="http://ml-pipeline-ui:80")

print(client.list_experiments())

When running in a different namespace from Kubeflow Pipelines:

import kfp

# the namespace in which you deployed Kubeflow Pipelines
namespace = "kubeflow" 

client = kfp.Client(host=f"http://ml-pipeline-ui.{namespace}")

print(client.list_experiments())
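
The two in-cluster cases differ only in how the service hostname is qualified. A small helper can make that explicit; `kfp_service_host` is a hypothetical function, and `cluster.local` is only the common default cluster domain, which your cluster may override.

```python
from typing import Optional


def kfp_service_host(
    namespace: Optional[str] = None,
    service: str = "ml-pipeline-ui",
    port: int = 80,
    cluster_domain: Optional[str] = None,
) -> str:
    """Build the in-cluster URL of the Pipelines UI service (hypothetical helper).

    - no namespace: short name, resolvable only from the same namespace
    - namespace: `<service>.<namespace>`, resolvable from other namespaces
    - namespace and cluster_domain: fully qualified service DNS name
    """
    name = service
    if namespace:
        name = f"{name}.{namespace}"
        if cluster_domain:
            name = f"{name}.svc.{cluster_domain}"
    return f"http://{name}:{port}"


print(kfp_service_host())
# http://ml-pipeline-ui:80
print(kfp_service_host("kubeflow"))
# http://ml-pipeline-ui.kubeflow:80
print(kfp_service_host("kubeflow", cluster_domain="cluster.local"))
# http://ml-pipeline-ui.kubeflow.svc.cluster.local:80
```

The returned string can be passed as the `host` argument of kfp.Client(), as in the examples above.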

Standalone KFP - Outside the Cluster

When running outside the Kubernetes cluster, you can connect the Pipelines SDK to the ml-pipeline-ui service by using kubectl port-forwarding.

Step 1: Run the following command on your external system to start port-forwarding:

# change `--namespace` if you deployed Kubeflow Pipelines into a different namespace
kubectl port-forward --namespace kubeflow svc/ml-pipeline-ui 3000:80

Step 2: Create a kfp.Client() against your port-forwarded ml-pipeline-ui service:

import kfp

client = kfp.Client(host="http://localhost:3000")

print(client.list_experiments())
