Gostaria de apresentar (mais) uma abordagem para a compreensão de conceitos relacionados ao Apache Spark. Muito do que será apresentado encontra-se facilmente em livros, publicações em blogs (como este), cursos pagos e vídeo-aulas no YouTube. Aqui, só organizei esses materiais de uma nova forma e, considerando a minha experiência, evidenciei alguns detalhes importantes que geralmente passam despercebidos. Este texto foi dividido em três grandes seções.

Na primeira seção, “Uma visão geral dos componentes do Apache Spark”, abordo de forma ampla os componentes básicos do Apache Spark e o papel de cada numa aplicação. Também apresento um “esqueleto” de uma aplicação Spark para melhor entendimento da relação entre trabalho, etapa e tarefa.

Na segunda seção, trato dos conceitos de “Ações, transformações e avaliação preguiçosa”. Com exceção do último termo, os primeiros são bastante mencionados na seção anterior. Recomendo a quem não estiver familiarizado a estes termos ler a primeira sessão por completo considerando-os como “processos genéricos” e relê-los, caso necessário, depois que estiver familiarizado com os conceitos apresentados nesta seção.

Por fim, concluo com um resumo dos pontos apresentados nas sessões anteriores.


Uma visão geral dos componentes básicos do Apache Spark

A execução de uma aplicação do Apache Spark envolve um programa Driver, alguns executores e um gerenciador de recursos. Na documentação oficial do Apache Spark temos a seguinte imagem:

Arquitetura de uma aplicação Spark

A figura acima representa uma aplicação Spark em modo cluster. Neste modo, cada componente é alocado em um servidor (nó ou “node”) separado. O processo Driver é alocado no nó driver (“driver node”) e os executores são alocados em nós trabalhadores (“worker nodes”). O gerenciador de recursos ou gerenciador do cluster (“cluster manager”) por si só pode ser outro cluster (Apache Mesos, Apache Hadoop YARN ou Kubernetes).

Por hora, vamos adaptar a imagem acima para uma versão simplificada e que também pode ser generalizada para outros modos de deploy.

Arquitetura simplificada de uma aplicação Spark

Desta forma, trataremos dos seguintes componentes:

  • Processo Driver.
  • Gerenciador de recursos.
  • Executores.

Um detalhe importante de ressaltar é que os componentes acima são executados como processos distintos, podendo ser executados em diferentes nós que compõem um cluster. Este detalhe faz muita diferença no momento em que começamos a planejar a escrita do nosso código.

Driver

O processo driver de uma aplicação Spark tem as seguintes responsabilidades:

  • Requisitar recursos físicos ao gerenciador de recursos, como memória e CPU, ao para a execução da aplicação Spark.
  • Manter as informações sobre a aplicação em execução.
  • Interagir com o código do usuário.
  • Analisar, distribuir e agendar a execução da aplicação através dos executores.

Toda interação entre o código do usuário e os demais recursos é realizada através da SparkSession de uma determinada aplicação. Em casos onde o usuário deseja especificar, por exemplo, a quantidade de executores para uma aplicação, essa especificação se dá através da mudança de parâmetros da SparkSession.

Um ponto não evidenciado pela descrição das responsabilidades do processo driver, ou pelos diagramas apresentados anteriormente, é que o driver geralmente não processa os dados da aplicação. Portanto, esse processo acaba por demandar menos recursos computacionais em comparação com os executores. Por esta razão é comum encontrarmos clusters Spark onde o nó driver é menos potente que os nós trabalhadores.

Em ações no Spark, como por exemplo o .collect()os dados são coletados dos executores pelo driver, envolvendo o mesmo no processamento dos dados. Nesses casos, é possível que o processo driver demande mais recursos que os executores, havendo então a necessidade de disponibilizar uma máquina mais poderosa para alocá-lo.

Gerenciador de recursos

O gerenciador de recursos de uma aplicação Spark é, como o nome sugere, o componente que gerencia a distribuição de recursos computacionais para que a aplicação seja executada. É o gerenciador de recursos que reserva uma quantidade de memória e CPU, por um determinado espaço de tempo, para a execução de uma aplicação e avisa a todos os outros processos que aquela quantidade de recurso está reservada para a aplicação. Ao fim da aplicação, também é ele quem avisa aos demais processos que a quantidade de recursos está disponível para uso por outros processos.

O gerenciador de recursos também realoca recursos na ocorrência de falhas. Por exemplo, caso uma máquina alocando um processo executor falhe devido a algum problema no sistema operacional, o gerenciador de recursos tomará conhecimento deste problema e iniciará um processo executor em outra máquina.

Como brevemente citado anteriormente, o gerenciador de recursos pode ser:

Não é comum que a pessoa desenvolvedora da aplicação Spark ter ação sobre o gerenciador de recursos, mas é esperado que a pessoa monitore algumas métricas, como uso geral de CPU e memória do cluster. Essas métricas ajudam a entender se o cluster está superdimensionado ou, em alguns casos, se há espaço para uso de recursos como autoscaling no Databricks.

Executores

Os executores usam uma determinada quantidade de recursos para, de fato, processar os dados conforme requisitado pelo processo driver. É o processo de maior interesse para quem busca oportunidades de otimização de uma aplicação Spark.

Quando uma aplicação Spark é iniciada, o processo driver a divide visando otimizar o processamento em paralelo:

  • Trabalho (“job”): uma sequência de etapas criada toda a vez que uma ação é executada.
  • Etapa (“stage”): uma sequência de transformações estreitas (“narrow transformations”) executadas em um job. Uma etapa é delimitada por transformações amplas (“wide transformations”).
  • Tarefa (“task”): é a menor unidade de trabalho que pode ser agendada para realizar o processamento dos dados. Uma tarefa age sobre um subconjunto dos dados que chamamos de partição. Um conjunto de tarefas compõe uma transformação.

Na imagem abaixo, ilustro uma aplicação Spark que possui duas ações e dois jobs:

Composição de uma aplicação Apache Spark.

Na imagem acima, no trabalho (1), etapa (1) e transformação estreita (1), as tarefas (1.1) e (1.2) podem ser executadas ao mesmo tempo, paralelamente. De forma similar, no trabalho (2), etapa (2) e transformação estreita (2), as tarefas (2.1) a (2.4) são executadas ao mesmo tempo. Os dois trabalhos podem ser executados paralelamente caso haja disponibilidade de recursos. O que não acontece é, por exemplo, as tarefas (1.1) de uma mesma etapa e um mesmo job serem executadas ao mesmo tempo.

Para extrair o máximo dos recursos do cluster, precisamos pensar na quantidade de “tasks” geradas, no tamanho das partições de cada “task” e em como organizar essas mesmas partições em uma etapa da aplicação para que a próxima etapa acesse-as da memória RAM com facilidade.

Em geral, isso se resume a organizar os dados de tal maneira que eles sejam agrupados em nós específicos ou, quando há alguma coluna que dê a noção de ordem para os valores, cada nó armazene uma específica faixa de valores. Essas técnicas de particionamento, por valor e por faixa, respectivamente, tem o objetivo de minimizar a necessidade de se realizar recombinações dos dados em diferentes partições existindo em diferentes nós, o que leva a transmissão de dados pela rede interna do cluster. Esse processo de recombinar os dados entre diferentes nós chamamos de “shuffle”, e é uma coisa que gostaríamos de evitar sempre que possível.

Outra forma de se desperdiçar recursos quando negligenciamos os detalhes de operação de um cluster é a criação de partições com muito mais dados do que outras, tornando a execução de umas tarefas mais demorada. Também é possível criar partições vazias, desperdiçando tempo de processamento para consultar arquivos que nada acrescentam ao resultado.

Felizmente, é possível ajustar alguns parâmetros que controlam a quantidade de memória RAM e CPU para um executor, bem como o tamanho máximo e quantidade de partições para uma determinada etapa. Alguns desses parâmetros, bem como a relação dos mesmos com a memória do executor estão ilustrados a seguir.

Memória de um executor Spark.

Por hora, não vou discutir a respeito dos parâmetros acima e do gerenciamento de memória de um executor Spark. Em publicações futuras, trarei exemplos de como monitorar e modificar esses parâmetros a fim de extrair o máximo dos recursos computacionais disponíveis usados por uma aplicação Spark.


Ações, transformações e avaliação preguiçosa

Usei bastante os termos “ação”, “transformações amplas” e “transformações estreitas” até aqui sem entrar muito em detalhes dos seus respectivos significados. Agora é o momento de falar a respeito.

Existem apenas duas grandes famílias de operações no Spark: as transformações e as ações. Transformações são operações que agem sobre um conjunto de dados e geram um novo conjunto de dados, mas preservando o primeiro conjunto. Este detalhe é muito importante: em Spark, os conjuntos de dados são imutáveis.

Ações, por sua vez, são operações que disparam a execução de uma série de transformações em vários conjuntos de dados e retornam um dado para o processo driver. No Spark, as transformações são executadas apenas quando demandado uma por ação. Esse é o conceito de avaliação preguiçosa (“lazy evaluation”).

Para tentar ilustrar melhor a mensagem que quero passar, farei uso de pseudo-códigos e alguns diagramas para descrever uma situação hipotética.

Imagine que temos um conjunto de arquivos losango, círculo e quadrado. Ao final, queremos um conjunto de dados onde temos apenas losangos e quadrados (Conjunto de dados 2). Como parâmetro inicial, temos o caminho para os arquivos (caminho para o Conjunto de dados 1).

Alguém familiarizado a linguagens orientadas a objeto escreve o seguinte par de linhas em uma aplicação Spark:

conjunto_de_dados_1 = ler_arquivos(caminho_para_o_arquivo)
conjunto_de_dados_2 = remover_circulos(conjunto_de_dados_1)

A pessoa familiarizada a linguagens orientadas a objeto imagina que o seguinte aconteceu:

  • A função ler_arquivos usou o parâmetro caminho_para_o_arquivo e gerou o objeto conjunto_de_dados_1.
  • Em uma segunda etapa, passamos o objeto conjunto_de_dados_1 para a função remover_circulos e geramos o objeto conjunto_de_dados_2onjunto_de_dados_2`

Ilustrando o raciocínio quando se desconsidera a avaliaçao preguiçosa

Contudo, o código possui apenas transformações, representados pelas funções ler_arquivos e remover_circulos, não existindo nenhuma ação de fato. Sendo assim, nenhuma transformação de fato aconteceu e nenhuma partição ou outro objeto foi gerado.

Para gerar o conjunto de dados 2, devemos ter uma ação em algum momento, como no seguinte exemplo:

conjunto_de_dados_1 = ler_arquivos(caminho_para_o_arquivo)
conjunto_de_dados_2 = remover_circulos(conjunto_de_dados_1)
conjunto_de_dados_2.collect()

Neste cenário, ocorre o seguinte:

  • Um parâmetro caminho_para_o_arquivo é passado para a função ler_arquivos. Em algum momento, o Spark gera as instruções de baixo nível para que o executor acesse esses arquivos em disco e crie o objeto conjunto_de_dados_1 em memória.
  • Um parâmetro conjunto_de_dados_1 é passado para a função remover_circulos. Em algum momento, o Spark gera as instruções de baixo nível para que o executor acesse os dados armazenados em memória do objeto conjunto_de_dados_1, remova os círculos, e crie o objeto conjunto_de_dados_2 em memória.
  • O método collect do objeto conjunto_de_dados_2, que é uma ação, é executado. Ele solicita a execução do melhor plano de processamento onde todas as transformações são executadas, gerando o conjunto_de_dados_2 para coleta.

Ilustrando o raciocínio quando se considera a avaliaçao preguiçosa.

Transformações estreitas e transformações amplas

Do conceito geral do que é uma transformação, podemos apresentar os conceitos de transformações estreitas (“narrow transformations”) e transformações amplas (“wide transformations”). A diferença entre ambas se dá na correspondência do número de partições de entrada para o número de partições produzidas e se há combinações entre as partições de entrada para produzir as partições de saída. Vale ressaltar que os dados das partições são imutáveis, portanto as transformações não as alteram. O que as transformações fazem é consumir partições para produzir outras.

Uma transformação estreita sempre usa uma partição para produzir uma única nova partição. Se em uma etapa da aplicação há 100 partições que serão consumidas por uma transformação estreita, então 100 partições serão produzidas por essa transformação. Em uma transformação estreita, o uso de uma partição não irá produzir duas partições, nem duas partições serão compactadas em uma só depois da sua execução. Nesse tipo de transformação, todas as etapas de processamento de uma partição em particular ocorre em um mesmo nó, não havendo transferência de dados entre nós para recombinar dados que estão espalhados pelo cluster, e gerar novas partições.

Transformações estreitas fazem uso de uma operação conhecida como “pipelining”, onde as instruções são escritas em uma fila na memória do nó e executadas uma após a outra neste mesmo nó.

Na imagem abaixo, ilustramos a correspondência de 1:1 no número de partições. Cada partição é submetida a uma mesma transformação, produzindo uma nova partição. As etapas de transformação sobre a partição 1 ocorrem em um nó específico do cluster, que pode ser ou não o mesmo nó em que as etapas de processamento da partição 2 acontecem. A analogia para estas partições também é válida para a partição 3.

Transformação estreita.

Já em uma transformação ampla, não há essa correspondência 1:1. Nesse tipo de transformação, os números de partições na entrada e na saída da transformação podem ser distintos. Transformações envolvendo groupBy, por exemplo, podem consumir mais de uma partição e agregá-las em uma só. Transformações envolvendo join, por outro lado, podem partir de 2 partições e produzir 3. Ainda podemos ter um orderBy onde existem três partições e na saída da transformação, porém, com uma das partições de entrada contribuindo com uma parcela dos seus dados em todas as partições de saída.

Nestas transformações, onde há a necessidade de recombinar dados de diferentes partições, o Spark executa uma operação chamada “shuffle”. Nesta operação o Spark prepara os dados para transmití-los entre os processos que podem estar em diferentes nós do cluster. Durante essa preparação, o dado é compactado e escrito no disco do nó antes de ser transferido pela rede. Então, um cuidado que devemos ter ao fazer uso de transformações amplas é o de aplicá-las somente aos dados que são necessários para o conjunto final produzido pela ação, minimizando o volume de dados comprimido, escrito em disco e transferido pela rede. Isso pode ser conseguido, por exemplo, selecionando, logo no início da do código, apenas as colunas necessárias para a produção de uma tabela.

Na imagem abaixo, ilustramos o resultado de uma possível agregação, onde duas partições são combinadas em uma só nova partição. Isso é o que acontece em operações onde a cláusula groupBy é utilizada:

Transformação ampla: agregação.

Na imagem abaixo, ilustramos uma situação em que dados existentes em 2 partições são recombinados e geram 3 partições. Isso é algo que pode acontecer em alguma operação envolvendo join.

Transformação ampla: join.

Na imagem abaixo, ilustramos o resultado de um possível reagrupamento onde a quantidade de partições na entrada e na saída são iguais, porém, os dados dessas partições são recombinados entre si para a criação das novas partições. Esse tipo de situação pode acontecer em uma operação envolvendo orderBy.

Transformação ampla: orderby.


Conclusão

É importante ter clareza a respeito de como o Spark divide uma aplicação e como os componentes desta ferramenta interagem entre si. Tratamos dos componentes driver, gerenciador de recursos e executor. Para tratar de como uma aplicação é dividida, e executada precisamos entender os conceitos de transformações, ações e avaliação preguiçosa.

Vimos que o processo driver é o componente do Spark que realiza a interface entre o código da aplicação, o gerenciador de recursos, e os executores. O driver interage com o código da aplicação e se comunica tanto com o gerenciador de recursos quanto com os executores, mas geralmente não é um componente que interage com os dados que o Spark processa. Devido a isso, é comum o processo driver ser alocado em um nó menos potente do que os nós que alocam o processo executor.

O gerenciador de recursos é o componente responsável por comunicar ao driver quais os recursos, como CPU e memória, disponíveis para a criação de processos executores. O gerenciador de recursos também é responsável por reservar os recursos solicitados pelo driver, e anunciar aos demais componentes a respeito do uso ou disponibilização destes recursos. Além disso, na ocorrência de processos executores falharem devido a um problema no nó, o gerenciador de recursos é responsável por escolher outro nó para fazer a reposição do recurso. Este componente pode ser por si só um cluster inteiro (Mesos, YARN ou Kubernetes) ou um processo criado pelo Spark, quando este é disponibilizado no modo standalone.

Os executores, por sua vez, são responsáveis pelo processamento dos dados. Devido a isto, geralmente, são alocados em máquinas mais potentes que o processo driver. Os executores recebem trabalhos do driver que, por sua vez, são divididos em etapas, onde as transformações geram um determinado número de tarefas, cada uma agindo em apenas uma partição.

Transformações ocorrem em processamento por avaliação preguiçosa, ou seja, são executadas apenas mediante uma ação. Uma transformação pode ser considerada estreita ou ampla.

Em uma transformação estreita, há uma relação de 1:1 entre as partições de entrada e de saída, além de não haver recombinação entre os dados destas partições. Este tipo de transformação faz uso de pipelining e ocorre em memória.

Já nas transformações amplas, não há uma relação exata entre o número de partições de entrada e de saída por haver uma recombinação dos dados. Isto existe em operações que envolvem groupBy, onde um número maior de partições é recombinado e reduzido a um número menor de partições. Em operações de join, por sua vez, um número menor de partições podem ser recombinados e gerar um número maior de partição. Por último, em operações envolvendo orderBy, os dados podem ser recombinados e o número de partições de entrada e de saída serem iguais.

O entendimento destes mecanismos do Spark pode contribuir para a produção de melhores aplicações ou buscar oportunidades de melhoria nas aplicações já existentes.


Referências