11.4 Kserve

O KServe habilita inferência de modelos Machine Learning no Kubernetes e é totalmente integrado ao Kubeflow. A seguir vamos portar o modelo de classificação de categorias visto nos capítulos anteriores para o Kserve. Antes de começar os trabalhos com o modelo precisamos configurar as credenciais para que o Kserve consiga obter os artefatos no minio. Para usar o minio vamos criar uma secret com as credentiais necessárias e um service account para utilizarmos na subida do servico:

apiVersion: v1
kind: Secret
metadata:
  name: s3creds
  namespace: kubeflow-user-example-com
  annotations:
    serving.kserve.io/s3-endpoint: "minio-service.kubeflow:9000"
    serving.kserve.io/s3-usehttps: "0"
    serving.kserve.io/s3-region: "us-east-1"
    serving.kserve.io/s3-useanoncredential: "false"
type: Opaque
stringData:
  AWS_ACCESS_KEY_ID: "minio"
  AWS_SECRET_ACCESS_KEY: "minio123"
  S3_USE_HTTPS: "0"
  S3_VERIFY_SSL: "0"
  S3_ENDPOINT: "minio-service.kubeflow:9000"
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: kserve-sa 
  namespace: kubeflow-user-example-com
secrets:
  - name: s3creds

Crie um arquivo yaml como o exemplo e crie os recursos com o kubectl.

Agora vamos voltar ao Notebook para treinar e efetuar o deploy de modelo. O comando a seguir prepara o cliente do Kserve e do minio.

!pip install --upgrade pip
!pip install kserve pydantic minio kfp -U
from kubernetes import client 
from kserve import KServeClient
from kserve import constants
from kserve import utils
from kserve import V1beta1InferenceService
from kserve import V1beta1InferenceServiceSpec
from kserve import V1beta1PredictorSpec
from kserve import V1beta1SKLearnSpec
from minio import Minio
from minio.error import ResponseError

Agora vamos inicializar algumas variáveis de crendênciais do minio, paths para os modelos e namespace padrão:

namespace = utils.get_default_target_namespace()
MINIO_HOST="minio-service.kubeflow:9000"
MINIO_ACCESS_KEY="minio"
MINIO_SECRET_KEY="minio123"
MINIO_MODEL_BUCKET="kserve"
MINIO_DATA_PRODUCT_PATH="data/produtos.csv"
INCOME_MODEL_PATH="sklearn/income/model"
DEPLOY_NAMESPACE=namespace

Para iniciar o cliente do minio precismos inicia-lo com as credênciais:

minioClient = Minio(MINIO_HOST,
  access_key=MINIO_ACCESS_KEY,
  secret_key=MINIO_SECRET_KEY,
  secure=False)

Para armazenar o modelo, vamos criar um bucket no minio:

if not minioClient.bucket_exists(MINIO_MODEL_BUCKET):
    minioClient.make_bucket(MINIO_MODEL_BUCKET)

Vamos efetuar o upload do arquivo de dados no minio para ser a entrada de dados para o trainamento do modelo, antes de executar o comando abaixo, faça o upload do arquivo para o seu notebook:

minioClient.fput_object(MINIO_MODEL_BUCKET,MINIO_DATA_PRODUCT_PATH,"produtos.csv")

Agora vamos instalar as dependência para o treinamento do modelo:

!pip install --upgrade  nltk==3.7 scikit-learn==1.1.3 joblib==1.1.0 pandas==2.1.4 executing==1.1.1 stack-data==0.5.1 ipython==8.22.2

A primeira etapa é a preparação de dados:

import pandas as pd
import nltk
from nltk.corpus import stopwords

nltk.download('stopwords')

obj = minioClient.get_object(
    MINIO_MODEL_BUCKET,
    MINIO_DATA_PRODUCT_PATH,
)
products_data = pd.read_csv(obj,delimiter=';', encoding='utf-8')
products_data.count()
products_data["informacao"] = products_data["nome"]+ products_data["descricao"]
# concatenando as colunas nome e descricao
products_data['informacao'] = products_data['nome'] + products_data['descricao']
# excluindo linhas com valor de informacao ou categoria NaN
products_data.dropna(subset=['informacao', 'categoria'], inplace=True)
products_data.drop(columns=['nome', 'descricao'], inplace=True)

stop_words=set(stopwords.words("portuguese"))
# transforma a string em caixa baixa e remove stopwords
products_data['sem_stopwords'] = products_data['informacao'].str.lower().apply(lambda x: ' '.join([word for word in x.split() if word not in (stop_words)]))
tokenizer = nltk.RegexpTokenizer(r"\w+")
products_data['tokens'] = products_data['sem_stopwords'].apply(tokenizer.tokenize) # aplica o regex tokenizer
products_data.drop(columns=['sem_stopwords','informacao'],inplace=True) # Exclui as colunas antigas

products_data["strings"]= products_data["tokens"].str.join(" ") # reunindo cada elemento da lista
products_data.head()

A segunda etapa é o treinamento do modelo:

from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline

X_train,X_test,y_train,y_test = train_test_split( # Separação dos dados para teste e treino
    products_data["strings"], 
    products_data["categoria"], 
    test_size = 0.2, 
    random_state = 10
)
pipe = Pipeline([('vetorizador', CountVectorizer()), ("classificador", MultinomialNB())]) # novo

Salvar o modelo no minio:

import joblib
pipe.fit(X_train, y_train)
joblib.dump(pipe, "model.joblib")
print(minioClient.fput_object(MINIO_MODEL_BUCKET, f"{INCOME_MODEL_PATH}/model.joblib", 'model.joblib'))
from sklearn.metrics import accuracy_score
from sklearn import metrics

y_prediction = pipe.predict(X_test)
accuracy = accuracy_score(y_prediction, y_test)

Agora vamos usar o cliente do Kserve para efetuar o deploy do modelo:

name='sklearn-produtos'
kserve_version='v1beta1'
api_version = constants.KSERVE_GROUP + '/' + kserve_version

predictor = V1beta1PredictorSpec(
    service_account_name='kserve-sa',
    min_replicas=1,
    sklearn=V1beta1SKLearnSpec(
        storage_uri='s3://'+MINIO_MODEL_BUCKET+'/'+ INCOME_MODEL_PATH + "/model.joblib"  ,
        resources=client.V1ResourceRequirements(
            requests={"cpu": "300m", "memory": "128Mi"},
            limits={"cpu": "500m", "memory": "512Mi"},
        ),
    ),
)
isvc = V1beta1InferenceService(
  api_version=api_version,
  kind=constants.KSERVE_KIND,
  metadata=client.V1ObjectMeta
  (
    name=name, 
    namespace=DEPLOY_NAMESPACE
  ),
  spec=V1beta1InferenceServiceSpec(
    predictor=predictor
  )
)
KServe = KServeClient()
KServe.create(isvc)

Após alguns minutos é possível consultar o servidor de inferência na interface do Kubeflow.

KServe.get(name, namespace=namespace, watch=True, timeout_seconds=120)

É possível testar a chamada do endpoint de inferência via notebook: Mas antes de testar o endpoint é necessário configurar o RBAC para acesso. Para fins de teste local iremos criar um RBAC super permissível, mas para ambientes produtivos precisamos refinar o RBAC:

apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: allow-all
  namespace: kubeflow-user-example-com
spec:
 rules:
 - {}

Crie o yaml do RBAC e apliquei com o kubectl. Agora é só testar o request para o endpoint:

import requests

isvc_resp = KServe.get(name, namespace=namespace)
isvc_url = f"{isvc_resp['status']['address']['url']}/v1/models/sklearn-produtos:predict"

print(isvc_resp)

inference_input = {"instances": ["Figura Transformers Prime War Deluxe - E9687 - Hasbro", "Senhor dos aneis", "Senhor dos anéis"]}

response = requests.post(isvc_url, json=inference_input)
print(response.text)

Nesta sessão conseguimos treinar e efetuar o deploy do modelo via notebook jupyter de uma forma fácil usando componentes somente do Kubeflow.

Last updated