In the Spark Application Architecture post, we discussed Apache Spark architecture concepts. As we saw there, tasks are the fundamental unit of work in Spark, and we are going to build on them here to talk about the Spark executor and its memory.

In the first section, “Tasks and Partitions”, we look at the relation among tasks, partitions, and the hardware. In the second section, “On-Heap and Off-Heap Memory”, we talk about the executor memory with a special focus on on-heap memory. In the third section, “Reserved, Unified and User Memories”, we describe the on-heap memory in more detail and how it is used. In the fourth, “Unified Memory: Storage and Execution”, we unveil some details about how this memory behaves according to the size of the objects stored in it.


Tasks and Partitions

In the Spark Application Architecture post we presented how Spark uses tasks to break down an application to process data in parallel. Physically, a task is a piece of memory plus a thread that stays occupied for a while to process a piece of data that we call a partition. To simplify, we are going to assume that a CPU core runs only one thread.
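To make this concrete, here is a minimal sketch (the input path and the partition count are illustrative) showing that the number of partitions of a DataFrame is the number of tasks a stage will spawn:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sketch: one task is scheduled per partition,
// and each task occupies one core (one thread in our simplification).
val spark = SparkSession.builder().appName("partitions-demo").getOrCreate()

val df = spark.read.parquet("/data/events") // hypothetical input path

// How many tasks the next action on df will spawn per stage:
println(s"partitions (= tasks per stage): ${df.rdd.getNumPartitions}")

// Trading fewer, larger partitions for more, smaller ones (and vice versa):
val resized = df.repartition(64)
```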

The partition size varies. When partitions are too big, tasks can run out of memory or spill data to disk; when they are too small, the overhead of scheduling many tiny tasks brings us another sort of problem.

It’s important to keep in mind that tasks occupy the physical components only at runtime. The memory slots and threads they use are released as soon as the data is processed, and the state of the processing is lost when an application fails due to any sort of problem, such as a network issue. Because of Spark’s lazy-evaluated nature and this lack of state storage, we usually don’t use it as an ingestion engine.


On-Heap and Off-Heap Memories

Spark executor memory is split into two main regions: on-heap and off-heap.

The on-heap memory is managed by the JVM and is cleaned automatically by a garbage collection process, which takes the responsibility for memory management away from the developer. When garbage collection occurs, all activities in the executor are paused while objects that are no longer needed by the application are deleted. When the cycle finishes, Spark resumes processing data. Most Spark applications use only the on-heap memory to store the objects they need. The size of this area is defined by the parameter spark.executor.memory and is a subset of the worker node memory. We are going to focus on the on-heap memory in the next sections, but let’s dip our toes into off-heap memory first.

The off-heap memory is a space accessible by the Spark application running inside the JVM, but it is not subject to garbage collection cycles. This means that the developer needs to manage which objects can be deleted in order to release memory. We usually choose to store data in off-heap memory when garbage collection pauses compromise the application performance, and most applications are not that sensitive to them.
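As a hedged sketch, this is how the two regions could be sized when building a session. The values are illustrative, not recommendations, and depending on the cluster manager spark.executor.memory may need to be passed at submit time instead:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sizing of the two regions discussed above.
val spark = SparkSession.builder()
  .appName("executor-memory-demo")
  .config("spark.executor.memory", "4g")          // on-heap region, managed by the JVM
  .config("spark.memory.offHeap.enabled", "true") // opt in to off-heap memory
  .config("spark.memory.offHeap.size", "2g")      // off-heap region, outside GC cycles
  .getOrCreate()
```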

Reserved, Unified and User Memory

The reserved memory is a space of 300MB used by Spark to run itself. Although it is possible to change this value by setting RESERVED_SYSTEM_MEMORY_BYTES in the Spark source code and recompiling it, or by changing the testing parameter spark.testing.reservedMemory, we are going to treat it as a constant.

The unified memory is the most important one for developers, since it is in this region that partitions are allocated. It unifies storage and execution memory, and it is subject to a dynamic allocation mechanism that controls whether or not the execution memory can expand over the storage memory.

Finally, there is the user memory, where some Spark metadata, user-defined structures, and RDD dependency information are stored. UDFs are one example of objects that use this space; another is temporary structures that map values from one RDD to another.
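For instance, a UDF like the sketch below (the function and column names are hypothetical) is an object that lives in user memory rather than in the unified region:

```scala
import org.apache.spark.sql.functions.{col, udf}

// The UDF object, and anything it closes over, is stored in user memory.
val normalize = udf((s: String) => if (s == null) "" else s.trim.toLowerCase)

// Applying it to an illustrative DataFrame `people` with a `name` column:
// val cleaned = people.withColumn("name", normalize(col("name")))
```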

With this in mind, we can add details to the previous image:

Detailed on-heap memory

From the image above:

  • spark.executor.memory: defines the total executor on-heap memory. By default it is 1GB.
  • spark.memory.fraction: defines the fraction of the usable on-heap memory (what remains after subtracting the reserved 300MB) occupied by the unified memory. Default is 0.6.
    • Unified Memory = (spark.executor.memory - 300MB) * spark.memory.fraction
  • The Reserved Memory is 300MB.
  • The user memory is what remains.
    • User Memory = (spark.executor.memory - 300MB) * (1 - spark.memory.fraction)
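Plugging in the defaults gives a quick sanity check (a back-of-the-envelope sketch; 1GB is taken as 1024MB):

```scala
// On-heap split for the defaults: 1GB executor memory, fraction 0.6.
val executorMemoryMb = 1024L
val reservedMb       = 300L
val fraction         = 0.6

val usableMb  = executorMemoryMb - reservedMb // 724 MB
val unifiedMb = (usableMb * fraction).toLong  // 434 MB for storage + execution
val userMb    = usableMb - unifiedMb          // 290 MB for user memory

println(s"unified: ${unifiedMb}MB, user: ${userMb}MB, reserved: ${reservedMb}MB")
```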

Now, let’s see what lurks beneath the unified memory.

Unified Memory: Storage and Execution

This region “unifies” the storage memory and the execution memory, and it is the most important region for the developer. The storage memory holds data structures that are propagated across the cluster, such as cached DataFrames and broadcast variables, while the execution memory is used for computations in shuffles and wide transformations in general. Although each of these regions has a well-defined purpose, one region can borrow space from the other up to a limit, thanks to a dynamic allocation mechanism available since Spark 1.6. By default, the storage and execution memories each occupy half of the unified memory space, and the execution memory cannot evict cached data from the storage half. It is possible to tweak this by changing the parameter spark.memory.storageFraction to a value other than 0.5: this parameter defines the fraction of the unified memory dedicated to storage that cannot be taken over by the execution memory.
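As a sketch, lowering the protected storage share could look like this (the value 0.3 is purely illustrative):

```scala
import org.apache.spark.sql.SparkSession

// With storageFraction = 0.3, only 30% of the unified memory is storage
// space that execution can never evict; the rest can be borrowed.
val spark = SparkSession.builder()
  .appName("storage-fraction-demo")
  .config("spark.memory.storageFraction", "0.3")
  .getOrCreate()
```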

To illustrate this mechanism, let’s suppose we have a DataFrame that is repeatedly used by multiple jobs in an application. Instead of reprocessing this DataFrame every time an action is called, the developer decides to cache it. Spark then uses part of the execution memory as storage memory.
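In code, this first step might look like the following sketch (the path and column names are hypothetical, and we assume an existing SparkSession named spark):

```scala
// The DataFrame several jobs reuse; caching keeps its partitions
// in the storage side of the unified memory after the first action.
val events = spark.read.parquet("/data/events") // hypothetical path

events.cache()  // marks the data for storage memory
events.count()  // first action materializes and caches the partitions

// Subsequent jobs read the cached partitions instead of the source files.
val byCountry = events.groupBy("country").count()
```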

At some point in the application, the developer decides to remove the cache, releasing storage memory. Later, the application performs a heavy aggregation that requires a lot of memory, and Spark uses as much of the unified memory as it can as execution memory. The figure below illustrates this behavior when spark.memory.storageFraction is less than 0.5.
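Continuing the sketch from above (with the same hypothetical names), the second half of the scenario could look like this:

```scala
// Releasing the cached partitions gives the storage memory back.
events.unpersist()

// A heavy, shuffle-intensive aggregation: Spark can now grow the
// execution memory over the space the cache used to occupy.
val heavy = events
  .groupBy("country", "device", "hour")
  .agg(org.apache.spark.sql.functions.countDistinct("user_id"))
heavy.write.parquet("/data/aggregated") // hypothetical output path
```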

Now that the behavior of the unified memory is properly illustrated, we can add new details to the initial executor memory diagram:


Conclusion

Having a clear understanding of what a task is and how the Spark executor memory is composed can help a developer make better use of the available resources. A task, in essence, is a data partition processed by a thread, and data partitions are stored in different places according to their nature.

In a broader view, the Spark executor memory is composed of on-heap and off-heap regions. The first has automatic cycles that evict unused objects from memory (garbage collection), while the second leaves the responsibility for running such a process to the developer. Fortunately, most applications only need the on-heap region.

The on-heap region can itself be described in terms of other regions (or subregions): the unified memory; the user memory, which stores user-defined functions; and the reserved memory, which Spark needs to run itself. The unified memory is composed of the execution memory, reserved for data related to wide transformations in general, and the storage memory, which stores cached DataFrames and broadcast variables. Storage memory can borrow space from execution memory, and execution memory can borrow space from storage memory up to a limit defined by spark.memory.storageFraction.


References