A Airnb iniciou como startup e tomou proporções globais, o que gerou uma complexidade na orquestracão dos fluxos de dados. Em 2014 iniciou a criação do Airflow para facilitar a cordenação publicado em 2015.
Vamos usar o airflow standalone para fins didáticos, para implementacões em produções usar a documentacão oficial
O arquivo de docker compose nos ajuda a gerir os containers. Neste exemplo vamos criar um volume para as dags e os modelos. Copie o modelo joblib na pasta model do Airflow.
O arquivo de requirimets segue as mesmas versões das bilbiotecas que usamos no spark. Repare que também incluimos o plugin do Apache Livy do Spark que será utilizado para integrar o Spark com o Airflow.
Inicialize o airflow com o docker compose e verifique a WebUi na url localhost:8080. Com a WebUi é possível acompanhar e operar as execução das DAGs. Explore a documentação da WebUI para o aprofundamento.
O exemplo a seguir traz a utilizacão do Bash Operator, operador do Airflow que executa um comando Bash nos Workers do Airflow.
from datetime import datetime, timedeltafrom textwrap import dedentimport joblib# The DAG object; we'll need this to instantiate a DAGfrom airflow import DAG# Operators; we need this to operate!from airflow.decorators import dag, taskfrom airflow.operators.bash import BashOperatorfrom airflow.operators.python_operator import PythonOperatorfrom airflow.utils.task_group import TaskGroupfrom airflow.providers.apache.livy.operators.livy import LivyOperatorwithDAG('bash-operator',# These args will get passed on to each operator# You can override them on a per-task basis during operator initialization default_args={'depends_on_past': False,'email': ['airflow@example.com'],'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(minutes=5), }, description='A simple tutorial DAG', schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example'],)as dag:withTaskGroup(group_id='group_for')as tg_for:for i inrange(1,5): t1 =BashOperator( task_id=f'print_date_t_for{i}', bash_command='date', )withTaskGroup(group_id='group_sleep')as tg1: t2 =BashOperator( task_id='sleep_t2', depends_on_past=False, bash_command='sleep 5', retries=3, ) t3 =BashOperator( task_id='print_date_t3', bash_command='date', ) t4 =BashOperator( task_id='print_date_t4', bash_command='date', ) tg_for >> [t2, t3] >> t4
Faça teste com a dependência entre as Tasks e os TaksGroups e veja o resultado na WebUi do Airflow.
O Scheduler do Airflow tem vários recursos de periodicidade e triggers de execução.
Explore a documentacão de parametrizacão das Dags, altere o pipeline e veja o resultado na WebUI.
@task(task_id="print_the_context_t5")defprint_context(ds=None,**kwargs):"""Print the Airflow context and ds variable from the context."""print(kwargs)print(ds)return'Whatever you return gets printed in the logs'
O decorator @task indica que esta função é uma task do Airflow.
Inclua ela no fim como tarefa5 e veja o resultado:
t5 =print_context()
tg_for >> [t2, t3] >> t4 >> t5
Repare que a funcão é executada e o contexto da DAG é apresentado no log das tasks.
Agora que já sabemos como orquestrar um compando Python em uma DAG, vamos simular a inferência do modelo em uma Task Python.
@task(task_id="inferencia_t6")defpredict(ds=None,**kwargs): input_message = ["Figura Transformers Prime War Deluxe - E9687 - Hasbro","Senhor dos aneis","Senhor dos anéis"] pipe = joblib.load("/model/classificador-produtos.joblib") final_prediction = pipe.predict(input_message)print("Predicted values:")print(",".join(final_prediction))return'sucesso'
Inclua essa task no final da sua DAG como Task6 e veja que é possível executar a inferência dentro de uma Tarefa Python no Airflow. Explore o código para ler de arquivos externos, executar a inferência e gravar o resultado em um arquivo de saída.
9.3.5 Data-aware schedulling
É possivel controlar dependencia entre DAGs via Task Sensor ou desde a versão 2.4, quando o conceito de Dataset foi criado, é possível ter o mesmo resultado com Data-aware.
Com este novo conceito uma DAG pode depender de um Dataset para ser iniciada e as tasks dentro de uma DAG podem gerar versões de Datasets. Assim é possível ter dependencia entre DAGs e um visão parcial de Data Lineage na interface de Dataset.
Vamos gerar duas novas DAGs de teste para visualizarmos esta dependencia: