In the post Spark Executor and its memory, we explored tasks, their partitions, and how executor memory is divided into different regions, each with its own responsibilities. We also mentioned that problems may arise when partitions are too large. That is what we are going to discuss here.

As we have seen, data partitions are stored in the executor's unified memory, processed, and released when a task finishes. The size of these partitions varies, and sometimes they aren't processed quickly enough to free up space for other partitions. When that happens, Spark compresses data from memory and stores it on disk until the application needs it again. When there is enough space and the application requires those partitions, Spark reads them back, decompresses them, and writes them into memory for processing. This is what spill actually is.

With that in mind, we can suggest approaches to avoid it without changing the application's main logic:

  1. Make partitions smaller, so that the hardware can process them in a reasonable time and no spill is required. This strategy doesn't require changing the executor's memory, but it may slow down the application due to network transfers and metadata reading. This is a problem to be addressed in a future post.

  2. Increase the parameter spark.memory.fraction to a value bigger than 0.6. This approach takes memory from the User Memory space, increases the Unified Memory, and doesn't require creating larger executors. Therefore, costs don't increase, but this approach is limited to spill values that are not critical.

  3. Increase the parameter spark.executor.memory to a value bigger than the one previously defined, or choose worker nodes with more memory in Databricks. This strategy increases the on-heap memory, thus creating larger executors. As a result, costs increase, but it solves critical spill values.

  4. Increase the overall unified memory in the cluster by adding more executors. This is the equivalent of adding more worker nodes in Databricks. It increases costs, and it also doesn't work for all sorts of applications, such as those reading non-splittable file formats.

In practice, we often mix these approaches to achieve the most cost-effective execution. In the next sections, we’ll explore how to identify the spill problem and implement the strategies we’ve discussed.

Note: experiments are available at my personal GitHub Repository.

Spotting the Spill

We can see if spill is occurring by looking at different places in the Spark UI. The first place I usually go to is the SQL / DataFrame tab, where I click on the query details link:

From the image above, we can see that spill occurred and it's quite severe. Generally, partitions are between 1 and 200 MiB in size, but the smallest file being spilled from memory is 408 MiB.
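Besides clicking through the UI, the same spill metrics are exposed by Spark's monitoring REST API. Below is a minimal sketch that lists the stages that spilled, assuming the application is still running and its UI listens on the default port 4040; the host and port are assumptions, the endpoints and field names are part of the standard API.

```python
import requests

# Assumption: the Spark UI of the running application is reachable at
# localhost:4040 (the default). Each stage entry exposes the same spill
# metrics shown in the UI as memoryBytesSpilled and diskBytesSpilled.
base = "http://localhost:4040/api/v1"

app_id = requests.get(f"{base}/applications").json()[0]["id"]
stages = requests.get(f"{base}/applications/{app_id}/stages").json()

for stage in stages:
    if stage["memoryBytesSpilled"] > 0:
        print(
            f"Stage {stage['stageId']}: "
            f"{stage['memoryBytesSpilled'] / 1024**2:.1f} MiB spilled from memory, "
            f"{stage['diskBytesSpilled'] / 1024**2:.1f} MiB written to disk"
        )
```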

Let's start by looking at the stage with the highest shuffle read value, which in our case is Stage 3.

Even though the partitions are not too large, spill still occurs. The reading time of each partition, however, seems high: in my experience, 128 MiB partitions take up to 8 seconds to process. Let's take a look at the details of Stage 2, which created the partitions read by Stage 3.

As we can see, the 128 MiB input partitions create the 70-ish MiB partitions that are read by Stage 3.

Now that we’ve spotted the spill, let’s apply the approaches mentioned before.

See the file individual_incident_force_spill.py for details.
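For context, here is a minimal sketch of the kind of job that provokes this spill. It is not the contents of that file: the input path, column names and memory setting are illustrative assumptions; the ingredients are simply large input partitions, a shuffle-heavy aggregation and a modest executor heap.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Illustrative sketch only: a modest executor heap combined with large
# partitions and a wide shuffle is enough to make Spark spill.
spark = (
    SparkSession.builder
    .appName("force-spill-sketch")
    .config("spark.executor.memory", "1g")  # deliberately small heap (assumed value)
    .config("spark.sql.files.maxPartitionBytes", str(128 * 1024 * 1024))  # 128 MB default
    .getOrCreate()
)

# Assumed input path and column names.
incidents = spark.read.parquet("/data/individual_incidents")

# The aggregation forces a shuffle; when the shuffle blocks do not fit
# in unified memory, Spark spills them to disk.
counts = incidents.groupBy("incident_type").agg(F.count("*").alias("total"))
counts.write.mode("overwrite").parquet("/tmp/incident_counts")
```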


Solution 1: decreasing partition size

The first approach is decreasing the partition size. As we've seen in the previous section, 98.6 GiB were spilled from memory to disk in Stage 3, which reads partitions of roughly 70 MiB. The first idea here is to make partitions smaller by switching the parameter spark.sql.files.maxPartitionBytes from the default of 128 MB to 8 MB (8388608 bytes); since our hardware is limited, the smaller the partition, the faster it will be processed.

As we saw in Stage 2, shuffle write partitions are roughly half the size of the input partitions, so we keep this proportion and aim for shuffle partitions of about 4 MB. That being said, we take the total shuffle write for this stage, 12.3 GiB, convert it to MB, which gives 13207 MB, and divide it by 4 MB: 13207 MB / 4 MB = 3301.75, which we round up to 3350. This is the new value for spark.sql.shuffle.partitions.
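A minimal sketch of how those two values can be set when building the session is shown below; the session-building code is an assumption, not the exact contents of the repository file.

```python
from pyspark.sql import SparkSession

# Solution 1 sketch: shrink input partitions to 8 MB and raise the
# shuffle partition count so each shuffle partition stays around 4 MB.
spark = (
    SparkSession.builder
    .appName("spill-solution-1-sketch")
    .config("spark.sql.files.maxPartitionBytes", "8388608")  # 8 MB instead of the 128 MB default
    .config("spark.sql.shuffle.partitions", "3350")          # ~12.3 GiB shuffle write / 4 MB
    .getOrCreate()
)
```

Both are SQL runtime settings, so they can also be changed on an existing session with spark.conf.set, as long as that happens before the shuffle is planned.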

After applying those changes and running the application, we get the following results in the Stages tab:

The total time across stages is slightly better. Let’s see the Stage 3 details:

From the figure above, notice that there isn’t any spill and partitions are being processed much faster than before - as expected, since partitions are smaller.

Now, let’s see if there is any spill in Stage 2:

From the image above, there is no spill, and partitions were also processed much faster than their equivalents in the first run, which was expected, since the data is smaller as well. We can also check in the Query Details that the spill values are empty.

In the next examples, we are going to show only the Stage details, since the query details are similar to the ones we just saw.

See the file individual_incident_spill_solution_1.py for details.


Solution 2: increasing the number of partitions and the unified memory

We are going to simulate a less critical spill by defining 16MB partitions. Let’s see the spill for Stage 3:

Spill in Stage 3 is not severe. We can see from the image above that, up to the 75th percentile, there are no partitions causing spill, just some outliers.

Now, let’s see the spill in Stage 2.

We can notice that the spill in Stage 2 is more severe than the one in Stage 3, since spill occurs from the 25th percentile onwards. However, the volume in each percentile is not that large compared with the total unified memory available (approximately 400 MiB). We solve that by increasing the value of spark.memory.fraction from 0.6 (the default) to 0.8. After implementing this change and running the application, we get the following result in Stage 3.
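Before looking at that result, here is a minimal sketch of how the change can be applied, again assuming the same session-building pattern rather than the exact repository code; note that spark.memory.fraction is read when the executors start, so it needs to be set at session or cluster creation rather than changed at runtime.

```python
from pyspark.sql import SparkSession

# Solution 2 sketch: 16 MB input partitions to reproduce the milder
# spill, plus a larger share of the heap for unified memory.
spark = (
    SparkSession.builder
    .appName("spill-solution-2-sketch")
    .config("spark.sql.files.maxPartitionBytes", str(16 * 1024 * 1024))
    .config("spark.memory.fraction", "0.8")  # default is 0.6
    .getOrCreate()
)
```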

Notice that the Shuffle Read Size distribution is the same as the one presented in the example at the beginning of this section, but no spill occurred. Let's see the Stage 2 details:

From the image above, we can notice that the Input Size and Shuffle Write distributions are the same as in the example at the beginning of this section, and no spill occurred.

See the files individual_incident_spill_solution_2a.py and individual_incident_spill_solution_2b.py for details.


Solutions 3 and 4: scaling up and scaling out, respectively

The most obvious solutions are 3 and 4, in which we add computing resources either by increasing the resources available per executor (scaling up) or by increasing the number of executors (scaling out). In Databricks, this is the equivalent of choosing a more powerful worker node or increasing the number of worker nodes, respectively.

There are advantages and disadvantages to each approach. Parallelizable tasks benefit more from scaling out than from scaling up executors. If an executor fails, all tasks executed by it can be lost; consequently, less work is needed to recover those tasks if there are multiple, smaller executors running multiple tasks at once. However, the driver has to work more to coordinate and schedule tasks across the executors.

When it comes to scaling up executors, the main advantage is that more tasks run in the same executor, so there is less work for the driver program to coordinate across the cluster. But this also brings the disadvantage that an executor failure loses more tasks and takes more time to recover from than when using more, smaller executors.
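To make the two options concrete, here is a hedged sketch of how they might be expressed as executor settings. The values are illustrative assumptions, they only take effect when the session or cluster is created, and in Databricks they correspond to the worker node type and the number of workers rather than to these properties directly.

```python
from pyspark.sql import SparkSession

# The two layouts would be used in separate runs; in a single process
# getOrCreate() would reuse the first session. Values are assumptions.

# Scaling up (solution 3): one larger executor.
scale_up_conf = {
    "spark.executor.instances": "1",
    "spark.executor.memory": "8g",
    "spark.executor.cores": "4",
}

# Scaling out (solution 4): several smaller executors.
scale_out_conf = {
    "spark.executor.instances": "4",
    "spark.executor.memory": "2g",
    "spark.executor.cores": "1",
}

def build_session(app_name: str, conf: dict) -> SparkSession:
    """Create a session with the chosen executor layout (illustrative)."""
    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()

# Example: pick one layout per run.
# spark = build_session("scale-up-sketch", scale_up_conf)
```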

Let’s see how to spot the effect of adding more executors versus increasing resources available per executor.

In solution 3, the approach of increasing the resources per executor was implemented. In the Stage 3 details tab, we see the following:

In the image above, we can see on the left that we are using only one executor. Pay attention to the additional metrics highlighted: Scheduler Delay, Task Deserialization Time and Result Serialization Time.

In solution 4, the approach of increasing the number of executors was implemented. In the Stage 3 details, we see the following:

Notice that we have 4 executors now. The differences are small and, since the workload is also small, we can't conclude too much from them. Let's understand the basics of the new metrics and compare them.

First, Scheduler Delay represents the time a task waits to be scheduled for execution. This includes the time to send the task from the scheduler to an executor and the time to get the results from an executor back to the scheduler. Higher values in this metric can occur when the driver is starved of resources, like CPU, memory or network bandwidth.

Second, Task Deserialization Time is the time spent by an executor deserializing a task that has been sent by the driver. Consequently, higher values could mean executors are starved of the resources needed to deserialize the task and execute it. Depending on the situation, this could mean that partitions need to be smaller, or that a more powerful executor is needed.

Finally, Result Serialization Time is the time an executor spends serializing the task results before sending them to the driver. This metric is similar to Task Deserialization Time, but the data is moving in the opposite direction, and higher times can be due to the same reasons as higher Task Deserialization Time.

In conclusion, it's important to consider the presented metrics and the workload context when interpreting them. As mentioned before, the workload in our tests was small compared to real-world scenarios, but at least we could see where to look when deciding whether to scale up or scale out executors in a cluster.

See the files individual_incident_spill_solution_3.py and individual_incident_spill_solution_4.py for details.


Conclusion

We saw that the main tool that helps us monitor spill occurrence is the Spark UI. We can see spill occurring when the metrics Spill (Memory) and Spill (Disk) appear, where the first is the size of the spilled data in memory before compression, and the second is its size after compression when stored on disk. Learning about the distribution of those metrics helps us evaluate the severity of the spill: the higher the values in the distribution, the more critical the spill is.

It's possible to mitigate spill by decreasing partition size, tweaking the executors' unified memory, or increasing the amount of resources available in a cluster. Decreasing partition size helps when we don't want to invest in more physical resources. However, depending on the workload, we may end up with other sorts of problems, such as small files causing network overhead. Another possibility is to change the spark.memory.fraction parameter to a value higher than 0.6, increasing the size of an executor's unified memory. This is a very useful approach when the spill is not critical and we don't want to add physical resources to the cluster. Nevertheless, when adding resources is needed, we can choose between scaling up or scaling out executors. The decision on which approach is better depends on the workload, so it's important to observe the behavior of metrics like Scheduler Delay, Task Deserialization Time and Result Serialization Time, additional metrics in the Spark UI that can help us evaluate whether the driver or an executor needs more resources in order to process the workload optimally.

Finally, I would like to thank you for reading this text. I hope it helps you in your Spark learning journey.


References