O objetivo da descrição de um job em uma aplicação Apache Spark é facilitar a localização de oportunidades de otimização. Utilizando o método setJobGroup corretamente, é possível agilizar a busca por etapas candidatas à melhoria na Spark UI e relacioná-las com o código da aplicação. Neste curto texto, apresento como fazer isso.


O problema

Suponha que gostaríamos de realizar o benchmark para uma aplicação simples envolvendo apenas a leitura e ordenação dos dados,onde desejamos avaliar o desempenho da sua execução em função do número de partições. O código inicial é apresentado a seguir:

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

APP_NAME = f"(No Instrumentation - No AQE - 450m - 0.60) individual_incident_no_aqe_no_instrumentation.py"

spark_conf = SparkConf().setAppName(APP_NAME)\
    .set("spark.executor.memory", "450m")\
    .set("spark.memory.fraction", 0.60)\
    .set("spark.sql.shuffle.partitions", 200)\
    .set("spark.dynamicAllocation.enabled", "true")\
    .set("spark.dynamicAllocation.minExecutors", 1)\
    .set("spark.dynamicAllocation.maxExecutors", 1)\
    .set("spark.executor.cores", "1")\
    .set("spark.sql.adaptive.enabled", "false")\
    .set("spark.sql.adaptive.coalescePartitions.enabled", "false")

SOURCE = "/opt/spark/data/landing/individual_incident_archive_csv"

def read_without_schema_inference(spark, source):
    df = spark.read.option("header", "true").option("inferSchema", "false").csv(source)
    return df

def main():
    spark = SparkSession.builder.config(conf = spark_conf).getOrCreate()

    df_input = read_without_schema_inference(spark, SOURCE)

    df_input.sort("ID", ascending = False).write.format("noop").mode("overwrite").save()

    spark.conf.set("spark.sql.shuffle.partitions", 150)
    df_input.sort("ID", ascending = False).write.format("noop").mode("overwrite").save()

    spark.conf.set("spark.sql.shuffle.partitions", 5)
    df_input.sort("ID", ascending = False).write.format("noop").mode("overwrite").save()

if __name__ == "__main__":
    main()

Ao observar a Spark UI para a execução do código acima, temos a seguinte tela:

Spark UI sem descrições de job

Nota-se que temos apenas descrições genéricas apresentadas para cada job da aplicação. Podemos inferir que os jobs ID 2 e 4 correspondem aos jobs disparados pelas ações de escrita com a quantidade de partições padrão (200) e de 150, respectivamente. Já não é fácil encontrar todas as correspondências em um código simples, então podemos imaginar que é ainda mais difícil quando a aplicação Spark é complexa.

Felizmente, é possível definir a descrição do job através do método setJobGroup.


Adicionando as descrições e interpretando a Spark UI

Para adicionar a descrição do job, recomenda-se passar o nome da aplicação e a descrição da ação para o método setJobGroupna linha imediatamente anterior à chamada da ação. O código da aplicação anterior, após as alterações necessárias, é apresentado abaixo:

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

APP_NAME = f"(With Instrumentation - No AQE - 450m - 0.60) individual_incident_no_aqe_with_instrumentation.py"

spark_conf = SparkConf().setAppName(APP_NAME)\
    .set("spark.executor.memory", "450m")\
    .set("spark.memory.fraction", 0.60)\
    .set("spark.sql.shuffle.partitions", 200)\
    .set("spark.dynamicAllocation.enabled", "true")\
    .set("spark.dynamicAllocation.minExecutors", 1)\
    .set("spark.dynamicAllocation.maxExecutors", 1)\
    .set("spark.executor.cores", "1")\
    .set("spark.sql.adaptive.enabled", "false")\
    .set("spark.sql.adaptive.coalescePartitions.enabled", "false")

SOURCE = "/opt/spark/data/landing/individual_incident_archive_csv"

def read_without_schema_inference(spark, source):
    spark.sparkContext.setJobGroup(APP_NAME, "Reading without schema inference." )
    df = spark.read.option("header", "true").option("inferSchema", "false").csv(source)

    return df

def main():
    spark = SparkSession.builder.config(conf = spark_conf).getOrCreate()

    df_input = read_without_schema_inference(spark, SOURCE)

    spark.sparkContext.setJobGroup(APP_NAME, "Sorting and writing dataframe - 200 Partitions (default).")
    df_input.sort("ID", ascending = False).write.format("noop").mode("overwrite").save()

    spark.sparkContext.setJobGroup(APP_NAME, "Sorting and writing dataframe - 150 Partitions.")
    spark.conf.set("spark.sql.shuffle.partitions", 150)
    df_input.sort("ID", ascending = False).write.format("noop").mode("overwrite").save()

    spark.sparkContext.setJobGroup(APP_NAME, "Sorting and writing dataframe - 5 Partitions.")
    spark.conf.set("spark.sql.shuffle.partitions", 5)
    df_input.sort("ID", ascending = False).write.format("noop").mode("overwrite").save()

if __name__ == "__main__":
    main()

Com o uso do método setJobGroup, a Spark UI exibirá a seguinte tela:

Spark UI com descrições de job

Note que há um job relacionado à leitura dos arquivos. Este job ocorre por que não foi definido um esquema de leitura para os arquivos acessados pelo Spark. Quando não fornecemos o esquema ao ler um arquivo CSV, o Spark pode inferir os tipos mais adequados para cada campo do arquivo depois de lê-los por completo, ou atribuir a todos os campos o tipo String. É esse processo de atribuição de tipos que corresponde a uma ação, não o processo de leitura em si.

Com a descrição do Job podemos facilmente localizar oportunidades de melhoria ou avaliar quais etapas obtiveram melhor desempenho, bem como localizar no código a ação que disparou o job em questão. Pela imagem acima, podemos identificar que o job com menor tempo de duração é o que possui 200 partições e os com maior tempo são aqueles com apenas 5 partições.

Ao clicar no link em azul do job 2, podemos verificar os seus detalhes e entender a razão desta diferença. Isso nos leva à seguinte tela:

Spark UI com detalhes do job 2

Nesta imagem, vemos que o Job ID 2 é composto por 2 etapas. A primeira, cujo ID é 2, recebeu um input de 1066.7MiB e escreveu 9 partições, totalizando o volume de 566 MiB. A segunda, de ID igual a 3, recebeu os dados da etapa anterior, dividiu o mesmo volume de dados em 200 partições e finalizou com sucesso. Ao clicar no link destacado em azul para a etapa 3 do job 2, podemos ver os detalhes da sua execução:

Spark UI com detalhes da etapa 3 do job 2

Na imagem acima, notamos que os valores para a métrica Shuffle Read Size/Records são ligeiramente diferentes. Isso significa que a distribuição dos dados entre as partições é quase uniforme. Apesar de existir a possibilidade de melhorar o desempenho do job 2, vamos seguir com a análise do job 6, que possui o maior tempo de execução e tentar identificar o motivo desta diferença.

Na página dos jobs da aplicação, clicamos no link destacado em azul para ver os detalhes da execução do job 6:

Spark UI com detalhes do job 6

Podemos notar que o Job 6 também foi dividido em 2 etapas, de IDs 8 e 9. Também notamos que os volumes em MiB são iguais em relação às etapas do Job 2, divergindo apenas no quesito do número de partições. No Job 2, tínhamos 200 partições, enquanto neste, como podemos ver a linha da etapa 9, temos apenas 5.

Ao clicar no link destacado em azul para a etapa 9, podemos ver os detalhes da sua execução:

Spark UI com detalhes da etapa 9 do job 6

Podemos verificar na imagem acima que as partições possuem uma variação de tamanho significativa, onde a menor possui 99,1MiB e a maior possui 122,2MiB, indicando que os dados não estão muito bem distribuídos.

Contudo, as métricas que chamam mais atenção estão relacionadas ao Spill. Este efeito ocorre quando não há espaço suficiente na memória unificada para alocar todas as partições requeridas pela transformação. Quando isto acontece, há um processo de serialização onde o Spark compacta os dados que seriam alocados em memória (Spill (memory)) e persiste-os em disco (Spill (disk)). Quando houver memória disponível para alocar as partições requeridas pela transformação, o Spark fará o caminho inverso, que chamamos de desserialização. Neste caso, o Spark descomprime o dado em disco e realoca-o em memória para prosseguir com a execução da aplicação. Este processo é o que torna o Job 6 muito mais devagar que o Job 2 avaliado anteriormente.

Por fim, vimos que em casos onde uma partição é muito maior que o espaço disponível na memória unificada do executor, o Spark faz uso do mecanismo de Spill para finalizar a aplicação com sucesso em sacrifício de desempenho. Desta forma, verificamos também que uma maneira de se evitar o Spill é escolhendo um número adequado de partições para o volume de dados que desejamos processar.

Aqui, forçamos o uso de um conjunto definido de número de partições para ilustrar o uso do método setJobGroup. Na prática, e para versões do Spark 3.0.0 e acima, há o mecanismo AQE que determina dinamicamente o número de partições adequado para uma determinada aplicação. Este mecanismo, definidos pelos parâmetros spark.sql.adaptive.enabled e spark.sql.adaptive.coalescePartitions.enabled , foi desabilitado propositalmente nos códigos de exemplo.


Conclusão

Podemos notar que o Spark, por padrão, cria descrições genéricas para os jobs de uma aplicação, o que torna difícil relacionar os diagramas gerados pela Spark UI com o código da aplicação. Por consequência, essa dificuldade também atrasa o processo de identificação de oportunidades de melhoria de código. Felizmente, como exemplificado anteriormente, podemos usar o método setJobGroup para definir a descrição de um job logo antes da execução de uma determinada ação.


Referências:

  • [Spark Documentation - pyspark.SparkContext.setJobGroup](setJobGroup