9.3 Airflow

9.3.1 História

A Airbnb iniciou como startup e tomou proporções globais, o que gerou uma complexidade na orquestração dos fluxos de dados. Em 2014 iniciou a criação do Airflow para facilitar a coordenação, publicado em 2015.

Publicação de 2015 no medium da Airbnb por Maxime Beauchemin

Desde a publicação a ferramenta ganhou força e foi incubada pela Apache Foundation em 2016.

9.3.2 Arquitetura

Os principais componentes do Airflow são:

  • Scheduler: Responsável por agendar a execução das DAGs

  • WebServer: WebUI para manipulação dos metadados e DAGs

  • Workers: Responsável pela execução dos processos

  • Metadata databases: Responsável por armazenar os metadados do Airflow

  • Dag Directory: Contém os arquivos de DAGs

Diagrama de arquitetura do Airflow

Os workflows do Airflow são representados por DAGs.

DAG

9.3.3 Preparando container com Airflow

Para preparar as configurações do Airflow crie uma pasta serving-batch/airflow.

FROM apache/airflow:slim-2.9.2-python3.10 AS airflow

RUN python3 -m pip install --upgrade pip wheel setuptools && \
    python3 -m pip cache purge

COPY requirements.txt /tmp/
RUN python3 -m pip install -r /tmp/requirements.txt && \
    python3 -m pip cache purge

ENV AIRFLOW_HOME=/opt/airflow

ENTRYPOINT "airflow" "standalone"

Vamos usar o Airflow standalone para fins didáticos. Para implementações em produção, usar a documentação oficial.

O arquivo de docker-compose.yaml nos ajuda a gerir os containers. Neste exemplo vamos criar um volume para as DAGs e os modelos. Copie o modelo joblib na pasta model do Airflow.

services:
    airflow:
        build: .
        ports:
        - 8080:8080
        volumes:
        - ./dags:/opt/airflow/dags
        - ./model:/model

O arquivo de requirements.txt segue as mesmas versões das bibliotecas que usamos no Spark. Repare que também incluímos o plugin do Apache Livy do Spark que será utilizado para integrar o Spark com o Airflow.

pandas==1.3.5
nltk==3.7
scikit-learn==1.0.2
apache-airflow-providers-apache-livy==3.8.1
joblib==1.1.0
pyarrow==8.0.0
numpy==1.26.4

Inicialize o Airflow com o docker-compose e verifique a WebUI na url localhost:8080. Com a WebUI é possível acompanhar e operar as execuções das DAGs. Explore a documentação da WebUI para o aprofundamento.

O exemplo a seguir traz a utilização do Bash Operator, operador do Airflow que executa um comando Bash nos Workers do Airflow.

from datetime import datetime, timedelta
from textwrap import dedent
import joblib

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator # Correção: PythonOperator está em airflow.operators.python

from airflow.utils.task_group import TaskGroup

from airflow.providers.apache.livy.operators.livy import LivyOperator

with DAG(
    'bash-operator',
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='A simple tutorial DAG',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:

    with TaskGroup(group_id='group_for') as tg_for:
        for i in range(1,5):
            t1 = BashOperator(
                task_id=f'print_date_t_for{i}',
                bash_command='date',
            )

    with TaskGroup(group_id='group_sleep') as tg1:
        t2 = BashOperator(
            task_id='sleep_t2',
            depends_on_past=False,
            bash_command='sleep 5',
            retries=3,
        )

        t3 = BashOperator(
            task_id='print_date_t3',
            bash_command='date',
        )

    t4 = BashOperator(
        task_id='print_date_t4',
        bash_command='date',
    )

    tg_for >> [t2, t3] >> t4

Faça teste com a dependência entre as Tasks e os TaskGroups e veja o resultado na WebUI do Airflow.

O Scheduler do Airflow tem vários recursos de periodicidade e triggers de execução.

Explore a documentação de parametrização das DAGs, altere o pipeline e veja o resultado na WebUI.

Vamos explorar o Python Operator.

Inclua a seguinte função na sua DAG:

@task(task_id="print_the_context_t5")
def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    print(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

O decorator @task indica que esta função é uma task do Airflow.

Inclua ela no fim como tarefa5 e veja o resultado:

t5 = print_context()
tg_for >> [t2, t3] >> t4 >> t5

Repare que a função é executada e o contexto da DAG é apresentado no log das tasks.

Agora que já sabemos como orquestrar um comando Python em uma DAG, vamos simular a inferência do modelo em uma Task Python.

@task(task_id="inferencia_t6")
def predict(ds=None, **kwargs):
    input_message = ["Figura Transformers Prime War Deluxe - E9687 - Hasbro",  "Senhor dos aneis", "Senhor dos anéis"]
    pipe = joblib.load("/model/classificador-produtos.joblib")
    final_prediction = pipe.predict(input_message)
    print("Predicted values:")
    print(",".join(final_prediction))
    return 'sucesso'

Inclua essa task no final da sua DAG como Task6 e veja que é possível executar a inferência dentro de uma Tarefa Python no Airflow. Explore o código para ler de arquivos externos, executar a inferência e gravar o resultado em um arquivo de saída.

9.3.4 Data-aware scheduling

É possível controlar dependência entre DAGs via Task Sensor ou desde a versão 2.4, quando o conceito de Dataset foi criado, é possível ter o mesmo resultado com Data-aware scheduling.

Com este novo conceito uma DAG pode depender de um Dataset para ser iniciada e as tasks dentro de uma DAG podem gerar versões de Datasets. Assim é possível ter dependência entre DAGs e uma visão parcial de Data Lineage na interface de Dataset.

Vamos gerar duas novas DAGs de teste para visualizarmos esta dependência:

from datetime import datetime, timedelta
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator
from airflow import DAG

with DAG(
    'DAG1',
    schedule_interval=None, # Alterado para schedule=None para consistência com a documentação mais recente
    start_date=datetime(2021, 1, 1),
    catchup=False # Adicionado para evitar execuções retroativas
):
    t1 = BashOperator(
                task_id='GenerateDataset1', # Removido f-string desnecessário
                outlets=[Dataset("Dataset1")],
                bash_command='date',
            )

with DAG(
    'DAG2',
    schedule=[Dataset("Dataset1")],
    start_date=datetime(2021, 1, 1),
    catchup=False # Adicionado para evitar execuções retroativas
):
    t1 = BashOperator(
                task_id='GenerateDataset2', # Removido f-string desnecessário
                outlets=[Dataset("Dataset2")],
                bash_command='date',
            )

Last updated