11.3 Kubeflow Pipeline

Kubeflow pipeline é uma plataforma para workflows de Machine Learning baseado em containers. Os pipelines são desenvolvidos com a linguagem Python e traduzidos em objetos dentro do Kubernetes. Revise os conceitos do Pipeline para melhor entendimento.

Vamos fazer um exemplo ponta a ponta agora com o Kubeflow pipelines.

11.3.1 Configuração do Notebook

Antes de iniciar o exemplo precisamos criar uma configuração para que os notebooks do Kubeflow acesses o Kubeflow pipelines:

apiVersion: kubeflow.org/v1alpha1
kind: PodDefault
metadata:
  name: access-ml-pipeline
  namespace: "kubeflow-user-example-com"
spec:
  desc: Allow access to Kubeflow Pipelines
  selector:
    matchLabels:
      access-ml-pipeline: "true"
  volumes:
    - name: volume-kf-pipeline-token
      projected:
        sources:
          - serviceAccountToken:
              path: token
              expirationSeconds: 7200
              audience: pipelines.kubeflow.org      
  volumeMounts:
    - mountPath: /var/run/secrets/kubeflow/pipelines
      name: volume-kf-pipeline-token
      readOnly: true
  env:
    - name: KF_PIPELINES_SA_TOKEN_PATH
      value: /var/run/secrets/kubeflow/pipelines/token

Crie um arquivo yaml como o exemplo e crie os recursos com o kubectl. Agora podemos acessar o notebook do Kubeflow e usar a configuração para acessar o Kubeflow pipeline.

Através da WebUi do Kubeflow vamos criar um Notebook Jupyter e executar o primeiro pipeline.

Vamos usar a namespace kubeflow-user-example-com criada na configuração do Kuubeflow e instalar as bibliotecas Python para manipular o pipeline via notebook. Utilize a documentação oficial para criar o notebook Jupyter.

No notebook Jupyter execute a seguinte linha para criar uma variável para utilizarmos na namespace correta:

namespace="kubeflow-user-example-com"

11.3.2 Primeiros Pipelines

Agora vamos criar o primeiro pipeline:

import kfp
from kfp import dsl

@dsl.component
def echo_op():
    print("Hello world step 1")

@dsl.pipeline(
    name='my-first-pipeline',
    description='A hello world pipeline.'
)
def hello_world_pipeline():
    echo_task = echo_op()

kfp_client=kfp.Client()
run_id = kfp_client.create_run_from_pipeline_func(hello_world_pipeline,namespace=namespace, arguments={}, experiment_name="hello_word")

Verifique na WebUi do Kubeflow a execução do pipeline. Repare que utilizamos o componente create_component_from_func, que transforma uma função python em um componente do Kubeflow pipeline que é utilizado pelo pipeline hello_world_pipeline.

Agora vamos criar um exemplo de steps sequenciais. Este exemplo ele baixa um arquivo txt do cloud storage e em seguida faz o print do resultado. Para que os steps sejam sequenciais é necessário que a saída de um step seja a entrada do seguinte ou podemos declarar a dependencia com a função *after:

@dsl.container_component
def gcs_download_op(url: str):
    return dsl.ContainerSpec(
        image='google/cloud-sdk:279.0.0',
        command=['sh', '-c'],
        args=['gsutil cat $0 | tee $1', url, '/tmp/results.txt']
    )
@dsl.container_component
def echo_op(text: str):
    return dsl.ContainerSpec(
        image='library/bash:4.4.23',
        command=['sh', '-c'],
        args=['echo "$0"', text]
    )

@dsl.pipeline(
    name='sequential-pipeline',
    description='A pipeline with two sequential steps.'
)
def sequential_pipeline(url : str='gs://ml-pipeline/sample-data/shakespeare/shakespeare1.txt'):
    """A pipeline with two sequential steps."""
  download_task = gcs_download_op(url=url)
  echo_task = echo_op(text='/tmp/results.txt').after(download_task)
  echo_task2 = echo_op(text='/tmp/results.txt').after(echo_task)

kfp_client=kfp.Client()
run_id = kfp_client.create_run_from_pipeline_func(sequential_pipeline,namespace=namespace, arguments={}, experiment_name="sequential_pipeline")

Após a execução deste pipeline veja a execução na WebUI do Kubeflow.

Existem muitos exemplos de pipeline no repositório do Kubeflow. Teste outros exemplos como o Data passing in python components.

Last updated