Para integrar o Airflow com o Spark vamos utilizar o Apache Livy. O Apache Livy é uma aplicação que fornece uma API REST para integragir com o contexto de execucão do Spark.
Preparando Apache Livy
Da mesma forma que fizemos para o Aiflow e para o Spark, crie uma pasta serving-batch/livy para prepararmos a imagem do conteiner do Livy.
O Dockerfile do Livy é semelhante ao do Spark, mas inclui a instalação do Livy.
O Livy tem um arquivo de configuração chamado livy.conf, ele é copiado para a imagem do container. Gere um arquivo na pasta de acordo com o exemplo:
livy.spark.master = local
livy.server.port = 8998
livy.file.local-dir-whitelist = /src
Exixtem muitas outras opções de configurações, como mostra o exemplo.
O Livy també precisa dos arquivos requirements.txt e spark.conf mostrado na sessão 10.2. Remova as dependencias exclusivas do jupyter para gerar o requirements do livy.
Agora no diretorio serving-batch vamos gerar o docker-compose para fazer a integracão entre os componentes:
Repare que os volumes apontam para os diretórios locais e precisamos copiar os dados e o modelo treinado para as pastas. Vamos rodar o airflow junto para simular a integracão, repare "airflow" dentro do docker compose.
Agora sim podemos fazer o build do livy através do comando do docker-compose.
Código da aplicacão Spark
Copie o exemplo pi.py para pasta /src. Ele será o primeiro teste com Spark e Livy.
import sys
from random import random
from operator import add
from pyspark.sql import SparkSession
if __name__ == "__main__":
"""
Usage: pi [partitions]
"""
spark = SparkSession\
.builder\
.appName("PythonPi")\
.getOrCreate()
partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
n = 100000 * partitions
def f(_:int) -> float:
x = random() * 2 - 1
y = random() * 2 - 1
return 1 if x ** 2 + y ** 2 <= 1 else 0
count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))
Airflow Livy Operator
O Airflow tem um operador para implementar a integração com o Livy. Verifique a documentacão do operador. O seguinte trecho é uma chamada de task para o operador do Livy:
Este operador utiliza a conexão default do livy para obter configurações como endpoint e porta para conexão.
Execute sua DAG e veja o resultado.
Executando a inferência
Da mesma forma que executamos o pi.py podemos executar a inferência do modelo, assim como já vimos na sessão 10.2.
import sys
from random import random
from operator import add
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd
import joblib
if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("InferenciaProdutos")\
.getOrCreate()
pipe = spark.sparkContext.broadcast(joblib.load("/model/classificador-produtos.joblib"))
# Define Pandas UDF
@pandas_udf("string")
def predict(descricao: pd.Series) -> pd.Series:
msg_predict = pipe.value.predict(descricao.tolist())
return pd.Series(msg_predict)
df_input = spark.read.option("delimiter", ';').option("header","true").csv("/data/produtos.csv")
df_input = df_input.filter("descricao is not null").select("descricao").withColumn("id", monotonically_increasing_id())
df_predict = df_input.dropna().withColumn("predict", predict("descricao"))
df_predict.limit(10).write.mode('overwrite').parquet("/data/predicoes")
Inclua essa nova task na sua DAG e verifique o resultado.
Kubernetes
Existem outras formas de integrar o Airflow com o Spark, uma delas é usando a execuão nativa do Spark com Kubernetes e o operador de Airflow para Spark. Explore a documentação do Apache Spark e Apache Airflow para configuração em ambiente Kubernetes.