9.4 Testando tudo junto
Para integrar o Airflow com o Spark vamos utilizar o Apache Livy. O Apache Livy é uma aplicação que fornece uma API REST para interagir com o contexto de execução do Spark.
9.4.1 Preparando Apache Livy
Da mesma forma que fizemos para o Airflow e para o Spark, crie uma pasta serving-batch/livy
para prepararmos a imagem do contêiner do Livy.
O Dockerfile do Livy é semelhante ao do Spark, mas inclui a instalação do Livy.
FROM python:3.10.11-slim
ENV SPARK_VERSION=3.5.1
ENV HADOOP_VERSION=3
ENV LIVY_VERSION=0.8.0
ENV SCALA_VERSION=2.12
ENV SPARK_VERSION_STRING=spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}
ENV LIVY_VERSION_STRING=apache-livy-${LIVY_VERSION}-incubating_${SCALA_VERSION}-bin
ENV SPARK_HOME=/spark
ENV LIVY_HOME=/livy
# Instalação do Spark
ADD http://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/${SPARK_VERSION_STRING}.tgz /
RUN tar xzf ${SPARK_VERSION_STRING}.tgz \
&& mv ${SPARK_VERSION_STRING} ${SPARK_HOME} \
&& rm ${SPARK_VERSION_STRING}.tgz
ENV OPENJDK_VERSION=17
# Instalação do Java e dependências do SO
RUN apt-get -y update && \
apt-get install --no-install-recommends -y \
openjdk-${OPENJDK_VERSION}-jdk \
procps zip libssl-dev libkrb5-dev libffi-dev libxml2-dev libxslt1-dev python-dev build-essential && \
apt-get clean && rm -rf /var/lib/apt/lists/*
# Instalação do Livy
ADD https://dlcdn.apache.org/incubator/livy/${LIVY_VERSION}-incubating/${LIVY_VERSION_STRING}.zip /
RUN unzip ${LIVY_VERSION_STRING}.zip \
&& mv ${LIVY_VERSION_STRING} ${LIVY_HOME} \
&& rm ${LIVY_VERSION_STRING}.zip \
&& mkdir ${LIVY_HOME}/logs
# Atualização do pip e instalação de dependências Python
# Adicionado COPY do requirements.txt antes de usá-lo
COPY requirements.txt /tmp/
RUN python3 -m pip install --upgrade pip wheel setuptools && \
python3 -m pip cache purge
# Instalação do PySpark e dependências do requirements.txt
RUN cd ${SPARK_HOME}/python && python3 setup.py install && \
python3 -m pip install -r /tmp/requirements.txt && \
python3 -m pip cache purge
ENV PATH=/usr/local/openjdk-${OPENJDK_VERSION}/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:${SPARK_HOME}/bin:${LIVY_HOME}/bin
# A linha ENV PYSPARK_PYTHON=/usr/bin/python3 é redundante se a próxima for usada.
# ENV PYSPARK_PYTHON=/usr/bin/python3
# Esta deve prevalecer se python3 foi instalado via apt ou é o default do python:3.10.11-slim
ENV PYSPARK_PYTHON=/usr/local/bin/python3
# Usar ${SPARK_HOME} para consistência
ENV SPARK_CONF_DIR=${SPARK_HOME}/conf
COPY livy.conf ${LIVY_HOME}/conf
# Usar ${SPARK_HOME} para consistência
COPY spark-defaults.conf ${SPARK_HOME}/conf
RUN ln -sf /usr/local/bin/python3 /usr/bin/python
# Porta padrão do Livy
EXPOSE 8998
O Livy tem um arquivo de configuração chamado livy.conf
, ele é copiado para a imagem do contêiner. Gere um arquivo na pasta de acordo com o exemplo:
livy.spark.master = local
livy.server.port = 8998
livy.file.local-dir-whitelist = /src
Existem muitas outras opções de configurações, como mostra o exemplo.
O Livy também precisa dos arquivos requirements.txt
e spark-defaults.conf
(renomeado de spark.conf
) mostrado na seção 9.2. Remova as dependências exclusivas do Jupyter para gerar o requirements.txt
do Livy.
Agora no diretório serving-batch
vamos gerar o docker-compose.yaml
para fazer a integração entre os componentes:
services:
airflow:
build: ./airflow
ports:
- "8080:8080" # Porta do Airflow Webserver
volumes:
- ./dags:/opt/airflow/dags
- ./model:/model
livy:
build: ./livy
ports:
- "8998:8998" # Porta do Livy server
- "4040:4040" # Porta da Spark UI (se o Livy/Spark a expuser)
# - "7077:7077" # Porta do Spark Master (se em modo standalone, não local)
command: livy-server
volumes:
- ./src:/src
- ./model:/model
- ./data:/data
Repare que os volumes apontam para os diretórios locais e precisamos copiar os dados e o modelo treinado para as pastas. Vamos rodar o Airflow junto para simular a integração, repare "airflow" dentro do docker-compose.yaml
.
Agora sim podemos fazer o build do Livy através do comando do docker-compose
.
9.4.2 Código da aplicação Spark
Copie o exemplo pi.py para a pasta /src
. Ele será o primeiro teste com Spark e Livy.
import sys
from random import random
from operator import add
from pyspark.sql import SparkSession
if __name__ == "__main__":
"""
Usage: pi [partitions]
"""
spark = SparkSession\
.builder\
.appName("PythonPi")\
.getOrCreate()
partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
n = 100000 * partitions
def f(_: int) -> float:
x = random() * 2 - 1
y = random() * 2 - 1
return 1 if x ** 2 + y ** 2 <= 1 else 0
count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))
spark.stop() # Adicionado para encerrar a sessão Spark
9.4.3 Airflow Livy Operator
O Airflow tem um operador para implementar a integração com o Livy. Verifique a documentação do operador. O seguinte trecho é uma chamada de task para o operador do Livy:
livy_python_task = LivyOperator(
task_id='livy_pi_task', # Nome mais descritivo
file='/src/pi.py',
polling_interval=60, # Intervalo em segundos para verificar o status do job
livy_conn_id='livy_default', # ID da conexão Livy configurada no Airflow
# args=[str(partitions_value)], # Se precisar passar argumentos para o script pi.py
executor_memory="1g",
executor_cores=2,
driver_memory="1g",
driver_cores=1,
num_executors=1
)
Este operador utiliza a conexão default do Livy para obter configurações como endpoint e porta para conexão. Certifique-se de configurar esta conexão na UI do Airflow.
Execute sua DAG e veja o resultado.
9.4.4 Executando a inferência
Da mesma forma que executamos o pi.py
podemos executar a inferência do modelo, assim como já vimos na seção 9.2.
import sys
# from random import random # Não utilizado neste script
# from operator import add # Não utilizado neste script
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import pandas_udf, PandasUDFType # PandasUDFType não é mais necessário a partir do Spark 3.0 para este tipo de UDF
import pandas as pd
import joblib
if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("InferenciaProdutos")\
.getOrCreate()
# Carregando o modelo serializado e fazendo broadcast
# O caminho /model/ é relativo ao que foi montado no container Livy/Spark
pipe = spark.sparkContext.broadcast(joblib.load("/model/classificador-produtos.joblib"))
# Define Pandas UDF para predição
@pandas_udf("string")
def predict(descricao: pd.Series) -> pd.Series:
# Acessa o modelo broadcasted
model = pipe.value
msg_predict = model.predict(descricao.tolist())
return pd.Series(msg_predict)
# Carrega os dados de entrada
# O caminho /data/ é relativo ao que foi montado no container Livy/Spark
df_input = spark.read.option("delimiter", ';').option("header","true").csv("/data/produtos.csv")
# Prepara o DataFrame para inferência
df_input = df_input.filter("descricao is not null").select("descricao").withColumn("id", monotonically_increasing_id())
# Aplica a UDF para fazer as predições
df_predict = df_input.dropna().withColumn("predict", predict("descricao"))
# Salva as predições (exemplo: 10 primeiras linhas em formato parquet)
# O caminho /data/ é relativo ao que foi montado no container Livy/Spark
df_predict.limit(10).write.mode('overwrite').parquet("/data/predicoes_output") # Nome de saída alterado para evitar conflito com pasta de entrada
spark.stop() # Adicionado para encerrar a sessão Spark
Inclua essa nova task na sua DAG e verifique o resultado.
9.4.5 Kubernetes
Existem outras formas de integrar o Airflow com o Spark, uma delas é usando a execução nativa do Spark com Kubernetes e o operador de Airflow para Spark. Explore a documentação do Apache Spark e Apache Airflow para configuração em ambiente Kubernetes.
9.4.6 Cloud
Muitos provedores de cloud oferecem serviços gerenciados para Spark (ex: Databricks, AWS EMR, Google Dataproc) e Airflow (ex: AWS MWAA, Google Cloud Composer). A integração entre eles geralmente é facilitada por operadores específicos fornecidos pelos provedores ou pela comunidade Airflow.
Last updated