Prática DevOps com Docker para Machine Learning
  • Prática de DevOps com Docker para Machine Learning
  • Autores e Agradecimentos
  • Uso do Livro
  • Contribua com o Livro
  • Licença
  • Organização do Livro
  • 1. Introdução
    • 1.1 Máquinas Virtuais e Contêineres
    • 1.2 DevOps e Docker
    • 1.3 Configuração do Ambiente - Python
    • 1.4 Configuração do Ambiente - Docker
    • 1.5 Dockerfile, Imagem e Contêiner Docker
    • 1.6 Docker Hub e Comandos Adicionais
  • 2. Desenvolvimento
    • 2.1 Do notebook para aplicação - parte 1
    • 2.2 Do notebook para aplicação - parte 2
    • 2.3 Do notebook para aplicação - parte 3
  • 3. Produção
    • 3.1 Desenvolvimento vs Produção: o fim ou o início?
    • 3.2 Ambiente de Produção - parte 1
    • 3.3 Ambiente de Produção - parte 2
    • 3.4 Ambiente de Produção - parte 3
  • 4. Monitoramento
    • 4.1 Introdução
    • 4.2 Configurando o Servidor de Monitoramento
    • 4.3 Monitorando Servidores do Ambiente de Produção
    • 4.4 Comandos de Verificação do Nagios
    • 4.5 Criando Verificações Mais Específicas
    • 4.6 Criando Alertas
    • 4.7 Recuperando de Problemas
    • 4.8 Verificação de Contêineres via NRPE
  • 5. Infraestrutura como Código e Orquestração
    • 5.1 Introdução
    • 5.2 Orquestração com Docker Compose
    • 5.3 Orquestração com Kubernetes
  • 6. Integração Contínua
    • 6.1 Introdução
    • 6.2 Controle de Versão
    • 6.3 Configurando um repositório no GitLab
    • 6.4 Branch e merge
    • 6.5 Pipeline de Integração Contínua com GitLab CI/CD
  • 7. Entrega Contínua
    • 7.1 Introdução
    • 7.2 Implantação automática no Docker Hub
    • 7.3 Implantação automática no Heroku
    • 7.4 Implantação automática no Google Kubernetes Engine (GKE)
    • 7.5 Testando tudo junto
  • 8. Model serving
    • 8.1 Introdução
    • 8.2 Model serving com mlserver
    • 8.3 CI/CD com GitLab e mlserver
    • 8.4 Testando tudo junto
  • 9. Model serving batch
    • 9.1 Introdução
    • 9.2 Spark
    • 9.3 Airflow
    • 9.4 Testando tudo junto
  • 10. MLOps com mlflow
    • 10.1 Introdução
    • 10.2 Visão geral do MLflow
    • 10.3 Configuração do MLflow
    • 10.4 Testando MLflow
  • 11. MLOps com Kubeflow
    • 11.1 Visão geral do Kubeflow
    • 11.2 Configuracão
    • 11.3 Kubeflow Pipeline
    • 11.4 Kserve
    • 11.5 Testando tudo junto
  • 12. Conclusão
    • Conclusão
Powered by GitBook
On this page
  • 9.3.1 História
  • 9.3.3 Arquitetura
  • 9.3.4 Preparando container com Airflow
  • 9.3.5 Data-aware schedulling
  1. 9. Model serving batch

9.3 Airflow

Previous9.2 SparkNext9.4 Testando tudo junto

Last updated 10 months ago

9.3.1 História

A Airnb iniciou como startup e tomou proporções globais, o que gerou uma complexidade na orquestracão dos fluxos de dados. Em 2014 iniciou a criação do Airflow para facilitar a cordenação publicado em 2015.

Desde a publicação a ferramenta ganhou força e foi encubada pela Apache Foundation em 2016.

9.3.3 Arquitetura

Os principais são:

  • Scheduler: Responsável por agendar a execucão das DAGs

  • WebServer: WebUi para mamipulação dos medatados e DAGs

  • Workers: Reponsável pela execução dos processo

  • Metadata databases: Responsável por armazenar os metadados do Airflow

  • Dag Directory: Contem os arquivos de DAGs

Diagrama de arquitetura do Airflow

Os workflows do Airflow são representados por DAGs

9.3.4 Preparando container com Airflow

Para preparar as configurações do Airflow crie uma pasta serving-batch/airflow.

FROM apache/airflow:slim-2.9.2-python3.10 AS airflow

RUN python3 -m pip install --upgrade pip wheel setuptools && \
    python3 -m pip cache purge

COPY requirements.txt /tmp/
RUN python3 -m pip install -r /tmp/requirements.txt && \
    python3 -m pip cache purge

ENV AIRFLOW_HOME=/opt/airflow

ENTRYPOINT "airflow" "standalone"

O arquivo de docker compose nos ajuda a gerir os containers. Neste exemplo vamos criar um volume para as dags e os modelos. Copie o modelo joblib na pasta model do Airflow.

services:
    airflow:
        build: .
        ports:
        - 8080:8080
        volumes:
        - ./dags:/opt/airflow/dags
        - ./model:/model

O arquivo de requirimets segue as mesmas versões das bilbiotecas que usamos no spark. Repare que também incluimos o plugin do Apache Livy do Spark que será utilizado para integrar o Spark com o Airflow.

pandas==1.3.5
nltk==3.7
scikit-learn==1.0.2
apache-airflow-providers-apache-livy==3.8.1
joblib==1.1.0
pyarrow==8.0.0
numpy==1.26.4
from datetime import datetime, timedelta
from textwrap import dedent
import joblib

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
from airflow.operators.python_operator import PythonOperator

from airflow.utils.task_group import TaskGroup

from airflow.providers.apache.livy.operators.livy import LivyOperator

with DAG(
    'bash-operator',
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='A simple tutorial DAG',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    with TaskGroup(group_id='group_for') as tg_for:
        for i in range(1,5):
            t1 = BashOperator(
                task_id=f'print_date_t_for{i}',
                bash_command='date',
            )

    with TaskGroup(group_id='group_sleep') as tg1:
        t2 = BashOperator(
            task_id='sleep_t2',
            depends_on_past=False,
            bash_command='sleep 5',
            retries=3,
        )

        t3 = BashOperator(
            task_id='print_date_t3',
            bash_command='date',
        )

    t4 = BashOperator(
        task_id='print_date_t4',
        bash_command='date',
    )

    tg_for >> [t2, t3] >> t4 

O Scheduler do Airflow tem vários recursos de periodicidade e triggers de execução.

Inclua a seguinte função na sua DAG:

@task(task_id="print_the_context_t5")
def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    print(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

O decorator @task indica que esta função é uma task do Airflow.

Inclua ela no fim como tarefa5 e veja o resultado:

t5 = print_context()
tg_for >> [t2, t3] >> t4 >> t5 

Repare que a funcão é executada e o contexto da DAG é apresentado no log das tasks.

Agora que já sabemos como orquestrar um compando Python em uma DAG, vamos simular a inferência do modelo em uma Task Python.

@task(task_id="inferencia_t6")
def predict(ds=None, **kwargs):
    input_message = ["Figura Transformers Prime War Deluxe - E9687 - Hasbro",  "Senhor dos aneis", "Senhor dos anéis"]
    pipe = joblib.load("/model/classificador-produtos.joblib")
    final_prediction = pipe.predict(input_message)
    print("Predicted values:")
    print(",".join(final_prediction))
    return 'sucesso'

Inclua essa task no final da sua DAG como Task6 e veja que é possível executar a inferência dentro de uma Tarefa Python no Airflow. Explore o código para ler de arquivos externos, executar a inferência e gravar o resultado em um arquivo de saída.

9.3.5 Data-aware schedulling

Com este novo conceito uma DAG pode depender de um Dataset para ser iniciada e as tasks dentro de uma DAG podem gerar versões de Datasets. Assim é possível ter dependencia entre DAGs e um visão parcial de Data Lineage na interface de Dataset.

Vamos gerar duas novas DAGs de teste para visualizarmos esta dependencia:


from datetime import datetime, timedelta
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator
from airflow import DAG

with DAG(
    'DAG1',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1)
):
    t1 = BashOperator(
                task_id=f'GenerateDataset1',
                outlets=[Dataset("Dataset1")],
                bash_command='date',
            )

with DAG(
    'DAG2',
    schedule=[Dataset("Dataset1")],
    start_date=datetime(2021, 1, 1)
):
    t1 = BashOperator(
                task_id=f'GenerateDataset2',
                outlets=[Dataset("Dataset2")],
                bash_command='date',
            )
DAG

Vamos usar o airflow standalone para fins didáticos, para implementacões em produções usar a

Inicialize o airflow com o docker compose e verifique a WebUi na url . Com a WebUi é possível acompanhar e operar as execução das DAGs. Explore a para o aprofundamento.

O exemplo a seguir traz a utilizacão do , operador do Airflow que executa um comando Bash nos Workers do Airflow.

Faça teste com a dependência entre as Tasks e os e veja o resultado na WebUi do Airflow.

Explore a de parametrizacão das Dags, altere o pipeline e veja o resultado na WebUI.

Vamos explorar o .

É possivel controlar dependencia entre DAGs via ou desde a versão 2.4, quando o conceito de Dataset foi criado, é possível ter o mesmo resultado com .

documentacão oficial
localhost:8080
documentação da WebUI
Bash Operator
TaksGroups
documentacão
Python Operator
Task Sensor
Data-aware
Publicação de 2015 no medium da Airbnb por Maxime Beauchemin
componentes do Airflow