11.3 Kubeflow Pipeline
O Kubeflow Pipeline é uma plataforma para workflows de Machine Learning baseada em contêineres. Os pipelines são desenvolvidos com a linguagem Python e traduzidos em objetos dentro do Kubernetes. Revise os conceitos do Pipeline para melhor entendimento.
Vamos fazer um exemplo de ponta a ponta agora com o Kubeflow Pipelines.
11.3.1 Configuração do Notebook
Antes de iniciar o exemplo, precisamos criar uma configuração PodDefault
para que as execuções do SDK do KFP tenham a permissão necessária:
apiVersion: kubeflow.org/v1alpha1
kind: PodDefault
metadata:
name: access-ml-pipeline
namespace: "kubeflow-user-example-com"
spec:
desc: Allow access to Kubeflow Pipelines
selector:
matchLabels:
access-ml-pipeline: "true"
volumes:
- name: volume-kf-pipeline-token
projected:
sources:
- serviceAccountToken:
path: token
expirationSeconds: 7200
audience: pipelines.kubeflow.org
volumeMounts:
- mountPath: /var/run/secrets/kubeflow/pipelines
name: volume-kf-pipeline-token
readOnly: true
env:
- name: KF_PIPELINES_SA_TOKEN_PATH
value: /var/run/secrets/kubeflow/pipelines/token
Crie um arquivo YAML como o do exemplo e crie os recursos com o kubectl
. Agora podemos acessar o Notebook do Kubeflow e usar a configuração para acessar o Kubeflow Pipeline.
Através da WebUI do Kubeflow, vamos criar um Notebook Jupyter e executar o primeiro pipeline.
Vamos usar o namespace kubeflow-user-example-com
(criado na configuração do Kubeflow) e instalar as bibliotecas Python para manipular o pipeline via Notebook. Utilize a documentação oficial para criar o Notebook Jupyter.
No Notebook Jupyter, execute a seguinte linha para criar uma variável a ser utilizada no namespace correto:
namespace="kubeflow-user-example-com"
11.3.2 Primeiros Pipelines
Agora vamos criar o primeiro pipeline:
from kfp import dsl
from kfp import compiler
from kfp.client import Client
@dsl.component
def say_hello(name: str) -> str:
hello_text = f'Hello, {name}!'
print(hello_text)
return hello_text
@dsl.pipeline
def hello_pipeline(recipient: str) -> str:
hello_task = say_hello(name=recipient)
return hello_task.output
compiler.Compiler().compile(hello_pipeline, 'pipeline.yaml')
client = Client()
run = client.create_run_from_pipeline_package(
'pipeline.yaml',
arguments={
'recipient': 'World',
},
)
Verifique na WebUI do Kubeflow a execução do pipeline. Repare que utilizamos as funções do SDK para compilar e criar o pipeline.
Agora vamos criar um exemplo de steps (passos) sequenciais. Neste exemplo, ele baixa um arquivo TXT do Cloud Storage e, em seguida, faz o print do resultado. Para que os steps sejam sequenciais, é necessário que a saída de um step seja a entrada do seguinte, ou podemos declarar a dependência com a função after
:
@dsl.container_component
def gcs_download_op(url: str):
return dsl.ContainerSpec(
image='google/cloud-sdk:279.0.0',
command=['sh', '-c'],
args=['gsutil cat $0 | tee $1', url, '/tmp/results.txt']
)
@dsl.container_component
def echo_op(text: str):
return dsl.ContainerSpec(
image='library/bash:4.4.23',
command=['sh', '-c'],
args=['echo "$0"', text]
)
@dsl.pipeline(
name='sequential-pipeline',
description='A pipeline with two sequential steps.'
)
def sequential_pipeline(url : str='gs://ml-pipeline/sample-data/shakespeare/shakespeare1.txt'):
"""A pipeline with two sequential steps."""
download_task = gcs_download_op(url=url)
echo_task = echo_op(text='/tmp/results.txt').after(download_task)
echo_task2 = echo_op(text='/tmp/results.txt').after(echo_task)
compiler.Compiler().compile(sequential_pipeline, 'sequential_pipeline.yaml')
client = Client()
run = client.create_run_from_pipeline_package(
'sequential_pipeline.yaml',
)
Após a execução deste pipeline, veja a execução na WebUI do Kubeflow.
Existem muitos exemplos de pipeline no repositório do Kubeflow. Teste outros exemplos, como o Data passing in python components.
Last updated