9.3 Airflow
9.3.1 História
A Airbnb iniciou como startup e tomou proporções globais, o que gerou uma complexidade na orquestração dos fluxos de dados. Em 2014 iniciou a criação do Airflow para facilitar a coordenação, publicado em 2015.
Publicação de 2015 no medium da Airbnb por Maxime Beauchemin
Desde a publicação a ferramenta ganhou força e foi incubada pela Apache Foundation em 2016.
9.3.2 Arquitetura
Os principais componentes do Airflow são:
Scheduler: Responsável por agendar a execução das DAGs
WebServer: WebUI para manipulação dos metadados e DAGs
Workers: Responsável pela execução dos processos
Metadata databases: Responsável por armazenar os metadados do Airflow
Dag Directory: Contém os arquivos de DAGs

Os workflows do Airflow são representados por DAGs.

9.3.3 Preparando container com Airflow
Para preparar as configurações do Airflow crie uma pasta serving-batch/airflow.
FROM apache/airflow:slim-2.9.2-python3.10 AS airflow
RUN python3 -m pip install --upgrade pip wheel setuptools && \
    python3 -m pip cache purge
COPY requirements.txt /tmp/
RUN python3 -m pip install -r /tmp/requirements.txt && \
    python3 -m pip cache purge
ENV AIRFLOW_HOME=/opt/airflow
ENTRYPOINT "airflow" "standalone"Vamos usar o Airflow standalone para fins didáticos. Para implementações em produção, usar a documentação oficial.
O arquivo de docker-compose.yaml nos ajuda a gerir os containers. Neste exemplo vamos criar um volume para as DAGs e os modelos. Copie o modelo joblib na pasta model do Airflow.
services:
    airflow:
        build: .
        ports:
        - 8080:8080
        volumes:
        - ./dags:/opt/airflow/dags
        - ./model:/modelO arquivo de requirements.txt segue as mesmas versões das bibliotecas que usamos no Spark. Repare que também incluímos o plugin do Apache Livy do Spark que será utilizado para integrar o Spark com o Airflow.
pandas==1.3.5
nltk==3.7
scikit-learn==1.0.2
apache-airflow-providers-apache-livy==3.8.1
joblib==1.1.0
pyarrow==8.0.0
numpy==1.26.4Inicialize o Airflow com o docker-compose e verifique a WebUI na url localhost:8080. Com a WebUI é possível acompanhar e operar as execuções das DAGs. Explore a documentação da WebUI para o aprofundamento.
O exemplo a seguir traz a utilização do Bash Operator, operador do Airflow que executa um comando Bash nos Workers do Airflow.
from datetime import datetime, timedelta
from textwrap import dedent
import joblib
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.decorators import dag, task
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator # Correção: PythonOperator está em airflow.operators.python
from airflow.utils.task_group import TaskGroup
from airflow.providers.apache.livy.operators.livy import LivyOperator
with DAG(
    'bash-operator',
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='A simple tutorial DAG',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    with TaskGroup(group_id='group_for') as tg_for:
        for i in range(1,5):
            t1 = BashOperator(
                task_id=f'print_date_t_for{i}',
                bash_command='date',
            )
    with TaskGroup(group_id='group_sleep') as tg1:
        t2 = BashOperator(
            task_id='sleep_t2',
            depends_on_past=False,
            bash_command='sleep 5',
            retries=3,
        )
        t3 = BashOperator(
            task_id='print_date_t3',
            bash_command='date',
        )
    t4 = BashOperator(
        task_id='print_date_t4',
        bash_command='date',
    )
    tg_for >> [t2, t3] >> t4Faça teste com a dependência entre as Tasks e os TaskGroups e veja o resultado na WebUI do Airflow.
O Scheduler do Airflow tem vários recursos de periodicidade e triggers de execução.
Explore a documentação de parametrização das DAGs, altere o pipeline e veja o resultado na WebUI.
Vamos explorar o Python Operator.
Inclua a seguinte função na sua DAG:
@task(task_id="print_the_context_t5")
def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    print(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'O decorator @task indica que esta função é uma task do Airflow.
Inclua ela no fim como tarefa5 e veja o resultado:
t5 = print_context()tg_for >> [t2, t3] >> t4 >> t5Repare que a função é executada e o contexto da DAG é apresentado no log das tasks.
Agora que já sabemos como orquestrar um comando Python em uma DAG, vamos simular a inferência do modelo em uma Task Python.
@task(task_id="inferencia_t6")
def predict(ds=None, **kwargs):
    input_message = ["Figura Transformers Prime War Deluxe - E9687 - Hasbro",  "Senhor dos aneis", "Senhor dos anéis"]
    pipe = joblib.load("/model/classificador-produtos.joblib")
    final_prediction = pipe.predict(input_message)
    print("Predicted values:")
    print(",".join(final_prediction))
    return 'sucesso'Inclua essa task no final da sua DAG como Task6 e veja que é possível executar a inferência dentro de uma Tarefa Python no Airflow. Explore o código para ler de arquivos externos, executar a inferência e gravar o resultado em um arquivo de saída.
9.3.4 Data-aware scheduling
É possível controlar dependência entre DAGs via Task Sensor ou desde a versão 2.4, quando o conceito de Dataset foi criado, é possível ter o mesmo resultado com Data-aware scheduling.
Com este novo conceito uma DAG pode depender de um Dataset para ser iniciada e as tasks dentro de uma DAG podem gerar versões de Datasets. Assim é possível ter dependência entre DAGs e uma visão parcial de Data Lineage na interface de Dataset.
Vamos gerar duas novas DAGs de teste para visualizarmos esta dependência:
from datetime import datetime, timedelta
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator
from airflow import DAG
with DAG(
    'DAG1',
    schedule_interval=None, # Alterado para schedule=None para consistência com a documentação mais recente
    start_date=datetime(2021, 1, 1),
    catchup=False # Adicionado para evitar execuções retroativas
):
    t1 = BashOperator(
                task_id='GenerateDataset1', # Removido f-string desnecessário
                outlets=[Dataset("Dataset1")],
                bash_command='date',
            )
with DAG(
    'DAG2',
    schedule=[Dataset("Dataset1")],
    start_date=datetime(2021, 1, 1),
    catchup=False # Adicionado para evitar execuções retroativas
):
    t1 = BashOperator(
                task_id='GenerateDataset2', # Removido f-string desnecessário
                outlets=[Dataset("Dataset2")],
                bash_command='date',
            )Last updated