Prática DevOps com Docker para Machine Learning
  • Prática de DevOps com Docker para Machine Learning
  • Autores e Agradecimentos
  • Uso do Livro
  • Contribua com o Livro
  • Licença
  • Organização do Livro
  • 1. Introdução
    • 1.1 Máquinas Virtuais e Contêineres
    • 1.2 DevOps e Docker
    • 1.3 Configuração do Ambiente - Python
    • 1.4 Configuração do Ambiente - Docker
    • 1.5 Dockerfile, Imagem e Contêiner Docker
    • 1.6 Docker Hub e Comandos Adicionais
  • 2. Desenvolvimento
    • 2.1 Do notebook para aplicação - parte 1
    • 2.2 Do notebook para aplicação - parte 2
    • 2.3 Do notebook para aplicação - parte 3
  • 3. Produção
    • 3.1 Desenvolvimento vs Produção: o fim ou o início?
    • 3.2 Ambiente de Produção - parte 1
    • 3.3 Ambiente de Produção - parte 2
    • 3.4 Ambiente de Produção - parte 3
  • 4. Monitoramento
    • 4.1 Introdução
    • 4.2 Configurando o Servidor de Monitoramento
    • 4.3 Monitorando Servidores do Ambiente de Produção
    • 4.4 Comandos de Verificação do Nagios
    • 4.5 Criando Verificações Mais Específicas
    • 4.6 Criando Alertas
    • 4.7 Recuperando de Problemas
    • 4.8 Verificação de Contêineres via NRPE
  • 5. Infraestrutura como Código e Orquestração
    • 5.1 Introdução
    • 5.2 Orquestração com Docker Compose
    • 5.3 Orquestração com Kubernetes
  • 6. Integração Contínua
    • 6.1 Introdução
    • 6.2 Controle de Versão
    • 6.3 Configurando um repositório no GitLab
    • 6.4 Branch e merge
    • 6.5 Pipeline de Integração Contínua com GitLab CI/CD
  • 7. Entrega Contínua
    • 7.1 Introdução
    • 7.2 Implantação automática no Docker Hub
    • 7.3 Implantação automática no Heroku
    • 7.4 Implantação automática no Google Kubernetes Engine (GKE)
    • 7.5 Testando tudo junto
  • 8. Model serving
    • 8.1 Introdução
    • 8.2 Model serving com mlserver
    • 8.3 CI/CD com GitLab e mlserver
    • 8.4 Testando tudo junto
  • 9. Model serving batch
    • 9.1 Introdução
    • 9.2 Spark
    • 9.3 Airflow
    • 9.4 Testando tudo junto
  • 10. MLOps com mlflow
    • 10.1 Introdução
    • 10.2 Visão geral do MLflow
    • 10.3 Configuração do MLflow
    • 10.4 Testando MLflow
  • 11. MLOps com Kubeflow
    • 11.1 Visão geral do Kubeflow
    • 11.2 Configuracão
    • 11.3 Kubeflow Pipeline
    • 11.4 Kserve
    • 11.5 Testando tudo junto
  • 12. Conclusão
    • Conclusão
Powered by GitBook
On this page
  • 11.4.1 Testando tudo junto fase 1
  • Testando tudo junto fase 2
  1. 11. MLOps com Kubeflow

11.5 Testando tudo junto

Previous11.4 KserveNextConclusão

Last updated 10 months ago

Agora que já sabemos como treinar e efetuar o deploy do modelo via Notebook, vamos criar um pipeline para automatizar o processo.

11.4.1 Testando tudo junto fase 1

O Primeiro passo é separar de forma lógia os steps.

  • Preparação dos dados

  • Treinamento do modelo

  • Deploy

Assim como fizemos no capítulo vamos usar o notebook parar criar o pipeline para preparar os dados, treinar o modelo e efetuar o deploy.

Neste primeiro pipeline vamos criar algo simples com basicamente o código que vimos no capítulo . Essa primeira função é a fase de preparação de dados. Ela baixa os dados do minio e prepara os dados para a fase de treinamento:

def get_data():
    import pandas as pd
    from io import BytesIO
    from minio import Minio
    import nltk
    from nltk.corpus import stopwords

    MINIO_HOST="minio-service.kubeflow:9000"
    MINIO_ACCESS_KEY="minio"
    MINIO_SECRET_KEY="minio123"
    MINIO_MODEL_BUCKET="kserve"
    MINIO_DATA_PRODUCT_PATH="data/produtos.csv"
    MINIO_DATA_PREP_PATH="data/prep.csv"
    INCOME_MODEL_PATH="sklearn/income/model"

    minioClient = Minio(MINIO_HOST,
                    access_key=MINIO_ACCESS_KEY,
                    secret_key=MINIO_SECRET_KEY,
                    secure=False)
    obj = minioClient.get_object(
        MINIO_MODEL_BUCKET,
        MINIO_DATA_PRODUCT_PATH,
    )
    nltk.download('stopwords')
    products_data = pd.read_csv(obj,delimiter=';', encoding='utf-8')
    products_data.count()
    # concatenando as colunas nome e descricao
    products_data['informacao'] = products_data['nome'] + products_data['descricao']
    # excluindo linhas com valor de informacao ou categoria NaN
    products_data.dropna(subset=['informacao', 'categoria'], inplace=True)
    products_data.drop(columns=['nome', 'descricao'], inplace=True)

    stop_words=set(stopwords.words("portuguese"))
    # transforma a string em caixa baixa e remove stopwords
    products_data['sem_stopwords'] = products_data['informacao'].str.lower().apply(lambda x: ' '.join([word for word in x.split() if word not in (stop_words)]))
    tokenizer = nltk.RegexpTokenizer(r"\w+")
    products_data['tokens'] = products_data['sem_stopwords'].apply(tokenizer.tokenize) # aplica o regex tokenizer
    products_data.drop(columns=['sem_stopwords','informacao'],inplace=True) # Exclui as colunas antigas

    products_data["strings"]= products_data["tokens"].str.join(" ") # reunindo cada elemento da lista
    products_data.head()

    csv = products_data.to_csv(sep=';').encode('utf-8')
    minioClient.put_object(
        MINIO_MODEL_BUCKET,
        MINIO_DATA_PREP_PATH,
        data=BytesIO(csv),
        length=len(csv),
        content_type='application/csv'
    )

Depois vamos criar uma função para o treinamento de modelo:

def training():
    import pandas as pd
    import minio
    import nltk
    from nltk.corpus import stopwords    
    from sklearn.feature_extraction.text import CountVectorizer
    from sklearn.naive_bayes import MultinomialNB
    from sklearn.model_selection import train_test_split
    from sklearn.pipeline import Pipeline
    from minio import Minio
    import joblib

    MINIO_HOST="minio-service.kubeflow:9000"
    MINIO_ACCESS_KEY="minio"
    MINIO_SECRET_KEY="minio123"
    MINIO_MODEL_BUCKET="kserve"
    MINIO_DATA_PRODUCT_PATH="data/produtos.csv"
    MINIO_DATA_PREP_PATH="data/prep.csv"
    INCOME_MODEL_PATH="sklearn/income/model"
    nltk.download('stopwords')
    minioClient = Minio(MINIO_HOST,
                    access_key=MINIO_ACCESS_KEY,
                    secret_key=MINIO_SECRET_KEY,
                    secure=False)
    obj = minioClient.get_object(
        MINIO_MODEL_BUCKET,
        MINIO_DATA_PREP_PATH,
    )
    products_data = pd.read_csv(obj,delimiter=';', encoding='utf-8')
    X_train,X_test,y_train,y_test = train_test_split( # Separação dos dados para teste e treino
        products_data["strings"], 
        products_data["categoria"], 
        test_size = 0.2, 
        random_state = 10
    )
    pipe = Pipeline([('vetorizador', CountVectorizer()), ("classificador", MultinomialNB())]) # novo
    pipe.fit(X_train, y_train)
    joblib.dump(pipe, "model.joblib")
    print(minioClient.fput_object(MINIO_MODEL_BUCKET, f"{INCOME_MODEL_PATH}/model.joblib", 'model.joblib'))

E por fim efetuar o depoy do modelo:

def deploy():
    from kubernetes import client 
    from kserve import KServeClient
    from kserve import constants
    from kserve import utils
    from kserve import V1beta1InferenceService
    from kserve import V1beta1InferenceServiceSpec
    from kserve import V1beta1PredictorSpec
    from kserve import V1beta1SKLearnSpec

    namespace = utils.get_default_target_namespace()
    MINIO_HOST="minio-service.kubeflow:9000"
    MINIO_ACCESS_KEY="minio"
    MINIO_SECRET_KEY="minio123"
    MINIO_MODEL_BUCKET="kserve"
    MINIO_DATA_PRODUCT_PATH="data/produtos.csv"
    MINIO_DATA_PREP_PATH="data/produtos.csv"
    INCOME_MODEL_PATH="sklearn/income/model"
    DEPLOY_NAMESPACE="kubeflow-user-example-com"

    name='sklearn-produtos'
    kserve_version='v1beta1'
    api_version = constants.KSERVE_GROUP + '/' + kserve_version

    predictor = V1beta1PredictorSpec(
        service_account_name='kserve-sa',
        min_replicas=1,
        sklearn=V1beta1SKLearnSpec(
            storage_uri='s3://'+MINIO_MODEL_BUCKET+'/'+ INCOME_MODEL_PATH,
            resources=client.V1ResourceRequirements(
                requests={"cpu": "300m", "memory": "128Mi"},
                limits={"cpu": "500m", "memory": "512Mi"},
            ),
        ),
    )
    isvc = V1beta1InferenceService(api_version=api_version,
                                kind=constants.KSERVE_KIND,
                                metadata=client.V1ObjectMeta(
                                    name=name, 
                                    namespace=DEPLOY_NAMESPACE
                                ),
                                spec=V1beta1InferenceServiceSpec(
                                predictor=predictor
                                )
                                )
    KServe = KServeClient()
    KServe.replace(name, isvc) 
    #KServe.create(isvc) 
import kfp
import kfp.dsl as dsl

get_data_step_comp = kfp.dsl.component(
func=get_data,
    base_image='python:3.11',
    packages_to_install=['minio', 'nltk==3.7', 'scikit-learn==1.1.3', 'joblib==1.1.0', 'pandas==2.1.4'])

create_step_training_comp = kfp.dsl.component(
    func=training,
    base_image='python:3.11',
    packages_to_install=['minio', 'nltk==3.7', 'scikit-learn==1.1.3', 'joblib==1.1.0', 'pandas==2.1.4'])

create_step_deploy_comp = kfp.dsl.component(
    func=deploy,
    base_image='python:3.11',
    packages_to_install=['kserve==0.13.0'])

@dsl.pipeline(
   name='Pipeline de produtos',
   description='Data prep, training and deploy'
)
def pipeline_produtos():
    task_data = get_data_step_comp()
    task_training = create_step_training_comp().after(task_data)
    task_deploy = create_step_deploy_comp().after(task_training)

Veja que cada componente tem as próprias dependências de pacotes. O pipeline orquestra a chamada dos containiers de forma sequencial. Execute o seguinte comando para criar o pipeline e acompanhe na WebUI o resultado:

client = kfp.Client()
client.create_run_from_pipeline_func(pipeline_produtos, namespace=namespace, arguments={}, experiment_name="pipeline_produtos")

Testando tudo junto fase 2

Agora que já vimos como o pipeline pode ajudar no processo de deploy de Machine Learning, vamos utilizar alguns componentes do Kubeflow pipeline para facilitar o processo.

apiVersion: v1
kind: LimitRange
metadata:
  name: mem-cpu-limit-range
spec:
  limits:
  - default:
      memory: 256Mi
      cpu: 500m
    defaultRequest:
      memory: 64Mi
      cpu: 100m
    type: Container
from kfp import components
kserve_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/2.0.5/components/kserve/component.yaml')

@dsl.pipeline(
   name='Pipeline de produtos',
   description='Data prep, training and deploy'
)
def pipeline_produtos():
    task_data = get_data_step_comp()
    task_training = create_step_training_comp().after(task_data)
    MINIO_MODEL_BUCKET="kserve"
    INCOME_MODEL_PATH="sklearn/income/model"
    DEPLOY_NAMESPACE=namespace
    MODEL_URI='s3://'+MINIO_MODEL_BUCKET+'/'+ INCOME_MODEL_PATH
    NAME='sklearn-produtos'
    SERVICE_ACCOUT='kserve-sa'
    task_deploy = kserve_op(
        action='apply',
        model_name=NAME,
        namespace=DEPLOY_NAMESPACE,
        model_uri=MODEL_URI,
        framework='sklearn',
        service_account=SERVICE_ACCOUT
    ).after(task_training)

Execute o seguinte comando para criar o pipeline e acompanhe na WebUI o resultado:

client = kfp.Client()
client.create_run_from_pipeline_func(pipeline_produtos, namespace=namespace, arguments={}, experiment_name="pipeline_produtos2")

Explore outros componentes do Kubeflow pipeline como o (Metrics)[https://www.kubeflow.org/docs/components/pipelines/sdk/pipelines-metrics/].

Agora precisamos transformar as funções em componentes para usarmos dentro do pipeline do Kubeflow. Para isso vamos utilizar o componente :

Vamos substituir a etapa de deploy de modelo pelo componente .

Veja e explore a documentação do componente . Existem parâmetros para controlar a ação de deploy e opções de rollout de deploy.

10.3
10.4
create_component_from_func
Keserve
Keserve