11.5 Testando tudo junto

Agora que já sabemos como treinar e efetuar o deploy do modelo via Notebook, vamos criar um pipeline para automatizar o processo.

11.4.1 Testando tudo junto fase 1

O Primeiro passo é separar de forma lógia os steps.

  • Preparação dos dados

  • Treinamento do modelo

  • Deploy

Assim como fizemos no capítulo 10.3 vamos usar o notebook parar criar o pipeline para preparar os dados, treinar o modelo e efetuar o deploy.

Neste primeiro pipeline vamos criar algo simples com basicamente o código que vimos no capítulo 10.4. Essa primeira função é a fase de preparação de dados. Ela baixa os dados do minio e prepara os dados para a fase de treinamento:

def get_data():
    import pandas as pd
    from io import BytesIO
    from minio import Minio
    import nltk
    from nltk.corpus import stopwords

    MINIO_HOST="minio-service.kubeflow:9000"
    MINIO_ACCESS_KEY="minio"
    MINIO_SECRET_KEY="minio123"
    MINIO_MODEL_BUCKET="kserve"
    MINIO_DATA_PRODUCT_PATH="data/produtos.csv"
    MINIO_DATA_PREP_PATH="data/prep.csv"
    INCOME_MODEL_PATH="sklearn/income/model"

    minioClient = Minio(MINIO_HOST,
                    access_key=MINIO_ACCESS_KEY,
                    secret_key=MINIO_SECRET_KEY,
                    secure=False)
    obj = minioClient.get_object(
        MINIO_MODEL_BUCKET,
        MINIO_DATA_PRODUCT_PATH,
    )
    nltk.download('stopwords')
    products_data = pd.read_csv(obj,delimiter=';', encoding='utf-8')
    products_data.count()
    # concatenando as colunas nome e descricao
    products_data['informacao'] = products_data['nome'] + products_data['descricao']
    # excluindo linhas com valor de informacao ou categoria NaN
    products_data.dropna(subset=['informacao', 'categoria'], inplace=True)
    products_data.drop(columns=['nome', 'descricao'], inplace=True)

    stop_words=set(stopwords.words("portuguese"))
    # transforma a string em caixa baixa e remove stopwords
    products_data['sem_stopwords'] = products_data['informacao'].str.lower().apply(lambda x: ' '.join([word for word in x.split() if word not in (stop_words)]))
    tokenizer = nltk.RegexpTokenizer(r"\w+")
    products_data['tokens'] = products_data['sem_stopwords'].apply(tokenizer.tokenize) # aplica o regex tokenizer
    products_data.drop(columns=['sem_stopwords','informacao'],inplace=True) # Exclui as colunas antigas

    products_data["strings"]= products_data["tokens"].str.join(" ") # reunindo cada elemento da lista
    products_data.head()

    csv = products_data.to_csv(sep=';').encode('utf-8')
    minioClient.put_object(
        MINIO_MODEL_BUCKET,
        MINIO_DATA_PREP_PATH,
        data=BytesIO(csv),
        length=len(csv),
        content_type='application/csv'
    )

Depois vamos criar uma função para o treinamento de modelo:

def training():
    import pandas as pd
    import minio
    import nltk
    from nltk.corpus import stopwords    
    from sklearn.feature_extraction.text import CountVectorizer
    from sklearn.naive_bayes import MultinomialNB
    from sklearn.model_selection import train_test_split
    from sklearn.pipeline import Pipeline
    from minio import Minio
    import joblib

    MINIO_HOST="minio-service.kubeflow:9000"
    MINIO_ACCESS_KEY="minio"
    MINIO_SECRET_KEY="minio123"
    MINIO_MODEL_BUCKET="kserve"
    MINIO_DATA_PRODUCT_PATH="data/produtos.csv"
    MINIO_DATA_PREP_PATH="data/prep.csv"
    INCOME_MODEL_PATH="sklearn/income/model"
    nltk.download('stopwords')
    minioClient = Minio(MINIO_HOST,
                    access_key=MINIO_ACCESS_KEY,
                    secret_key=MINIO_SECRET_KEY,
                    secure=False)
    obj = minioClient.get_object(
        MINIO_MODEL_BUCKET,
        MINIO_DATA_PREP_PATH,
    )
    products_data = pd.read_csv(obj,delimiter=';', encoding='utf-8')
    X_train,X_test,y_train,y_test = train_test_split( # Separação dos dados para teste e treino
        products_data["strings"], 
        products_data["categoria"], 
        test_size = 0.2, 
        random_state = 10
    )
    pipe = Pipeline([('vetorizador', CountVectorizer()), ("classificador", MultinomialNB())]) # novo
    pipe.fit(X_train, y_train)
    joblib.dump(pipe, "model.joblib")
    print(minioClient.fput_object(MINIO_MODEL_BUCKET, f"{INCOME_MODEL_PATH}/model.joblib", 'model.joblib'))

E por fim efetuar o depoy do modelo:

def deploy():
    from kubernetes import client 
    from kserve import KServeClient
    from kserve import constants
    from kserve import utils
    from kserve import V1beta1InferenceService
    from kserve import V1beta1InferenceServiceSpec
    from kserve import V1beta1PredictorSpec
    from kserve import V1beta1SKLearnSpec

    namespace = utils.get_default_target_namespace()
    MINIO_HOST="minio-service.kubeflow:9000"
    MINIO_ACCESS_KEY="minio"
    MINIO_SECRET_KEY="minio123"
    MINIO_MODEL_BUCKET="kserve"
    MINIO_DATA_PRODUCT_PATH="data/produtos.csv"
    MINIO_DATA_PREP_PATH="data/produtos.csv"
    INCOME_MODEL_PATH="sklearn/income/model"
    DEPLOY_NAMESPACE="kubeflow-user-example-com"

    name='sklearn-produtos'
    kserve_version='v1beta1'
    api_version = constants.KSERVE_GROUP + '/' + kserve_version

    predictor = V1beta1PredictorSpec(
        service_account_name='kserve-sa',
        min_replicas=1,
        sklearn=V1beta1SKLearnSpec(
            storage_uri='s3://'+MINIO_MODEL_BUCKET+'/'+ INCOME_MODEL_PATH,
            resources=client.V1ResourceRequirements(
                requests={"cpu": "300m", "memory": "128Mi"},
                limits={"cpu": "500m", "memory": "512Mi"},
            ),
        ),
    )
    isvc = V1beta1InferenceService(api_version=api_version,
                                kind=constants.KSERVE_KIND,
                                metadata=client.V1ObjectMeta(
                                    name=name, 
                                    namespace=DEPLOY_NAMESPACE
                                ),
                                spec=V1beta1InferenceServiceSpec(
                                predictor=predictor
                                )
                                )
    KServe = KServeClient()
    KServe.replace(name, isvc) 
    #KServe.create(isvc) 

Agora precisamos transformar as funções em componentes para usarmos dentro do pipeline do Kubeflow. Para isso vamos utilizar o componente create_component_from_func:

import kfp
import kfp.dsl as dsl

get_data_step_comp = kfp.dsl.component(
func=get_data,
    base_image='python:3.11',
    packages_to_install=['minio', 'nltk==3.7', 'scikit-learn==1.1.3', 'joblib==1.1.0', 'pandas==2.1.4'])

create_step_training_comp = kfp.dsl.component(
    func=training,
    base_image='python:3.11',
    packages_to_install=['minio', 'nltk==3.7', 'scikit-learn==1.1.3', 'joblib==1.1.0', 'pandas==2.1.4'])

create_step_deploy_comp = kfp.dsl.component(
    func=deploy,
    base_image='python:3.11',
    packages_to_install=['kserve==0.13.0'])

@dsl.pipeline(
   name='Pipeline de produtos',
   description='Data prep, training and deploy'
)
def pipeline_produtos():
    task_data = get_data_step_comp()
    task_training = create_step_training_comp().after(task_data)
    task_deploy = create_step_deploy_comp().after(task_training)

Veja que cada componente tem as próprias dependências de pacotes. O pipeline orquestra a chamada dos containiers de forma sequencial. Execute o seguinte comando para criar o pipeline e acompanhe na WebUI o resultado:

client = kfp.Client()
client.create_run_from_pipeline_func(pipeline_produtos, namespace=namespace, arguments={}, experiment_name="pipeline_produtos")

Testando tudo junto fase 2

Agora que já vimos como o pipeline pode ajudar no processo de deploy de Machine Learning, vamos utilizar alguns componentes do Kubeflow pipeline para facilitar o processo.

apiVersion: v1
kind: LimitRange
metadata:
  name: mem-cpu-limit-range
spec:
  limits:
  - default:
      memory: 256Mi
      cpu: 500m
    defaultRequest:
      memory: 64Mi
      cpu: 100m
    type: Container

Vamos substituir a etapa de deploy de modelo pelo componente Keserve.

from kfp import components
kserve_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/2.0.5/components/kserve/component.yaml')

@dsl.pipeline(
   name='Pipeline de produtos',
   description='Data prep, training and deploy'
)
def pipeline_produtos():
    task_data = get_data_step_comp()
    task_training = create_step_training_comp().after(task_data)
    MINIO_MODEL_BUCKET="kserve"
    INCOME_MODEL_PATH="sklearn/income/model"
    DEPLOY_NAMESPACE=namespace
    MODEL_URI='s3://'+MINIO_MODEL_BUCKET+'/'+ INCOME_MODEL_PATH
    NAME='sklearn-produtos'
    SERVICE_ACCOUT='kserve-sa'
    task_deploy = kserve_op(
        action='apply',
        model_name=NAME,
        namespace=DEPLOY_NAMESPACE,
        model_uri=MODEL_URI,
        framework='sklearn',
        service_account=SERVICE_ACCOUT
    ).after(task_training)

Execute o seguinte comando para criar o pipeline e acompanhe na WebUI o resultado:

client = kfp.Client()
client.create_run_from_pipeline_func(pipeline_produtos, namespace=namespace, arguments={}, experiment_name="pipeline_produtos2")

Veja e explore a documentação do componente Keserve. Existem parâmetros para controlar a ação de deploy e opções de rollout de deploy.

Explore outros componentes do Kubeflow pipeline como o (Metrics)[https://www.kubeflow.org/docs/components/pipelines/sdk/pipelines-metrics/].

Last updated