11.5 Testando tudo junto

Agora que já sabemos como treinar e efetuar o deploy do modelo via Notebook, vamos criar um pipeline para automatizar o processo.

11.5.1 Testando tudo junto: fase 1

O primeiro passo é separar de forma lógica os steps (passos):

  • Preparação dos dados

  • Treinamento do modelo

  • Deploy

Assim como fizemos no capítulo 11.3, vamos usar o Notebook para criar o pipeline que irá preparar os dados, treinar o modelo e efetuar o deploy.

Neste primeiro pipeline, vamos criar algo simples, basicamente com o código que vimos no capítulo 11.4. Essa primeira função é a fase de preparação de dados. Ela baixa os dados do Minio e os prepara para a fase de treinamento:

def get_data():
    import pandas as pd
    from io import BytesIO
    from minio import Minio
    import nltk
    from nltk.corpus import stopwords

    MINIO_HOST="minio-service.kubeflow:9000"
    MINIO_ACCESS_KEY="minio"
    MINIO_SECRET_KEY="minio123"
    MINIO_MODEL_BUCKET="kserve"
    MINIO_DATA_PRODUCT_PATH="data/produtos.csv"
    MINIO_DATA_PREP_PATH="data/prep.csv"
    INCOME_MODEL_PATH="sklearn/income/model"

    minioClient = Minio(MINIO_HOST,
                    access_key=MINIO_ACCESS_KEY,
                    secret_key=MINIO_SECRET_KEY,
                    secure=False)
    obj = minioClient.get_object(
        MINIO_MODEL_BUCKET,
        MINIO_DATA_PRODUCT_PATH,
    )
    nltk.download('stopwords')
    products_data = pd.read_csv(obj,delimiter=';', encoding='utf-8')
    products_data.count()
    # concatenando as colunas nome e descricao
    products_data['informacao'] = products_data['nome'] + products_data['descricao']
    # excluindo linhas com valor de informacao ou categoria NaN
    products_data.dropna(subset=['informacao', 'categoria'], inplace=True)
    products_data.drop(columns=['nome', 'descricao'], inplace=True)

    stop_words=set(stopwords.words("portuguese"))
    # transforma a string em caixa baixa e remove stopwords
    products_data['sem_stopwords'] = products_data['informacao'].str.lower().apply(lambda x: ' '.join([word for word in x.split() if word not in (stop_words)]))
    tokenizer = nltk.RegexpTokenizer(r"\w+")
    products_data['tokens'] = products_data['sem_stopwords'].apply(tokenizer.tokenize) # aplica o regex tokenizer
    products_data.drop(columns=['sem_stopwords','informacao'],inplace=True) # Exclui as colunas antigas

    products_data["strings"]= products_data["tokens"].str.join(" ") # reunindo cada elemento da lista
    products_data.head()

    csv = products_data.to_csv(sep=';').encode('utf-8')
    minioClient.put_object(
        MINIO_MODEL_BUCKET,
        MINIO_DATA_PREP_PATH,
        data=BytesIO(csv),
        length=len(csv),
        content_type='application/csv'
    )

Depois, vamos criar uma função para o treinamento do modelo:

def training():
    import pandas as pd
    import minio
    import nltk
    from nltk.corpus import stopwords
    from sklearn.feature_extraction.text import CountVectorizer
    from sklearn.naive_bayes import MultinomialNB
    from sklearn.model_selection import train_test_split
    from sklearn.pipeline import Pipeline
    from minio import Minio
    import joblib

    MINIO_HOST="minio-service.kubeflow:9000"
    MINIO_ACCESS_KEY="minio"
    MINIO_SECRET_KEY="minio123"
    MINIO_MODEL_BUCKET="kserve"
    MINIO_DATA_PRODUCT_PATH="data/produtos.csv"
    MINIO_DATA_PREP_PATH="data/prep.csv"
    INCOME_MODEL_PATH="sklearn/income/model"
    nltk.download('stopwords')
    minioClient = Minio(MINIO_HOST,
                    access_key=MINIO_ACCESS_KEY,
                    secret_key=MINIO_SECRET_KEY,
                    secure=False)
    obj = minioClient.get_object(
        MINIO_MODEL_BUCKET,
        MINIO_DATA_PREP_PATH,
    )
    products_data = pd.read_csv(obj,delimiter=';', encoding='utf-8')
    X_train,X_test,y_train,y_test = train_test_split( # Separação dos dados para teste e treino
        products_data["strings"],
        products_data["categoria"],
        test_size = 0.2,
        random_state = 10
    )
    pipe = Pipeline([('vetorizador', CountVectorizer()), ("classificador", MultinomialNB())]) # novo
    pipe.fit(X_train, y_train)
    joblib.dump(pipe, "model.joblib")
    print(minioClient.fput_object(MINIO_MODEL_BUCKET, f"{INCOME_MODEL_PATH}/model.joblib", 'model.joblib'))

E, por fim, efetuar o deploy do modelo:

def deploy():
    from kubernetes import client
    from kserve import KServeClient
    from kserve import constants
    from kserve import utils
    from kserve import V1beta1InferenceService
    from kserve import V1beta1InferenceServiceSpec
    from kserve import V1beta1PredictorSpec
    from kserve import V1beta1SKLearnSpec

    namespace = utils.get_default_target_namespace()
    MINIO_HOST="minio-service.kubeflow:9000"
    MINIO_ACCESS_KEY="minio"
    MINIO_SECRET_KEY="minio123"
    MINIO_MODEL_BUCKET="kserve"
    MINIO_DATA_PRODUCT_PATH="data/produtos.csv"
    MINIO_DATA_PREP_PATH="data/produtos.csv"
    INCOME_MODEL_PATH="sklearn/income/model"
    DEPLOY_NAMESPACE="kubeflow-user-example-com"

    name='sklearn-produtos'
    kserve_version='v1beta1'
    api_version = constants.KSERVE_GROUP + '/' + kserve_version

    predictor = V1beta1PredictorSpec(
        service_account_name='kserve-sa',
        min_replicas=1,
        sklearn=V1beta1SKLearnSpec(
            storage_uri='s3://'+MINIO_MODEL_BUCKET+'/'+ INCOME_MODEL_PATH,
            resources=client.V1ResourceRequirements(
                requests={"cpu": "300m", "memory": "128Mi"},
                limits={"cpu": "500m", "memory": "512Mi"},
            ),
        ),
    )
    isvc = V1beta1InferenceService(api_version=api_version,
                                kind="KSERVE_KIND",
                                metadata=client.V1ObjectMeta(
                                    name=name,
                                    namespace=DEPLOY_NAMESPACE
                                ),
                                spec=V1beta1InferenceServiceSpec(
                                predictor=predictor
                                )
                                )
    KServe = KServeClient()
    KServe.replace(name, isvc)
    #KServe.create(isvc)

Agora, precisamos transformar as funções em componentes para usarmos dentro do pipeline do Kubeflow:

import kfp
import kfp.dsl as dsl

get_data_step_comp = kfp.dsl.component(
func=get_data,
    base_image='python:3.11',
    packages_to_install=['minio', 'nltk==3.7', 'scikit-learn==1.1.3','pandas==2.1.4', 'joblib==1.1.0'])

create_step_training_comp = kfp.dsl.component(
    func=training,
    base_image='python:3.11',
    packages_to_install=['minio', 'nltk==3.7', 'scikit-learn==1.1.3', 'pandas==2.1.4', 'joblib==1.1.0'])

create_step_deploy_comp = kfp.dsl.component(
    func=deploy,
    base_image='python:3.11',
    packages_to_install=['kserve==0.15.1'])

@dsl.pipeline(
   name='Pipeline de produtos',
   description='Data prep, training and deploy'
)
def pipeline_produtos():
    task_data = get_data_step_comp()
    task_data.set_caching_options(False)
    task_training = create_step_training_comp().after(task_data)
    task_training.set_caching_options(False)
    task_deploy = create_step_deploy_comp().after(task_training)
    task_deploy.set_caching_options(False)

Veja que cada componente tem as próprias dependências de pacotes. O pipeline orquestra a chamada dos contêineres de forma sequencial. Execute o seguinte comando para criar o pipeline e acompanhe na WebUI o resultado:

compiler.Compiler().compile(pipeline_produtos, 'pipeline_produtos.yaml')

client = Client()
run = client.create_run_from_pipeline_package(
    'pipeline_produtos.yaml',
)

Testando tudo junto: fase 2

Agora que já vimos como o pipeline pode ajudar no processo de deploy de Machine Learning, vamos utilizar alguns componentes do Kubeflow Pipeline para facilitar o processo.

Vamos substituir a etapa de deploy do modelo pelo componente KServe.

from kfp import components
kserve_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/2.1.0/components/kserve/component.yaml')

@dsl.pipeline(
   name='Pipeline de produtos2',
   description='Data prep, training and deploy'
)
def pipeline_produtos2():
    task_data = get_data_step_comp()
    task_training = create_step_training_comp().after(task_data)
    MINIO_MODEL_BUCKET="kserve"
    INCOME_MODEL_PATH="sklearn/income/model"
    DEPLOY_NAMESPACE=namespace
    MODEL_URI='s3://'+MINIO_MODEL_BUCKET+'/'+ INCOME_MODEL_PATH
    NAME='sklearn-produtos'
    SERVICE_ACCOUT='kserve-sa'
    task_deploy = kserve_op(
        action='apply',
        model_name=NAME,
        namespace=DEPLOY_NAMESPACE,
        model_uri=MODEL_URI,
        framework='sklearn',
        service_account=SERVICE_ACCOUT,
        resource_requests='{"cpu": "300m", "memory": "128Mi"}',
        resource_limits='{"cpu": "500m", "memory": "512Mi"}',
    ).after(task_training)

Execute o seguinte comando para criar o pipeline e acompanhe na WebUI o resultado:

compiler.Compiler().compile(pipeline_produtos2, 'pipeline_produtos2.yaml')

client = Client()
run = client.create_run_from_pipeline_package(
    'pipeline_produtos2.yaml',
)

Veja e explore a documentação do componente KServe. Existem parâmetros para controlar a ação de deploy e opções de rollout de deploy.

Explore outros componentes do Kubeflow Pipeline, como o Metrics.

Last updated