In this post, I’d like to present some concepts for a better understanding of Apache Spark applications. Most of the content here is available in many books, blog posts, paid courses and free YouTube videos. Here, I just compiled these materials and added some important details based on my experience. This text is divided into three sections.
In the first section, “Apache Spark Components Overview”, I present the basic Apache Spark components and their respective roles when executing an application, as well as the composition of an Apache Spark application. In the second section, “Actions, Transformations and Lazy Evaluation”, I discuss these three important concepts, which are frequently mentioned in the first section, as well as in every text about Apache Spark. The third section is the Conclusion, where I wrap up the previous sections.
Apache Spark Components Overview
All Apache Spark applications use a Driver program, some Executors and a Resource Manager. The image below is shown in the official Apache Spark documentation.
The image above represents the Apache Spark components and how they are related. Each component can be allocated on a server (node). The Driver program runs on the driver node, and the executors run on the worker nodes. Tasks that process data run inside the worker nodes. The Cluster (Resource) Manager communicates with the driver node to provide resources to the executors. The Cluster Manager is a cluster by itself. At the time of this writing, Apache Mesos, Apache Hadoop YARN and Kubernetes can be used as Cluster Managers.
It’s also possible to deploy the Driver program outside the cluster. In these situations, we are deploying the Spark application in client mode.
We can redraw the image above into a simpler one, omitting the worker node details:
In the remainder of this section, we are going to focus on the following:
- Driver Program
- Resource Manager
- Executors
Driver
The Driver program has the following responsibilities:
- Request physical or virtual resources, like CPU and Memory, from the Cluster Manager.
- Keep track of application execution status.
- Interact with the user’s code.
- Analyze, schedule and orchestrate the applications across the executors.
Every interaction between the user’s code and the other resources is handled by the application’s SparkSession. If a user wants to specify in their application the number of executors it needs, they just set a parameter for that in the SparkSession configuration, as sketched below.
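As a minimal sketch of this idea in PySpark (the configuration keys are standard Spark properties; the application name and the values are only illustrative):

from pyspark.sql import SparkSession

# Request a fixed amount of resources when building the SparkSession.
# spark.executor.instances sets the number of executors when dynamic
# allocation is disabled.
spark = (
    SparkSession.builder
    .appName("my-application")
    .config("spark.executor.instances", "4")  # number of executors
    .config("spark.executor.cores", "2")      # cores per executor
    .config("spark.executor.memory", "4g")    # memory per executor
    .getOrCreate()
)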
Despite being the component that manages the whole application execution, the driver program doesn’t need many resources to run. In fact, most applications run well in clusters where driver nodes are smaller than worker nodes.
Some applications need to execute commands, like .collect(), in which data is collected by the driver node from the executors. In these scenarios, the driver node may need more resources than the worker nodes.
Resource Manager
The Resource Manager has the following responsibilities:
- Provide physical or virtual resources, like CPU and Memory, as requested by the Driver.
- Reserve the requested resources for the application runtime.
- Release resources after an application is completed.
- Replace unhealthy nodes or unhealthy processes, such as worker nodes or executors.
The Resource Manager can be a single Spark process, running in Standalone mode, for testing purposes. In production scenarios, this component is a cluster by itself. At the time of this writing, Apache Mesos, Apache Hadoop YARN and Kubernetes clusters can be used as Resource Managers for Apache Spark.
In my experience, Spark application developers don’t change settings in the Resource Manager, but they are expected to evaluate general metrics, such as CPU and Memory consumption. By evaluating these metrics, the developer can understand if the cluster is over- or undersized, or even evaluate if platform-specific features, such as autoscaling in Databricks, can bring benefits to the application performance.
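As an illustration, Spark’s own dynamic allocation lets the Driver ask the Resource Manager for more or fewer executors as the workload changes; it’s not the same thing as Databricks autoscaling, but the idea is similar. A minimal sketch, with illustrative values:

from pyspark.sql import SparkSession

# Let Spark grow and shrink the number of executors between a minimum
# and a maximum, instead of reserving a fixed amount upfront.
spark = (
    SparkSession.builder
    .appName("elastic-application")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "10")
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    .getOrCreate()
)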
Executors
The executors are the components that process the application workload as requested by the Driver. This is the most important component for those looking for optimization opportunities.
When the Driver hands off the processing to an executor, the application is split into the following components:
- Job: one or more stages followed by an action.
- Stage: one or more narrow transformations followed by a wide transformation or an action. Transformations are composed of tasks.
- Task: the smallest unit of work the Driver can schedule in order to process data. A task always carries a partition (the data component) and a core (the computing component).
The image below illustrates these concepts:
In the image above, tasks 1.1 and 1.2 in Job 1, Stage 1, can be executed in parallel. The same idea applies to tasks 2.1 to 2.4 in Stage 2 of Job 2. Both jobs can run in parallel if there are enough resources available. What cannot happen is tasks from different stages of the same job running in parallel.
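To make the mapping more concrete, here is a small PySpark sketch with a hypothetical path and column names (value and group); the breakdown in the comments is the typical one, although the exact plan depends on the data source and the Spark version:

# Reading and filtering are narrow operations, so they stay in the same stage.
df = spark.read.parquet("/data/events")     # hypothetical input path
filtered = df.where(df["value"] > 0)        # narrow transformation

# groupBy needs a shuffle, so Spark opens a new stage here.
counts = filtered.groupBy("group").count()  # wide transformation

# The action triggers one job; each stage is split into tasks,
# one task per partition it has to process.
counts.collect()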
Keeping these concepts in mind helps one seek out optimization opportunities. For example, a high number of tasks may be responsible for slowing down an application, because Spark needs to access a high number of partitions spread across the cluster in a random order. On the other hand, an application with a small number of tasks may have huge partitions that don’t fit in the Executor memory. A short sketch of how to inspect and adjust the number of partitions follows; after that, let’s deal with the concepts of actions, transformations and lazy evaluation.
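Since each task processes exactly one partition, the partition count drives the task count. A minimal sketch, assuming df is a DataFrame like the hypothetical one above and treating the partition targets as examples only:

# Inspect how many partitions (and therefore tasks) back the DataFrame.
print(df.rdd.getNumPartitions())

# Too many tiny partitions: reduce them without a full shuffle.
df_fewer = df.coalesce(50)

# Too few huge partitions: redistribute the data (this triggers a shuffle).
df_more = df.repartition(200)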
Actions, Transformations and Lazy Evaluation
The words used in this section title are mentioned a lot in every text about Spark. The time to talk about them has arrived.
There are only two families of operations in Spark: transformations and actions. Transformations are operations that act over a set of data and create a new one. These sets are, as we saw before, the partitions in a task. Transformations don’t “transform” a set of data, they create a new one instead. Partitions are immutable, and this is something that I consider very important to highlight here. Transformations are not executed unless an action demands data generated by them.
In other words, actions trigger transformations in order to return data to the driver or to persist it in storage. Transformations are only executed if they are related to an action at some point in the code. This idea of processing data only when needed is called lazy evaluation.
I’d like to use some pseudocode and diagrams to describe a situation that I often face when talking to some of my colleagues who are new to Spark.
Consider a dataset 1 in which we have diamonds, circles and squares. We want only diamonds and squares at the end of the process (dataset 2). As input, we have the path to the files that compose dataset 1.
Someone unfamiliar with the concept of lazy evaluation may write the following:
dataset_1 = read_files(path_to_dataset_files)
dataset_2 = remove_circles(dataset_1)
However, the code has only the transformations read_files and remove_circles, and no action triggering them. Instructions were given to process the data, yet no processing occurs, since nobody requested the dataset with no circles.
To change this, an action to trigger the transformations must be included:
dataset_1 = read_files(path_to_dataset_files)
dataset_2 = remove_circles(dataset_1)
dataset_2.collect()
This is what happens:
- A path_to_dataset_files parameter is given to the function read_files. At some moment in the future, Spark creates the low-level instructions so the Executor is able to read the files and create the partitions that compose dataset_1, which is stored in memory.
- dataset_1 is given as a parameter to the function remove_circles. At some moment in the future, Spark creates the low-level instructions so the Executor is able to read the files, create the partitions that compose dataset_1 in memory, and then create the partitions for dataset_2 in memory, which are equal to the ones that compose dataset_1, but without the circles.
- In the last line, the collect action triggers the previous transformations. Because of this request, Spark generates an execution plan to create the best low-level instructions to retrieve the requested dataset (dataset_2).
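For reference, a hypothetical PySpark version of the pseudocode above could look like the sketch below. It assumes the files are Parquet files, that the shape of each record is stored in a column named shape, and that the path is just a placeholder:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("shapes").getOrCreate()
path_to_dataset_files = "/data/shapes"  # hypothetical path

# Transformations only: nothing is executed yet (lazy evaluation).
dataset_1 = spark.read.parquet(path_to_dataset_files)
dataset_2 = dataset_1.where(F.col("shape") != "circle")

# The action triggers the whole chain and brings the result to the driver.
rows = dataset_2.collect()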
Narrow and wide transformations
From the idea of transformation, we can start talking about narrow and wide transformations. The difference between them depends mostly on whether data that lives in different executors needs to be recombined to create the new partitions.
In a narrow transformation, partitions in an executor don’t need to be recombined with partitions in another executor to create the new partition. In this type of transformation, every operation to create a partition runs in the same node, avoiding data being transferred through the network. This is what happens in operations known as “pipelining”, in which instructions are queued in memory and then executed one after another in the same node.
In the image below, three partitions from three different tasks have their data processed by the same transformation, which leads to three new partitions. Everything occurs under the same executor and the same node.
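Reusing dataset_1 and the imports from the sketch in the previous section, a few typical narrow transformations look like this; none of them forces data to leave the partition it lives in (the column names are just examples):

# These operations work row by row inside the existing partitions, so no
# shuffle is needed and Spark can pipeline them into a single stage.
only_known = dataset_1.where(F.col("shape").isNotNull())
with_flag = only_known.withColumn("is_square", F.col("shape") == "square")
projection = with_flag.select("shape", "is_square")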
In wide transformations, data from different partitions in different executors needs to be recombined to create new partitions. In this type of transformation, the number of partitions in the output may be different from the number in the input. For example, in operations involving groupBy, there will probably be more input partitions than output partitions. Operations that combine data from two different tables, like joins, can have fewer input partitions than output partitions. It’s also possible to have the same number of input and output partitions if the recombination we need to execute is just sorting the data with orderBy.
In this type of transformation, Spark executes a shuffle operation. This means that data is prepared to be transferred across executors, which involves compacting the data, persisting it on disk, and then transferring it over the cluster network.
In the image below, an aggregation combines data from two partitions, leading to a new, smaller one. This can occur when groupBy clauses are used.
In the image below, data from two different partitions is recombined to create three new ones. This is something that may occur under a join operation, and depending on how this operation is executed, we may end up with more data in the output than in the input.
In the image below, data spread across three partitions is sorted in a transformation, leading to three new partitions. This is something that may occur in operations using orderBy.
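Sketching the same kinds of operations in PySpark, again reusing dataset_2 from the earlier example: each of the lines below introduces a shuffle, which shows up as an Exchange operator when the physical plan is printed with explain().

# Aggregation: rows sharing the same key are recombined, usually into less data.
per_shape = dataset_2.groupBy("shape").count()

# Join: rows from two datasets that share the same key are recombined.
enriched = dataset_2.join(per_shape, on="shape", how="left")

# Global sort: data is redistributed so each partition holds a contiguous range of keys.
ordered = dataset_2.orderBy("shape")

# Shuffles appear as Exchange operators in the physical plan.
ordered.explain()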
Conclusion
It’s important to keep in mind how Spark splits an application and how its components interact with each other. We have discussed the Driver, the Resource Manager and the Executors. In order to understand how an application is split and executed, we also needed to be clear about the concepts of transformations, actions and lazy evaluation.
We’ve seen the Driver as the Spark component responsible for acting as an interface between the user’s code, the Resource Manager, and the Executors. The Driver interacts with the user’s code in order to know which resources are needed to run an application, then requests those resources from the Resource Manager, and finally sends the instructions to the Executors to run the application. The Driver also informs the Resource Manager when an application has finished. The Driver doesn’t usually process data, so it’s not uncommon for Driver nodes to be less powerful than Worker nodes.
The Resource Manager is responsible for communicating to the Driver which resources, like CPU and Memory, are available to create Executors in the worker nodes. It is also responsible for reserving and releasing resources as requested by a Driver. When executors or nodes fail, the Resource Manager is the component responsible for healing the system. In testing environments, this component is only a process running in a computer. In production environments, the Resource Manager is a cluster by itself. At the time of this writing, Apache Mesos, Apache Hadoop YARN and Kubernetes clusters can be used as Resource Managers.
Executors are responsible for processing the application’s data. They often run in nodes that have more computing power than the Driver. Executors receive the workload from the Driver. The workload is split into tasks, which compose transformations, which in turn compose stages.
Transformations are operations that occur via lazy evaluation. That means they only happen when an action requests data created by an associated transformation.
There are two types of transformations, narrow and wide. In narrow transformations, data inside different partitions doesn’t need to be recombined to create new partitions, and it’s common for the number of input partitions to be the same as the number of output partitions. In wide transformations, data spread across multiple partitions needs to be recombined to create new partitions. This may result in fewer or more output partitions, but it’s not uncommon to have the same number as well.