If there’s a chance a DataFrame contains duplicated rows, it’s a good idea to deduplicate it before loading it into the table. Better safe than sorry, right? Absolutely. But using DISTINCT clauses carelessly can lead to serious performance issues. I think every data practitioner has made the mistake of adding DISTINCT clauses to every query and DataFrame to ensure no duplicated rows are sneaking in. Since I’ve seen a lot of people doing this, I figured it would be a good idea to walk through an example and explain why this isn’t the best solution.
A bad usage of a DISTINCT clause
Let’s start with the code below to evaluate what happens when we use DISTINCT clauses:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

APP_NAME = "(Useless Distinct - AQE - 3500m - 4 cores) individual_incident_useless_distinct.py"

spark_conf = SparkConf().setAppName(APP_NAME)\
    .set("spark.executor.memory", "3500m")\
    .set("spark.dynamicAllocation.enabled", "true")\
    .set("spark.dynamicAllocation.minExecutors", 1)\
    .set("spark.dynamicAllocation.maxExecutors", 1)\
    .set("spark.executor.cores", "4")\
    .set("spark.sql.adaptive.enabled", "true")\
    .set("spark.sql.adaptive.coalescePartitions.enabled", "true")

SOURCE = "/opt/spark/data/bronze/individual_incident_archive_distinct_non_partitioned_parquet"


def read_parquet(spark, source):
    spark.sparkContext.setJobGroup(APP_NAME, "Reading parquet files.")
    df = spark.read.parquet(source)
    return df


def main():
    spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
    df_input = read_parquet(spark, SOURCE)

    # The "noop" format runs the full computation but discards the output,
    # which lets us measure the cost of the transformations alone.
    spark.sparkContext.setJobGroup(APP_NAME, "Emulating write after useless distinct.")
    df_input.distinct().write.format("noop").mode("overwrite").save()

    spark.sparkContext.setJobGroup(APP_NAME, "Emulating write without the useless distinct.")
    df_input.write.format("noop").mode("overwrite").save()


if __name__ == "__main__":
    main()
In the snippet above, we read a table that has no duplicated rows. Next, a distinct() transformation is applied to the DataFrame df_input before simulating the write. After that, we simulate another write of the same DataFrame without the distinct() operation.
Analyzing the Spark Application Execution
After running the code above, we can access the execution log through the History Server:
As shown in the image above, jobs 1 and 2 are related to the write simulation of the DataFrame where the distinct() was applied, with a total runtime of 29 minutes. Job 3, which is related to the write of the DataFrame without the distinct() transformation, took only about 4 minutes.
Analyzing the job with the useless DISTINCT
After opening the link to view the details for job 1, we see the following screen:
This job consists of a single stage, which includes a transformation related to reading the Parquet files (Scan Parquet), an optimization (WholeStageCodegen (1)), and a shuffle (Exchange). It’s noticeable that this stage processed 4.4 GiB and wrote 15.4 GiB across 43 partitions.
At the top of the Details for Job 1 screen, there’s a link to the associated query for this job (Associated SQL Query: 0), which takes us to the Details for Query 0 screen. Here, we can see a graph with 7 nodes.
As shown in the image above, the first node corresponds to reading the 200 Parquet files that make up the table. The total volume is 4.4 GiB across 135,401,839 rows.
In the optimization step that follows, there is a conversion from an RDD of ColumnarBatch to an RDD of InternalRow. Notice that the number of input and output rows is the same.
After that, we see the HashAggregate operation related to the distinct() transformation. In this step, the data in each partition of the same worker node is deduplicated. Notice that there’s a Spill of 105.4 GiB, more than 25 times the volume read in the first task.
In the next stage, we have a shuffle (Exchange) and another HashAggregate, as we can see in the image below:
As we can see, the shuffle operation moves 97.9 GiB of data just to deduplicate it. After that, deduplication happens in the HashAggregate step. Notice that we had a spill of 95.7 GiB. Additionally, the number of output rows at this step is the same as at the beginning, indicating that there were no duplicated rows in the first place.
The last two steps, AdaptiveSparkPlan and OverwriteByExpression, are related to optimizing the number of shuffle partitions and to the write operation, respectively.
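The same plan structure can also be checked straight from code, without the History Server. Below is a minimal sketch, reusing the df_input DataFrame from the snippet above; the exact operator names can vary with the Spark version (explain(mode="formatted") requires Spark 3.0+), but a distinct() over a Parquet scan typically produces two HashAggregate nodes around an Exchange:
# Sketch: print the physical plan of the distinct() branch.
df_input.distinct().explain(mode="formatted")

# Expected shape of the plan (bottom-up), matching the graph discussed above:
#   Scan parquet       -> reads the source files
#   ColumnarToRow      -> the ColumnarBatch -> InternalRow conversion seen above
#   HashAggregate      -> partial deduplication within each partition
#   Exchange           -> the shuffle that moved 97.9 GiB in this run
#   HashAggregate      -> final deduplication after the shuffle
#   AdaptiveSparkPlan  -> AQE wrapper, since spark.sql.adaptive.enabled is true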
Analyzing the job without the DISTINCT
After clicking on the job ID 3 details link from the initial job screen, we can see the following:
Notice that this is a single-stage job, containing a transformation related to the reading process and another related to the optimization. By clicking on the link to the associated query (Associated SQL Query: 1), we land on the following page:
Notice that the graph for this job is much simpler than the previous one, where distinct() was applied. There is a step related to the reading, another that converts the data from an RDD of ColumnarBatch to an RDD of InternalRow, and one related to the writing. It’s also important to highlight that the number of output rows here is the same as the one that appears in the job with the distinct() transformation. The result of both jobs is the same, but they achieve it in completely different ways.
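For comparison, the same kind of inspection on the branch without distinct() is sketched below; no HashAggregate and, more importantly, no Exchange should appear, which is why there is no shuffle and no spill:
# Sketch: physical plan of the branch without distinct().
df_input.explain(mode="formatted")

# Expected shape: just the Parquet scan and the ColumnarToRow conversion,
# with no Exchange node, so nothing is shuffled or spilled.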
What to do to avoid useless DISTINCT?
In my experience, careless usage of distinct() often stems from a lack of experience with data modeling, developing data pipelines, and understanding how Spark works.
I’ve heard countless times that “this distinct() here is to ensure that there are no duplicated rows”, and each time I noticed that my peers struggled with data modeling concepts like dimension tables, fact tables, primary keys, and so on. Even though platforms that use Spark as a computing engine don’t enforce this approach, the concepts are still there and should be applied by the developer. I believe that a data practitioner with a clear understanding of data modeling can ask better questions to the business team, get better answers, and turn those answers into better code.
Knowledge about data modeling also enables developers to implement data quality checks from scratch. In general, these checks can be implemented as a query that returns the rows violating the data quality rules. If the result of this query is not empty, we have a problem. In that situation, it’s possible to either interrupt the loading process until the data quality rules are refined and implemented, or filter out the rows that violate the rule, log the violation elsewhere, load the remaining data into the final table, and discuss the violation afterwards.
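As an illustration, here is a minimal sketch of that kind of check in PySpark. The primary-key column name (incident_id) and the failure behaviour are hypothetical, chosen only for the example; the point is that the check targets the actual uniqueness rule instead of blindly applying distinct():
from pyspark.sql import DataFrame, functions as F


def find_duplicate_keys(df: DataFrame, key_cols: list) -> DataFrame:
    """Return the key values that violate the uniqueness rule (empty if the data is clean)."""
    return (
        df.groupBy(*key_cols)
          .agg(F.count(F.lit(1)).alias("row_count"))
          .filter(F.col("row_count") > 1)
    )


# Hypothetical primary key for the incident table; use the real business key here.
violations = find_duplicate_keys(df_input, ["incident_id"])

if violations.limit(1).count() > 0:
    # Option 1: interrupt the load until the rule (or the upstream data) is fixed.
    raise ValueError("Duplicate primary keys found, aborting the load.")
    # Option 2: persist `violations` elsewhere, filter the offending rows out of
    # df_input, load the clean data, and discuss the violation with the business team.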
When it comes to the difficulty of understanding Spark, I don’t believe the reasons matter much. I just hope my posts can somehow help those working with this tool to have a better life.
Conclusion
It’s important to think about DISTINCT usage in our code in order to avoid unnecessary operations. One simple way to verify whether we really need this transformation is to check, in the Spark UI, the number of output rows at the beginning and at the end of the job. If the numbers are the same, the DISTINCT transformation is unnecessary.
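When the Spark UI isn’t at hand, a one-off diagnostic in code answers the same question. This is a sketch meant to be run once during an investigation, not left in a production pipeline, since it scans the data twice:
# One-off diagnostic: compare total rows with distinct rows.
# If the two counts match, a distinct() in the pipeline adds cost but no value.
total_rows = df_input.count()
distinct_rows = df_input.distinct().count()

print(f"total={total_rows}, distinct={distinct_rows}, "
      f"duplicates={total_rows - distinct_rows}")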
In addition to that, it’s important that both the technical and business teams have enough knowledge about the data being processed, so that unnecessary operations and costs can be avoided.