> For the complete documentation index, see [llms.txt](https://aurimrv.gitbook.io/pratica-devops-com-docker-para-machine-learning/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://aurimrv.gitbook.io/pratica-devops-com-docker-para-machine-learning/id-9-model-serving-batch/9-4-testando-tudo-junto.md).

# 9.4 Testando tudo junto

Para integrar o Airflow com o Spark vamos utilizar o [Apache Livy](https://livy.apache.org). O Apache Livy é uma aplicação que fornece uma API REST para interagir com o contexto de **execução** do Spark.

## 9.4.1 Preparando Apache Livy

Da mesma forma que fizemos para o Airflow e para o Spark, crie uma pasta `serving-batch/livy` para prepararmos a imagem do **contêiner** do Livy.

O Dockerfile do Livy é semelhante ao do Spark, mas inclui a instalação do Livy.

```docker
FROM python:3.10.11-slim

ENV SPARK_VERSION=3.5.1
ENV HADOOP_VERSION=3
ENV LIVY_VERSION=0.8.0
ENV SCALA_VERSION=2.12
ENV SPARK_VERSION_STRING=spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}
ENV LIVY_VERSION_STRING=apache-livy-${LIVY_VERSION}-incubating_${SCALA_VERSION}-bin
ENV SPARK_HOME=/spark
ENV LIVY_HOME=/livy

# Instalação do Spark
ADD http://dlcdn.apache.org/spark/spark-${SPARK_VERSION}/${SPARK_VERSION_STRING}.tgz /
RUN tar xzf ${SPARK_VERSION_STRING}.tgz \
    && mv ${SPARK_VERSION_STRING} ${SPARK_HOME} \
    && rm ${SPARK_VERSION_STRING}.tgz

ENV OPENJDK_VERSION=17

# Instalação do Java e dependências do SO
RUN apt-get -y update && \
    apt-get install --no-install-recommends -y \
    openjdk-${OPENJDK_VERSION}-jdk \
    procps zip libssl-dev libkrb5-dev libffi-dev libxml2-dev libxslt1-dev python-dev build-essential && \
    apt-get clean && rm -rf /var/lib/apt/lists/*

# Instalação do Livy
ADD https://dlcdn.apache.org/incubator/livy/${LIVY_VERSION}-incubating/${LIVY_VERSION_STRING}.zip /
RUN unzip ${LIVY_VERSION_STRING}.zip \
    && mv ${LIVY_VERSION_STRING} ${LIVY_HOME} \
    && rm ${LIVY_VERSION_STRING}.zip \
    && mkdir ${LIVY_HOME}/logs    

# Atualização do pip e instalação de dependências Python
# Adicionado COPY do requirements.txt antes de usá-lo
COPY requirements.txt /tmp/ 
RUN python3 -m pip install --upgrade pip wheel setuptools && \
    python3 -m pip cache purge

# Instalação do PySpark e dependências do requirements.txt
RUN cd ${SPARK_HOME}/python && python3 setup.py install && \
    python3 -m pip install -r /tmp/requirements.txt && \
    python3 -m pip cache purge

ENV PATH=/usr/local/openjdk-${OPENJDK_VERSION}/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:${SPARK_HOME}/bin:${LIVY_HOME}/bin
# A linha ENV PYSPARK_PYTHON=/usr/bin/python3 é redundante se a próxima for usada.
# ENV PYSPARK_PYTHON=/usr/bin/python3 
# Esta deve prevalecer se python3 foi instalado via apt ou é o default do python:3.10.11-slim
ENV PYSPARK_PYTHON=/usr/local/bin/python3 
# Usar ${SPARK_HOME} para consistência
ENV SPARK_CONF_DIR=${SPARK_HOME}/conf 

COPY livy.conf ${LIVY_HOME}/conf
# Usar ${SPARK_HOME} para consistência
COPY spark-defaults.conf ${SPARK_HOME}/conf 

RUN ln -sf /usr/local/bin/python3 /usr/bin/python
# Porta padrão do Livy
EXPOSE 8998 
```

O Livy tem um arquivo de configuração chamado `livy.conf`, ele é copiado para a imagem do contêiner. Gere um arquivo na pasta de acordo com o exemplo:

```conf
livy.spark.master = local
livy.server.port = 8998
livy.file.local-dir-whitelist = /src
```

**Existem** muitas outras opções de configurações, como mostra o [exemplo](https://github.com/apache/incubator-livy/blob/master/conf/livy.conf.template).

O Livy **também** precisa dos arquivos `requirements.txt` e `spark-defaults.conf` (renomeado de `spark.conf`) mostrado na seção [9.2](/pratica-devops-com-docker-para-machine-learning/id-9-model-serving-batch/9-2-spark.md). Remova as **dependências** exclusivas do Jupyter para gerar o `requirements.txt` do Livy.

Agora no **diretório** `serving-batch` vamos gerar o `docker-compose.yaml` para fazer a **integração** entre os componentes:

```yaml
services:
    airflow:
        build: ./airflow
        ports:
        - "8080:8080" # Porta do Airflow Webserver
        volumes:
        - ./dags:/opt/airflow/dags
        - ./model:/model
    livy:
        build: ./livy
        ports:
            - "8998:8998" # Porta do Livy server
            - "4040:4040" # Porta da Spark UI (se o Livy/Spark a expuser)
            # - "7077:7077" # Porta do Spark Master (se em modo standalone, não local)
        command: livy-server
        volumes:
            - ./src:/src
            - ./model:/model
            - ./data:/data
```

Repare que os volumes apontam para os diretórios locais e precisamos copiar os dados e o modelo treinado para as pastas. Vamos rodar o Airflow junto para simular a **integração**, repare "airflow" dentro do `docker-compose.yaml`.

Agora sim podemos fazer o build do Livy através do comando do `docker-compose`.

## 9.4.2 Código da aplicação Spark

Copie o exemplo [pi.py](https://github.com/apache/spark/blob/master/examples/src/main/python/pi.py) para a pasta `/src`. Ele será o primeiro teste com Spark e Livy.

```python
import sys
from random import random
from operator import add

from pyspark.sql import SparkSession


if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """
    spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_: int) -> float:
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    spark.stop() # Adicionado para encerrar a sessão Spark
```

## 9.4.3 Airflow Livy Operator

O Airflow tem um operador para implementar a integração com o Livy. Verifique a [documentação do operador](https://airflow.apache.org/docs/apache-airflow-providers-apache-livy/stable/_api/airflow/providers/apache/livy/operators/livy/index.html). O seguinte trecho é uma chamada de task para o operador do Livy:

```python
    livy_python_task = LivyOperator(
        task_id='livy_pi_task', # Nome mais descritivo
        file='/src/pi.py',
        polling_interval=60, # Intervalo em segundos para verificar o status do job
        livy_conn_id='livy_default', # ID da conexão Livy configurada no Airflow
        # args=[str(partitions_value)], # Se precisar passar argumentos para o script pi.py
        executor_memory="1g",
        executor_cores=2,
        driver_memory="1g",
        driver_cores=1,
        num_executors=1
    )
```

Este operador utiliza a [conexão default do Livy](https://airflow.apache.org/docs/apache-airflow-providers/core-extensions/connections.html) para obter configurações como endpoint e porta para conexão. **Certifique-se de configurar esta conexão na UI do Airflow.**

Execute sua DAG e veja o resultado.

## 9.4.4 Executando a inferência

Da mesma forma que executamos o `pi.py` podemos executar a inferência do modelo, assim como já vimos na seção [9.2](/pratica-devops-com-docker-para-machine-learning/id-9-model-serving-batch/9-2-spark.md).

```python
import sys
# from random import random # Não utilizado neste script
# from operator import add # Não utilizado neste script

from pyspark.sql import SparkSession

from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import pandas_udf, PandasUDFType # PandasUDFType não é mais necessário a partir do Spark 3.0 para este tipo de UDF
import pandas as pd
import joblib

if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("InferenciaProdutos")\
        .getOrCreate()

    # Carregando o modelo serializado e fazendo broadcast
    # O caminho /model/ é relativo ao que foi montado no container Livy/Spark
    pipe = spark.sparkContext.broadcast(joblib.load("/model/classificador-produtos.joblib"))

    # Define Pandas UDF para predição
    @pandas_udf("string")
    def predict(descricao: pd.Series) -> pd.Series:
        # Acessa o modelo broadcasted
        model = pipe.value
        msg_predict = model.predict(descricao.tolist())
        return pd.Series(msg_predict)

    # Carrega os dados de entrada
    # O caminho /data/ é relativo ao que foi montado no container Livy/Spark
    df_input = spark.read.option("delimiter", ';').option("header","true").csv("/data/produtos.csv")

    # Prepara o DataFrame para inferência
    df_input = df_input.filter("descricao is not null").select("descricao").withColumn("id", monotonically_increasing_id())

    # Aplica a UDF para fazer as predições
    df_predict = df_input.dropna().withColumn("predict", predict("descricao"))    

    # Salva as predições (exemplo: 10 primeiras linhas em formato parquet)
    # O caminho /data/ é relativo ao que foi montado no container Livy/Spark
    df_predict.limit(10).write.mode('overwrite').parquet("/data/predicoes_output") # Nome de saída alterado para evitar conflito com pasta de entrada

    spark.stop() # Adicionado para encerrar a sessão Spark
```

Inclua essa nova task na sua DAG e verifique o resultado.

## 9.4.5 Kubernetes

Existem outras formas de integrar o Airflow com o Spark, uma delas é usando a **execução** nativa do Spark com Kubernetes e o operador de Airflow para Spark. Explore a documentação do [Apache Spark](https://spark.apache.org/docs/latest/running-on-kubernetes.html) e [Apache Airflow](https://airflow.apache.org/docs/apache-airflow/stable/kubernetes.html) para configuração em ambiente Kubernetes.

## 9.4.6 Cloud

Muitos provedores de cloud oferecem serviços gerenciados para Spark (ex: Databricks, AWS EMR, Google Dataproc) e Airflow (ex: AWS MWAA, Google Cloud Composer). A integração entre eles geralmente é facilitada por operadores específicos fornecidos pelos provedores ou pela comunidade Airflow.


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://aurimrv.gitbook.io/pratica-devops-com-docker-para-machine-learning/id-9-model-serving-batch/9-4-testando-tudo-junto.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
