Having a clear job description in an Apache Spark application makes it easy to spot optimization opportunities. By using the setJobGroup method properly, you can quickly link code issues with what shows up in the Spark UI. In this short post, I'll show how to do just that.
The problem
Let’s suppose we are running a benchmark for a simple application that only reads and sorts data. We want to evaluate its performance by varying the number of partitions. The initial code is shown below.
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

APP_NAME = "(No Instrumentation - No AQE - 450m - 0.60) individual_incident_no_aqe_no_instrumentation.py"

spark_conf = SparkConf().setAppName(APP_NAME)\
    .set("spark.executor.memory", "450m")\
    .set("spark.memory.fraction", 0.60)\
    .set("spark.sql.shuffle.partitions", 200)\
    .set("spark.dynamicAllocation.enabled", "true")\
    .set("spark.dynamicAllocation.minExecutors", 1)\
    .set("spark.dynamicAllocation.maxExecutors", 1)\
    .set("spark.executor.cores", "1")\
    .set("spark.sql.adaptive.enabled", "false")\
    .set("spark.sql.adaptive.coalescePartitions.enabled", "false")

SOURCE = "/opt/spark/data/landing/individual_incident_archive_csv"

def read_without_schema_inference(spark, source):
    df = spark.read.option("header", "true").option("inferSchema", "false").csv(source)
    return df

def main():
    spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
    df_input = read_without_schema_inference(spark, SOURCE)
    df_input.sort("ID", ascending=False).write.format("noop").mode("overwrite").save()
    spark.conf.set("spark.sql.shuffle.partitions", 150)
    df_input.sort("ID", ascending=False).write.format("noop").mode("overwrite").save()
    spark.conf.set("spark.sql.shuffle.partitions", 5)
    df_input.sort("ID", ascending=False).write.format("noop").mode("overwrite").save()

if __name__ == "__main__":
    main()
After executing the code above, we can see the following screen in the Spark UI:
Notice that we have only generic, automatically generated descriptions for each application job. We can infer that job IDs 2 and 4 are related to the writing process using 200 and 150 partitions, respectively. Even in this simple application it's not so easy to connect the code with what you see in the Spark UI, so imagine how difficult it would be in a real-world scenario!
Fortunately, we can define the job description by using the setJobGroup method.
Adding descriptions and interpreting the Spark UI
It's recommended to include both the application name and a job description when calling the setJobGroup method. It's also best to place this method invocation just before an action call. With that in mind, the previous code will look like this:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

APP_NAME = "(With Instrumentation - No AQE - 450m - 0.60) individual_incident_no_aqe_with_instrumentation.py"

spark_conf = SparkConf().setAppName(APP_NAME)\
    .set("spark.executor.memory", "450m")\
    .set("spark.memory.fraction", 0.60)\
    .set("spark.sql.shuffle.partitions", 200)\
    .set("spark.dynamicAllocation.enabled", "true")\
    .set("spark.dynamicAllocation.minExecutors", 1)\
    .set("spark.dynamicAllocation.maxExecutors", 1)\
    .set("spark.executor.cores", "1")\
    .set("spark.sql.adaptive.enabled", "false")\
    .set("spark.sql.adaptive.coalescePartitions.enabled", "false")

SOURCE = "/opt/spark/data/landing/individual_incident_archive_csv"

def read_without_schema_inference(spark, source):
    spark.sparkContext.setJobGroup(APP_NAME, "Reading without schema inference.")
    df = spark.read.option("header", "true").option("inferSchema", "false").csv(source)
    return df

def main():
    spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
    df_input = read_without_schema_inference(spark, SOURCE)
    spark.sparkContext.setJobGroup(APP_NAME, "Sorting and writing dataframe - 200 Partitions (default).")
    df_input.sort("ID", ascending=False).write.format("noop").mode("overwrite").save()
    spark.sparkContext.setJobGroup(APP_NAME, "Sorting and writing dataframe - 150 Partitions.")
    spark.conf.set("spark.sql.shuffle.partitions", 150)
    df_input.sort("ID", ascending=False).write.format("noop").mode("overwrite").save()
    spark.sparkContext.setJobGroup(APP_NAME, "Sorting and writing dataframe - 5 Partitions.")
    spark.conf.set("spark.sql.shuffle.partitions", 5)
    df_input.sort("ID", ascending=False).write.format("noop").mode("overwrite").save()

if __name__ == "__main__":
    main()
After these changes, we are going to see the following screen in the Spark UI:
Notice that there's a job related to file reading. This occurs because no schema was provided to Spark. In such situations, Spark either scans the CSV files to infer the proper data types or, as here, assumes all columns are String and reads just the first line to discover the column names. It is this schema-discovery step that triggers an action, not the reading of the data itself.
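If that extra job is undesirable, one option is to supply the schema explicitly, so Spark has nothing to discover at read time. Below is a minimal sketch, assuming hypothetical column names (ID plus one other field); the real schema depends on the source files:

from pyspark.sql.types import StructType, StructField, StringType

# Hypothetical schema -- the actual column names depend on the source files.
explicit_schema = StructType([
    StructField("ID", StringType(), True),
    StructField("DESCRIPTION", StringType(), True),
])

def read_with_explicit_schema(spark, source):
    # No job is triggered here: Spark neither scans the files for types
    # nor reads a first line to discover column names.
    return spark.read.option("header", "true").schema(explicit_schema).csv(source)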
With the changes applied, we can understand what's happening under the hood and identify optimization opportunities. As shown in the image above, the fastest job is the one with 200 partitions, while the slowest one uses 5 partitions.
By clicking on the link for job 2, we can view its details and get some insights about its performance. The following screen appears:
As we can see in the image above, job 2 is composed of two stages. The first one, with ID 2, received 1066.7 MiB of data and wrote 566 MiB across 9 partitions. Stage 3 then receives this 566 MiB from stage 2, dividing it across 200 partitions before completing successfully. By clicking on the Stage Details for stage 3, we can see the following screen:
In the picture above, we can notice that the values for Shuffle Read Size/Records vary slightly across tasks. This could be an optimization opportunity, but let's move on to job 6, since it is the slowest one, and try to identify the reasons behind this difference.
Job 6 was split into two stages, 8 and 9. It's also worth highlighting that the volume in MiB is the same as in the stages of job 2; the only difference is the number of partitions, which is 5 for job 6. Clicking on the link for the details of stage 9 brings up the screen below:
Under the Summary Metrics for 5 Completed Tasks, we notice that the size of the partitions varies considerably: the smallest partition holds 99.1 MiB while the biggest holds 122.2 MiB. This indicates that data is not evenly distributed across partitions.
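A quick way to confirm this skew from code, rather than from the UI, is to count the records that land in each output partition of the sort. Here is a minimal sketch using spark_partition_id (record counts are only a proxy for the per-partition sizes in bytes):

from pyspark.sql.functions import spark_partition_id

# Tag each row with the id of the partition it ended up in after the sort,
# then count rows per partition; uneven counts mirror the uneven
# Shuffle Read Size/Records seen in the UI.
(df_input.sort("ID", ascending=False)
    .withColumn("partition_id", spark_partition_id())
    .groupBy("partition_id")
    .count()
    .orderBy("partition_id")
    .show())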
However, the main issue visible in stage 9's metrics is related to spill. Spill occurs when there isn't enough space in unified memory to handle all the partitions required by a transformation. When this happens, Spark serializes and compresses the in-memory data and temporarily stores it on disk. When the data is needed again, Spark reads it back from disk, decompresses it, and processes it in memory. The volume of the data as it sat in memory before serialization is presented as Spill (Memory), while its compressed, on-disk volume is labeled Spill (Disk). This extra cycle of compressing, writing, reading back, and decompressing is what makes job 6 slower than job 2.
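Two common ways to reduce this spill, using only the knobs this benchmark already touches, are to raise the number of shuffle partitions (so each task holds less data in memory) or to give the executors more memory. A hedged sketch of both options; the values are illustrative, not tuned recommendations:

# Option 1: more shuffle partitions -> smaller per-task state, less spill.
# This can be changed at runtime, as done earlier in this post.
spark.conf.set("spark.sql.shuffle.partitions", 200)

# Option 2: more executor memory and/or a larger unified memory fraction.
# These must be set before the SparkSession is created.
spark_conf = SparkConf().setAppName(APP_NAME)\
    .set("spark.executor.memory", "1g")\
    .set("spark.memory.fraction", 0.60)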
Conclusion
It's noticeable that, by default, Spark creates job descriptions that are not intuitive to link back to the code. This can delay the process of identifying optimization opportunities in the Spark UI. Fortunately, as we've seen in this post, we can use the setJobGroup method to define custom descriptions right before calling actions, making it easier to spot these opportunities.
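As a final convenience (not part of the original code), the setJobGroup calls can be wrapped in a small context manager, so each action gets its description without repeating boilerplate. A minimal sketch:

from contextlib import contextmanager

@contextmanager
def job_group(spark, description):
    # Every job triggered inside the block is tagged with this description.
    spark.sparkContext.setJobGroup(APP_NAME, description)
    try:
        yield
    finally:
        # Reset so later, unrelated actions don't inherit the description.
        spark.sparkContext.setJobGroup(APP_NAME, "")

with job_group(spark, "Sorting and writing dataframe - 200 Partitions (default)."):
    df_input.sort("ID", ascending=False).write.format("noop").mode("overwrite").save()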