Na postagem anterior, abordei alguns conceitos relacionados à estrutura de uma aplicação Apache Spark, bem como introduzi a memória do processo executor. Nesta publicação, entrarei em detalhes a respeito deste recurso partindo ideia de tarefa, comum ao texto anterior.

Na primeira seção, “Tarefas e partições”, discorro sobre a relação entre tarefas e partições entre si e com o hardware, bem como levanto novas questões a considerar nesta e em outras publicações. A intenção desta primeira parte é deixar claro como uma tarefa no Spark é composta em termos de recursos computacionais físicos.

Na segunda, “Memória On-Heap e Off-Heap”, discuto uma primeira divisão da memória do executor com base no gerenciamento deste espaço. Nesta seção, trato brevemente sobre as vantagens e desvantagens de cada uma destas duas áreas e quando é recomendado usar uma em relação à outra.

Na terceira, “Memória reservada, memória unificada e memória de usuário”, entro em maiores detalhes da memória “on-heap”. Apresento os parâmetros necessários para dimensionar suas subdivisões, em especial a memória unificada, que veremos ser de maior importância à pessoa desenvolvedora da aplicação Spark.

Na quarta, “Memória unificada: armazenamento e execução”, escrevo sobre quais tipos de dados são armazenados em cada uma destas duas regiões, suas respectivas importâncias para Spark e, finalmente, sobre o mecanismo que permite a unificação destas memórias em uma só.

Na conclusão, retomo os pontos anteriores de maneira sucinta, mas sem deixar de evidenciar as lacunas deixadas pelo texto, que podem motivar estudos independentes por parte de quem o lê ou outras publicações daquele que vos escreve.


Tarefas e partições

Na publicação anterior, mencionei o conceito de [[tarefas]] ([[tasks]]) para apresentar como o Spark quebra uma aplicação a fim de maximizar o processamento em paralelo. Salvo em condições especiais, como onde um processador suporta o recurso de hyper-threading, assumimos que uma thread corresponde a um núcleo (core) do processador.

Tarefa em termos de hardware

No Spark, cada tarefa ocupa um núcleo por um período para processar um conjunto de dados, que é a partição. Tendo isto em mente, podemos nos perguntar o seguinte:

  • Onde exatamente ficam dados de uma partição durante o processamento?
  • O que podemos considerar como pequeno ou grande quando falamos do tamanho de uma partição no Spark?

As perguntas acima direcionam as próximas seções deste texto. Não se esqueça delas.

O que gostaria de enfatizar em relação às partições é a questão da sua volatilidade. Em geral, os dados não são persistidos e só existem durante o tempo necessário para que o núcleo processe-os, às vezes automaticamente reaproveitando-os em outra transformação. Também existem casos em que o desenvolvedor decide persistir o resultado do processamento em memória ou em disco para reaproveitá-los, o que será tema de outra postagem. Por agora, considere que os dados de uma partição são voláteis e só existem durante o momento da execução da aplicação.

Essa volatilidade dos dados, incluindo aqueles contendo a informação do estado de processamento de uma aplicação em execução, é o que torna o Spark uma ferramenta inadequada para processos de ingestão de dados.


Memória on-heap e off-heap

Na publicação anterior, apresentei a seguinte imagem para ilustrar a segmentação da memória de um executor Spark:

Memória de um executor Spark

Nela, podemos ver duas grandes áreas nomeadas “on-heap” e a “off-heap”. Vamos simplificar a imagem acima e reconstruir apenas o necessário conforme avançamos. Por hora, temos o seguinte:

Memória on-heap e mem[oria off-heap no executor do Spark]

A memória “on-heap” é uma área gerenciada pela JVM e está sujeita a um processo de limpeza automático chamado garbage collection . Este recurso, presente em linguagens que usam a JVM, permite à pessoa desenvolvedora se abster da responsabilidade de gerenciar a memória do dispositivo, o que envolve alocar objetos na memória ou definir quais objetos serão apagados. A maioria das aplicações Spark faz uso apenas da memória “on-heap”, definido pelo valor do parâmetro spark.executor.memory, e vamos focar nela nas próximas seções, mas sem deixar de escrever algumas linhas sobre a memória “off-heap”.

O que é usado no PySpark é, na verdade, uma API para se comunicar com o Spark, que é escrito em Scala.

A memória “off-heap” é um espaço acessível pela aplicação executada dentro da JVM, mas não é sujeita aos ciclos de garbage collection. Com isso, os objetos criados pela aplicação devem ser alocados ou removidos conforme comandado pela pessoa desenvolvedora. Então, qual é a vantagem de usar a memória “off-heap” em relação à memória “on-heap”?

Quando há a execução do processo de garbage collection, a JVM simplesmente para todos os processos. Se há uma tarefa em execução e a JVM percebe que precisa liberar espaço da memória “on-heap” para continuar, as tarefas são pausadas para que os objetos inúteis sejam removidos. Nos casos em que esse processo de garbage colletion afeta negativamente a aplicação, e técnicas de refinamento de garbage collection já foram tentadas, então a memória “off-heap” é utilizada.


Memória reservada, memória unificada e memória de usuário

A memória reservada é um espaço de 300MB utilizado pelo Spark para alocar seus objetos internos e garantir o seu funcionamento. Este valor pode ser mudado alterando a variável RESERVED_SYSTEM_MEMORY_BYTES no código do Spark e recompilando o framework, ou alterando o parâmetro spark.testing.reservedMemory. Como é altamente não recomendado alterar a memória reservada, vamos tratá-la aqui como imutável.

A memória unificada é a região mais importante para quem desenvolve a aplicação por ser nesta região onde os dados das partições são alocados. A região de memória unificada recebe este nome por ser dividida em duas partes, a memória de execução e a de armazenamento, que estão sob a gestão de um mecanismo de alocação dinâmica entre ambas conforme a necessidade da aplicação.

Por último, temos a memória de usuário, utilizada para armazenar alguns metadados do Spark, estruturas definidas pelo usuário e dados sobre dependência entre RDDs. Um exemplo de estrutura que faz uso desta memória são as UDFs e, em algumas situações, o Spark fará uso desta memória para criar uma tabela temporária apenas para mapear valores em um RDD para outro RDD em algumas operações de junção (joins).

Agora, podemos acrescentar alguns detalhes à imagem anterior:

Memória on-heap detalhada

E, a partir disto, podemos considerar o seguinte:

  • A quantidade total de memória “on-heap” de um executor é definida pelo parâmetro spark.executor.memory. Este valor, por padrão, é de 1GB.

  • O valor máximo da memória “on-heap” pode ser um valor tão alto que deixa apenas o mínimo necessário de memória para o sistema operacional do nó funcionar.

  • O valor mínimo da memória on-heap não pode ser menor do que 300MB, uma vez que este é o valor da memória reservada. Podemos esperar que o valor mínimo seja algo maior que 300MB e menor que 1GB. Mas, quanto?

  • A memória unificada é uma fração da memória do executor:

    • Memória unificada = spark.executor.memory* spark.memory.fraction
    • Por padrão, o valor do parâmetro spark.memory.fraction é de 0.6, nos levando a uma memória unificada de 600MB. Lembre-se que esse é o espaço em que as partições são alocadas.
  • A memória de usuário é definida pela seguinte expressão:

    • Memória de usuário = (1 - spark.memory.fraction) * spark.executor.memory - 300MB
    • Considerando os valores padrão, temos que Memória de usuário = 100MB.

Com esses valores, podemos ter uma ideia do que é considerado uma partição grande e uma partição pequena:

  • Para as configurações padrão do Spark, uma partição pequena tem o valor muito menor do que 600MB.
  • Para as configurações padrão do Spark, podemos considerar que uma partição grande é aquela que ocupa todo o espaço da memória unificada. Então, podemos esperar que o valor de 600MB corresponde a uma partição grande para uma tarefa.

E, com as observações acima, novas dúvidas surgem:

  • O quão abaixo de 600MB uma partição tem que ser para ser considerada muito pequena para uma tarefa?
    • O que acontece se houverem muitas partições pequenas?
    • Existe a possibilidade de mesclar muitas partições em um número menor de partições?
    • Existe a possibilidade de criar partições vazias?
  • O quão próximo de 600MB uma partição tem que ser para ser considerada grande demais para uma tarefa?
    • O que acontece se existir uma única partição consideravelmente maior do que o valor de memória unificado?
    • Existe a possibilidade de dividirmos uma partição grande em outras menores? Precisaremos fazer alguns experimentos usando Spark para responder essas perguntas, e é essa dívida que vou deixar para pagar na próxima postagem. Por hora, deixo estas perguntas aqui para abordar o tema da memória unificada.

Eu sou um pouco lento ocupado. Se não quiser esperar para fazer experimentos, você pode acessar este repositório e iniciar por conta própria.


Memória unificada: armazenamento e execução

A região mais importante para quem desenvolve uma aplicação Spark é a memória unificada. Essa região “unifica” as memórias de armazenamento e de execução graças a um recurso conhecido como mecanismo de ocupação dinâmica, disponível desde a versão 1.6 do framework. A distinção dessas memórias se faz necessária tanto pelo tipo de uso dos dados armazenados em cada região, quanto pela importância que o Spark dá a cada uma. Por padrão, memória de execução e memória de armazenamento ocupam metade da memória unificada.

A memória de execução é responsável por armazenar as partições das tarefas e outras estruturas de dados necessárias para executar as transformações. Esse espaço é ocupado apenas durante o momento do processamento e liberado logo após o término da execução da tarefa.

Já a memória de armazenamento é utilizada para persistir alguns tipos de variáveis (as variáveis broadcast) e, principalmente, para cache. Lembre-se que o Spark só realiza o processamento dos dados quando uma ação é executada, característica essa que chamamos de avaliação preguiçosa.

Isso significa que, se uma transformação é compartilhada entre várias ações, o Spark executará essa mesma transformação toda vez que uma ação iniciar. Para evitar essa redundância de processamento, os dados podem ser persistidos na memória de armazenamento, como cache.

Memória unificada

Outra diferença essencial entre as duas regiões é o grau de importância que o Spark assume para cada uma delas.

Existem situações onde a memória de execução pode demandar mais espaço do que ela já ocupa na memória unificada. Nestes casos, o que o Spark faz é consumir parte da memória de armazenamento como memória de execução, mesmo quando a região da memória em questão está ocupada. Isso permite, em situações extremas onde mais memória de execução é requerida pela aplicação, que dados de cache da memória de armazenamento sejam apagados para que o espaço seja usado pela memória de execução.

Memória unificada

Já no caso em que a aplicação requer mais memória de armazenamento, o Spark pode consumir parte da memória de execução para usar como memória de armazenamento desde que haja espaço disponível para tal. Sendo assim, a memória de armazenamento só estende a sua área sobre espaços vazios da memória de execução, nunca liberando áreas já ocupadas desta última para uso como memória de armazenamento.

Memória unificada

Agora que temos uma melhor ideia de como é composição da memória de um executor, podemos voltar com a imagem da publicação anterior, omitindo apenas parte da memória “off-heap”, que não foi abordada nesta publicação.

Memória on-heap de um executor Spark


Conclusão

Como vimos até aqui, o entendimento da composição da memória de um executor pode partir de uma melhor compreensão sobre o que é uma tarefa no Spark e sua relação com os recursos físicos. Salvo em condições especiais, uma tarefa Spark ocupa um core de CPU e um espaço de memória por um período. Assim que finalizada, uma tarefa libera os recursos computacionais para serem utilizados por uma nova tarefa. Os dados transformados por uma tarefa, bem como os dados gerados pelo Spark para executar uma aplicação, são armazenados na memória do executor.

O executor de uma aplicação Spark, que existe dentro de uma JVM, tem sua memória dividida em diferentes regiões. Ao avaliarmos a memória pela ótica do tipo de gerenciamento dessas regiões, podemos dividí-la em duas grandes regiões. A região “on-heap” tem seu gerenciamento realizado de forma automática graças ao recurso de garbage collection da JVM. A outra região, “off-heap”, acessível pela aplicação Spark, tem seu gerenciamento sob responsabilidade da pessoa desenvolvedora. Apesar de parecer trazer apenas desvantagens, a região de memória off-heap é usada em aplicações onde o processo de garbage collection da JVM se torna um ofensor ao desempenho da aplicação.

A área de memória “on-heap” \também é dividida em outras menores, sendo elas a reservada, a de usuário e a unificada. A memória reservada geralmente não é alterada, e apresenta o valor de 300MB. Essa região é utilizada pelo Spark executar a si. A memória do usuário é utilizada para armazenar estruturas criadas pelo usuário, como UDFs ou estruturas importantes para execução de junções (joins) entre tabelas. A memória unificada, por sua vez, é a que mais chama atenção da pessoa desenvolvedora.

A memória unificada recebe este nome devido a junção de duas áreas com finalidades distintas, mas que podem ser compartilhadas, a memória de armazenamento e a de execução. A memória de armazenamento é utilizada quando há o reuso de um conjunto de dados processados que justifique a sua persistência em memória para uso em etapas posteriores da aplicação. Já a memória de execução é utilizada para armazenar as partições e outras estruturas de dados necessárias para que uma aplicação Spark seja executada.

O mecanismo de ocupação dinâmica, que unifica a memória de armazenamento e de execução, é responsável por expandir área ocupada da memória unificada pela memória de execução ou de armazenamento. Esse mecanismo permite que, se necessário, parte da memória de armazenamento seja ocupada pela memória de execução, mesmo quando a area já está preenchida. Nesses casos, o mecanismo remove objetos da memória de armazenamento para uso como memória de execução. Contudo, quando há uma maior necessidade de memória de armazenamento, o mecanismo só permite a expansão deste tipo de memória sobre a de execução caso haja espaço disponível nesta última. Desta forma, o mecanismo não apaga dados importantes para a execução de uma aplicação Spark a fim de permitir a expansão da memória de armazenamento.

Apesar destas observações permitirem ao leitor ou leitora ter uma maior clareza a respeito de onde os dados ficam, seja de uma partição ou de uma estrutura necessária à execução do Spark, estão pendentes questões relacionadas ao tamanho e quantidade de partições em uma aplicação. Nesta publicação, ainda não ficam claros os efeitos causados por um número muito grande ou muito pequeno de partições, bem como os efeitos causados quando uma partição é muito maior ou muito menor do que o espaço disponível de memória de armazenamento ou de execução. Estes temas serão tratados numa publicação futura com uma abordagem mais experimental.


Referências