Para integrar o Airflow com o Spark vamos utilizar o Apache Livy. O Apache Livy é uma aplicação que fornece uma API REST para integragir com o contexto de execucão do Spark.
Preparando Apache Livy
Da mesma forma que fizemos para o Aiflow e para o Spark, crie uma pasta serving-batch/livy para prepararmos a imagem do conteiner do Livy.
O Dockerfile do Livy é semelhante ao do Spark, mas inclui a instalação do Livy.
O Livy tem um arquivo de configuração chamado livy.conf, ele é copiado para a imagem do container. Gere um arquivo na pasta de acordo com o exemplo:
livy.spark.master = local
livy.server.port = 8998
livy.file.local-dir-whitelist = /src
Exixtem muitas outras opções de configurações, como mostra o exemplo.
O Livy també precisa dos arquivos requirements.txt e spark.conf mostrado na sessão 10.2. Remova as dependencias exclusivas do jupyter para gerar o requirements do livy.
Agora no diretorio serving-batch vamos gerar o docker-compose para fazer a integracão entre os componentes:
Repare que os volumes apontam para os diretórios locais e precisamos copiar os dados e o modelo treinado para as pastas. Vamos rodar o airflow junto para simular a integracão, repare "airflow" dentro do docker compose.
Agora sim podemos fazer o build do livy através do comando do docker-compose.
Código da aplicacão Spark
Copie o exemplo pi.py para pasta /src. Ele será o primeiro teste com Spark e Livy.
import sysfrom random import randomfrom operator import addfrom pyspark.sql import SparkSessionif__name__=="__main__":""" Usage: pi [partitions] """ spark = SparkSession\.builder\.appName("PythonPi")\.getOrCreate() partitions =int(sys.argv[1])iflen(sys.argv)>1else2 n =100000* partitionsdeff(_:int) ->float: x =random()*2-1 y =random()*2-1return1if x **2+ y **2<=1else0 count = spark.sparkContext.parallelize(range(1, n +1), partitions).map(f).reduce(add)print("Pi is roughly %f"% (4.0* count / n))
Airflow Livy Operator
O Airflow tem um operador para implementar a integração com o Livy. Verifique a documentacão do operador. O seguinte trecho é uma chamada de task para o operador do Livy:
Inclua essa nova task na sua DAG e verifique o resultado.
Kubernetes
Existem outras formas de integrar o Airflow com o Spark, uma delas é usando a execuão nativa do Spark com Kubernetes e o operador de Airflow para Spark. Explore a documentação do Apache Spark e Apache Airflow para configuração em ambiente Kubernetes.