Se existe a menor desconfiança da possibilidade de um DataFrame
apresentar linhas duplicadas, o melhor é garantir que elas serão removidas antes de carregar os dados no data lake do que arriscar, certo? Certo. O problema é que muitas vezes essa “garantia” vem com um custo, usando a cláusula DISTINCT
em todos possíveis DataFrames
, mesmo quando não precisamos dele. Acredito que o uso indiscriminado dessa cláusula é um evento canônico na vida de todo profissional de dados, e que vale a pena ser esclarecido para que novos profissionais não cometam esse equívoco de forma inconsciente.
Exemplo de um DISTINCT
desnecessário
Para ilustrar os efeitos deste problema, trago o seguinte código:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
APP_NAME = f"(Useless Distinct - AQE - 3500m - 4 cores) individual_incident_useless_distinct.py"
spark_conf = SparkConf().setAppName(APP_NAME)\
.set("spark.executor.memory", "3500m")\
.set("spark.dynamicAllocation.enabled", "true")\
.set("spark.dynamicAllocation.minExecutors", 1)\
.set("spark.dynamicAllocation.maxExecutors", 1)\
.set("spark.executor.cores", "4")\
.set("spark.sql.adaptive.enabled", "true")\
.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
SOURCE = "/opt/spark/data/bronze/individual_incident_archive_distinct_non_partitioned_parquet"
def read_parquet(spark, source):
spark.sparkContext.setJobGroup(APP_NAME, "Reading parquet files." )
df = spark.read.parquet(source)
return df
def main():
spark = SparkSession.builder.config(conf = spark_conf).getOrCreate()
df_input = read_parquet(spark, SOURCE)
spark.sparkContext.setJobGroup(APP_NAME, "Emulating write after useless distinct.")
df_input.distinct().write.format("noop").mode("overwrite").save()
spark.sparkContext.setJobGroup(APP_NAME, "Emulating write without the useless distinct")
df_input.write.format("noop").mode("overwrite").save()
if __name__ == "__main__":
main()
No código acima, realizamos a leitura de uma tabela sem dados duplicados. Em seguida, aplicamos a transformação distinct()
no DataFrame df_input
e simulamos a sua escrita. Depois, realizamos outra simulação de escrita, mas sem aplicar a transformação distinct()
no DataFrame df_input
.
Análise da execução da aplicação
Após a execução da aplicação, podemos acessar pelo History Server os logs dos jobs executados:
Conforme observado na imagem acima, os jobs relacionados à simulação de escrita do DataFrame
com a transformação distinct()
somaram 29 minutos, contra 4 minutos da simulação de escrita do DataFrame
sem a transformação.
Analisando o job com o DISTINCT
desnecessário
Ao clicar no link em azul do Job ID 1, vemos a seguinte tela:
Este job consiste em apenas uma etapa, que inclui uma transformação de leitura do parquet (Scan Parquet), uma de otimização (WholeStageCodegen (1)) e uma shuffle (Exchange). Podemos ver também que esta etapa processou 4,4GiB
de dados e escreveu 15.4GiB
, distribuídos em 43 partições.
No topo da página da tela apresentada, encontramos o link para a query associada ao job executado (Associated SQL Query: 0). Ao clicarmos nesse link, somos direcionados a um gráfico dividido em 7 partes, que descreveremos a seguir:
Na imagem acima, podemos ver que, no primeiro bloco, há a leitura de 200 arquivos Parquet que compõem a tabela, totalizando 4,4 GiB
e 135.401.839
linhas.
Na etapa de otimização do processo que antecede a operação de shuffle, há uma conversão dos dados colunares para orientados a linha. Perceba que o número de linhas de saída se mantém inalterado.
No terceiro bloco, temos a operação HashAggregate
relacionada à transformação distinct()
. Nessa operação, os dados das partições em um mesmo executor são deduplicados. Observe também a ocorrência de Spill
de 105,4 GiB
, o que corresponde a mais de 25 vezes o volume de dados lidos.
Nos próximos blocos do gráfico, teremos uma operação de shuffle (Exchange
) e uma nova operação HashAggregate
, conforme imagem abaixo:
Na imagem acima, observamos que a operação de shuffle movimenta 97,9GiB
de dados para realizar a deduplicação. Feita esta movimentação, ocorre a deduplicação de fato no bloco HashAggregate
. Perceba, novamente, há outro Spill
de 94,7 GiB
. Em adição a isso, perceba que o número de linhas de saída desta operação (number of output rows) é o mesmo apresentado na etapa de leitura, evidenciando que não existiam dados duplicados presentes desde o começo.
Os dois últimos blocos, AdaptativeSparkPlan
e OverwriteByExpression
, correspondem à otimização do número de partições e a operação de escrita, respectivamente.
Analisando o job sem o DISTINCT
Na tela inicial dos jobs, clicamos no link do job ID 3 e obtemos a seguinte tela:
Perceba que este job contém apenas um estágio, contendo as transformações de leitura dos arquivos Parquet e a otimização. Ao clicar no link para a query associada (Associated SQL Query: 1), temos a seguinte tela:
Observe que o grafo gerado é muito mais simples do que aquele do job que contém o uso desnecessário do distinct()
. Temos apenas as etapas de leitura dos arquivos, transformação dos dados de orientação colunar para linear e a escrita. Além disso, os números de linhas (number of output rows) são iguais aos apresentados no job em que usamos o distinct()
sem necessidade. Além disso, não há menção de shuffle (Exchange
) ou Spill
nas métricas apresentadas.
O que fazer para evitar uso de transformações distinct
desnecessárias?
Com base na minha experiência, o uso desnecessário de transformações distinct()
decorre de uma lacuna na experiência com modelagem de dados, desenvolvimento de pipelines de dados e como o Spark funciona.
Muitas das vezes em que me foi dito “o distinct()
é para garantir que não ocorra duplicidade dos dados”, percebia uma certa dificuldade do interlocutor sobre conceitos como tabelas dimensão, tabelas fato, chaves-primárias (sejam elas simples ou compostas), entre outros. Ainda que as plataformas que fazem uso do Spark como engine de processamento não reforcem esse tipo de abordagem, esses conceitos ainda existem e precisam ser utilizados pela pessoa desenvolvedora da aplicação. Acredito que um profissional de dados com bom entendimento de modelagem é capaz de realizar as perguntas certas para o time de negócios e converter as respostas em código.
O conhecimento em modelagem de dados também permitirá a pessoa desenvolvedora implementar métodos de verificação de qualidade dos dados, mesmo quando não há disponibilidade de ferramentas como o Delta Live Tables. Geralmente, a checagem de qualidade de dados pode ser realizada construindo uma query que retorne os resultados que violam as regras estabelecidas e, caso o resultado seja um conjunto não-vazio, temos um problema. Nessas situações, é possível optar por interromper o processo de carga dos dados até que ocorra o refinamento e a implementação da nova regra de negócio, ou filtrar e registrar o caso de violação, seguindo com o processo, mas sem deixar de discutir a ocorrência do problema.
Quanto à dificuldade em entender o que acontece sob o capô de uma aplicação Spark, não acredito que os motivos sejam relevantes. Espero apenas que os meus textos contribuam para tornar melhor a vida de quem usa essa ferramenta.
Conclusão
O uso de cláusulas DISTINCT
deve ser realizado de forma moderada a fim de evitar operações desnecessárias. Para verificar a utilidade dessa cláusula, basta olhar na Spark UI se os números de linhas de saída da etapa inicial é igual ao número de linhas de saída da etapa final. Caso esses números sejam iguais, podemos evitar o uso da cláusula DISTINCT
.
Além disso, é imprescindível que tanto o time de negócios quanto o time técnico tenham conhecimento sobre os dados que estão sendo processados, evitando assim operações e custos desnecessários.