11.5 Testando tudo junto
Agora que já sabemos como treinar e efetuar o deploy do modelo via Notebook, vamos criar um pipeline para automatizar o processo.
11.5.1 Testando tudo junto: fase 1
O primeiro passo é separar de forma lógica os steps (passos):
Preparação dos dados
Treinamento do modelo
Deploy
Assim como fizemos no capítulo 11.3, vamos usar o Notebook para criar o pipeline que irá preparar os dados, treinar o modelo e efetuar o deploy.
Neste primeiro pipeline, vamos criar algo simples, basicamente com o código que vimos no capítulo 11.4. Essa primeira função é a fase de preparação de dados. Ela baixa os dados do Minio e os prepara para a fase de treinamento:
def get_data():
import pandas as pd
from io import BytesIO
from minio import Minio
import nltk
from nltk.corpus import stopwords
MINIO_HOST="minio-service.kubeflow:9000"
MINIO_ACCESS_KEY="minio"
MINIO_SECRET_KEY="minio123"
MINIO_MODEL_BUCKET="kserve"
MINIO_DATA_PRODUCT_PATH="data/produtos.csv"
MINIO_DATA_PREP_PATH="data/prep.csv"
INCOME_MODEL_PATH="sklearn/income/model"
minioClient = Minio(MINIO_HOST,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
secure=False)
obj = minioClient.get_object(
MINIO_MODEL_BUCKET,
MINIO_DATA_PRODUCT_PATH,
)
nltk.download('stopwords')
products_data = pd.read_csv(obj,delimiter=';', encoding='utf-8')
products_data.count()
# concatenando as colunas nome e descricao
products_data['informacao'] = products_data['nome'] + products_data['descricao']
# excluindo linhas com valor de informacao ou categoria NaN
products_data.dropna(subset=['informacao', 'categoria'], inplace=True)
products_data.drop(columns=['nome', 'descricao'], inplace=True)
stop_words=set(stopwords.words("portuguese"))
# transforma a string em caixa baixa e remove stopwords
products_data['sem_stopwords'] = products_data['informacao'].str.lower().apply(lambda x: ' '.join([word for word in x.split() if word not in (stop_words)]))
tokenizer = nltk.RegexpTokenizer(r"\w+")
products_data['tokens'] = products_data['sem_stopwords'].apply(tokenizer.tokenize) # aplica o regex tokenizer
products_data.drop(columns=['sem_stopwords','informacao'],inplace=True) # Exclui as colunas antigas
products_data["strings"]= products_data["tokens"].str.join(" ") # reunindo cada elemento da lista
products_data.head()
csv = products_data.to_csv(sep=';').encode('utf-8')
minioClient.put_object(
MINIO_MODEL_BUCKET,
MINIO_DATA_PREP_PATH,
data=BytesIO(csv),
length=len(csv),
content_type='application/csv'
)
Depois, vamos criar uma função para o treinamento do modelo:
def training():
import pandas as pd
import minio
import nltk
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from minio import Minio
import joblib
MINIO_HOST="minio-service.kubeflow:9000"
MINIO_ACCESS_KEY="minio"
MINIO_SECRET_KEY="minio123"
MINIO_MODEL_BUCKET="kserve"
MINIO_DATA_PRODUCT_PATH="data/produtos.csv"
MINIO_DATA_PREP_PATH="data/prep.csv"
INCOME_MODEL_PATH="sklearn/income/model"
nltk.download('stopwords')
minioClient = Minio(MINIO_HOST,
access_key=MINIO_ACCESS_KEY,
secret_key=MINIO_SECRET_KEY,
secure=False)
obj = minioClient.get_object(
MINIO_MODEL_BUCKET,
MINIO_DATA_PREP_PATH,
)
products_data = pd.read_csv(obj,delimiter=';', encoding='utf-8')
X_train,X_test,y_train,y_test = train_test_split( # Separação dos dados para teste e treino
products_data["strings"],
products_data["categoria"],
test_size = 0.2,
random_state = 10
)
pipe = Pipeline([('vetorizador', CountVectorizer()), ("classificador", MultinomialNB())]) # novo
pipe.fit(X_train, y_train)
joblib.dump(pipe, "model.joblib")
print(minioClient.fput_object(MINIO_MODEL_BUCKET, f"{INCOME_MODEL_PATH}/model.joblib", 'model.joblib'))
E, por fim, efetuar o deploy do modelo:
def deploy():
from kubernetes import client
from kserve import KServeClient
from kserve import constants
from kserve import utils
from kserve import V1beta1InferenceService
from kserve import V1beta1InferenceServiceSpec
from kserve import V1beta1PredictorSpec
from kserve import V1beta1SKLearnSpec
namespace = utils.get_default_target_namespace()
MINIO_HOST="minio-service.kubeflow:9000"
MINIO_ACCESS_KEY="minio"
MINIO_SECRET_KEY="minio123"
MINIO_MODEL_BUCKET="kserve"
MINIO_DATA_PRODUCT_PATH="data/produtos.csv"
MINIO_DATA_PREP_PATH="data/produtos.csv"
INCOME_MODEL_PATH="sklearn/income/model"
DEPLOY_NAMESPACE="kubeflow-user-example-com"
name='sklearn-produtos'
kserve_version='v1beta1'
api_version = constants.KSERVE_GROUP + '/' + kserve_version
predictor = V1beta1PredictorSpec(
service_account_name='kserve-sa',
min_replicas=1,
sklearn=V1beta1SKLearnSpec(
storage_uri='s3://'+MINIO_MODEL_BUCKET+'/'+ INCOME_MODEL_PATH,
resources=client.V1ResourceRequirements(
requests={"cpu": "300m", "memory": "128Mi"},
limits={"cpu": "500m", "memory": "512Mi"},
),
),
)
isvc = V1beta1InferenceService(api_version=api_version,
kind="KSERVE_KIND",
metadata=client.V1ObjectMeta(
name=name,
namespace=DEPLOY_NAMESPACE
),
spec=V1beta1InferenceServiceSpec(
predictor=predictor
)
)
KServe = KServeClient()
KServe.replace(name, isvc)
#KServe.create(isvc)
Agora, precisamos transformar as funções em componentes para usarmos dentro do pipeline do Kubeflow:
import kfp
import kfp.dsl as dsl
get_data_step_comp = kfp.dsl.component(
func=get_data,
base_image='python:3.11',
packages_to_install=['minio', 'nltk==3.7', 'scikit-learn==1.1.3','pandas==2.1.4', 'joblib==1.1.0'])
create_step_training_comp = kfp.dsl.component(
func=training,
base_image='python:3.11',
packages_to_install=['minio', 'nltk==3.7', 'scikit-learn==1.1.3', 'pandas==2.1.4', 'joblib==1.1.0'])
create_step_deploy_comp = kfp.dsl.component(
func=deploy,
base_image='python:3.11',
packages_to_install=['kserve==0.15.1'])
@dsl.pipeline(
name='Pipeline de produtos',
description='Data prep, training and deploy'
)
def pipeline_produtos():
task_data = get_data_step_comp()
task_data.set_caching_options(False)
task_training = create_step_training_comp().after(task_data)
task_training.set_caching_options(False)
task_deploy = create_step_deploy_comp().after(task_training)
task_deploy.set_caching_options(False)
Veja que cada componente tem as próprias dependências de pacotes. O pipeline orquestra a chamada dos contêineres de forma sequencial. Execute o seguinte comando para criar o pipeline e acompanhe na WebUI o resultado:
compiler.Compiler().compile(pipeline_produtos, 'pipeline_produtos.yaml')
client = Client()
run = client.create_run_from_pipeline_package(
'pipeline_produtos.yaml',
)
Testando tudo junto: fase 2
Agora que já vimos como o pipeline pode ajudar no processo de deploy de Machine Learning, vamos utilizar alguns componentes do Kubeflow Pipeline para facilitar o processo.
Vamos substituir a etapa de deploy do modelo pelo componente KServe.
from kfp import components
kserve_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/2.1.0/components/kserve/component.yaml')
@dsl.pipeline(
name='Pipeline de produtos2',
description='Data prep, training and deploy'
)
def pipeline_produtos2():
task_data = get_data_step_comp()
task_training = create_step_training_comp().after(task_data)
MINIO_MODEL_BUCKET="kserve"
INCOME_MODEL_PATH="sklearn/income/model"
DEPLOY_NAMESPACE=namespace
MODEL_URI='s3://'+MINIO_MODEL_BUCKET+'/'+ INCOME_MODEL_PATH
NAME='sklearn-produtos'
SERVICE_ACCOUT='kserve-sa'
task_deploy = kserve_op(
action='apply',
model_name=NAME,
namespace=DEPLOY_NAMESPACE,
model_uri=MODEL_URI,
framework='sklearn',
service_account=SERVICE_ACCOUT,
resource_requests='{"cpu": "300m", "memory": "128Mi"}',
resource_limits='{"cpu": "500m", "memory": "512Mi"}',
).after(task_training)
Execute o seguinte comando para criar o pipeline e acompanhe na WebUI o resultado:
compiler.Compiler().compile(pipeline_produtos2, 'pipeline_produtos2.yaml')
client = Client()
run = client.create_run_from_pipeline_package(
'pipeline_produtos2.yaml',
)
Veja e explore a documentação do componente KServe. Existem parâmetros para controlar a ação de deploy e opções de rollout de deploy.
Explore outros componentes do Kubeflow Pipeline, como o Metrics.
Last updated