In part 1 of this series, we saw a brief overview of Apache Spark, the Resilient Distributed Dataset (RDD), and the Spark ecosystem. In this article, we take a closer look at the RDD, Spark’s primary, fault-tolerant abstraction for in-memory cluster computing.
MapReduce, one of the most popular parallel data processing paradigms, and its variants have been highly successful in implementing large-scale data-intensive applications on commodity clusters. However, most of these systems are built around an acyclic data flow model that is poorly suited to iterative machine learning and graph processing algorithms, or to interactive and ad-hoc queries. All of these workloads need one thing that MapReduce lacks: efficient primitives for data sharing. In MapReduce, data is shared across jobs (or across stages of a single job) through stable storage. As discussed in the previous article, MapReduce writes intermediate results to disk, so these reads and writes are slow. In addition, existing storage abstractions rely on data replication or update-log replication for fault tolerance, which is costly for data-intensive applications.
Resilient Distributed Dataset (RDD)
To overcome these limitations of MapReduce, Spark introduced the RDD as a restricted form of distributed shared memory that is both efficient and fault-tolerant. An RDD is an immutable, partitioned collection of elements distributed across the cluster. The restriction lies in the programming interface: RDDs can only be built through coarse-grained, deterministic transformations, i.e. operations that are applied to all elements of a dataset at once.
Spark internally records every transformation that was applied to build an RDD in a Directed Acyclic Graph (DAG) called the lineage. The lineage is then used to reconstruct only the part of the RDD that is lost due to a failure: when a node fails, the transformations recorded in the DAG are replayed to rebuild just the lost partitions. In this way, instead of replicating data across the cluster, Spark records only the transformations that were used to build the RDDs and replays them for efficient recovery.
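To make the idea of lineage-based recovery concrete, here is a minimal sketch in plain Python. It is a toy model and not the Spark API: the class `ToyRDD` and its methods are hypothetical names chosen for illustration. Each dataset stores only a link to its parent and the function that derives it, so a single lost partition can be rebuilt by replaying that chain.

```python
class ToyRDD:
    """Toy stand-in for an RDD: it remembers how it was built, not its data."""

    def __init__(self, source=None, parent=None, fn=None, name="source"):
        self.source = source    # list of partitions, set only on a base dataset
        self.parent = parent    # lineage link to the dataset this one derives from
        self.fn = fn            # function applied to a whole partition
        self.name = name        # label of the transformation, for inspection

    def map(self, f):
        # Coarse-grained: the same function is applied to every element.
        return ToyRDD(parent=self, fn=lambda part: [f(x) for x in part], name="map")

    def filter(self, pred):
        return ToyRDD(parent=self, fn=lambda part: [x for x in part if pred(x)],
                      name="filter")

    def compute_partition(self, index):
        # Replay the lineage for ONE partition, starting from the base data.
        if self.parent is None:
            return list(self.source[index])
        return self.fn(self.parent.compute_partition(index))

    def lineage(self):
        # Walk the parent links back to the source and list the steps in order.
        chain, node = [], self
        while node is not None:
            chain.append(node.name)
            node = node.parent
        return list(reversed(chain))


base = ToyRDD(source=[[1, 2, 3], [4, 5, 6]])
squares_of_even = base.filter(lambda x: x % 2 == 0).map(lambda x: x * x)

# Materialize both partitions, then pretend partition 1 was lost with its node.
materialized = {i: squares_of_even.compute_partition(i) for i in range(2)}
del materialized[1]

# Recovery: only the lost partition is recomputed; partition 0 is untouched.
materialized[1] = squares_of_even.compute_partition(1)
```

Because the transformations are deterministic, replaying them on the same input yields the same partition, which is why no copy of the data itself has to be kept.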
Spark lets you construct RDDs in four ways:
- From a file in a shared file system, such as the Hadoop Distributed File System (HDFS).
- By “parallelizing” a collection (e.g., an array) in the driver program, which means dividing it into a number of slices that will be sent to multiple nodes in the cluster.
- By transforming an existing RDD.
- By changing the persistence of an existing RDD.
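As a rough illustration of what “parallelizing” a collection means, the following plain-Python sketch divides a sequence into a given number of contiguous slices. The function name `slice_collection` is hypothetical, and the exact boundaries Spark picks for its slices may differ; the point is only that one local collection becomes several pieces that can be sent to different nodes.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def slice_collection(data: Sequence[T], num_slices: int) -> List[List[T]]:
    """Split `data` into `num_slices` contiguous, near-equal slices."""
    if num_slices < 1:
        raise ValueError("num_slices must be at least 1")
    n = len(data)
    # Integer arithmetic spreads any remainder across the slices.
    return [
        list(data[i * n // num_slices:(i + 1) * n // num_slices])
        for i in range(num_slices)
    ]


slices = slice_collection(range(10), 3)
# slices == [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
```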
Operations on RDD
Transformations and Actions are the two primary operations performed on RDDs.
Transformations are operations that transform one RDD into another. They are lazily evaluated and return a pointer to the transformed RDD (we will look at the lazy evaluation of RDDs later in this article). Some of the most commonly used transformations are map(), filter(), flatMap(), join() etc.
Actions are the mechanism that causes Spark to apply the specified set of transformations to the source data and return a value to the driver program once the computation on the dataset is done. Some of the most commonly used actions on RDDs are collect(), count(), reduce(), take() etc.
Refer to the Spark Programming Guide for more transformations and actions.
We saw earlier that Spark records the transformations used to build an RDD in a DAG in order to recover from failures. All transformations in Spark are lazy: they do not compute their results right away. When transformations are applied, they are only recorded in the DAG, and their computation is postponed until an action is called that returns a result to the driver program. When an action is applied, Spark executes the recorded transformations that the action’s result depends on. This design lets Spark run more efficiently, since a computation is performed only if its result is actually needed. For example, Spark can see that a dataset created through map will be consumed by reduce, and return only the result of the reduce to the driver program instead of the larger mapped dataset.
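The difference between recording a transformation and running it can be sketched in a few lines of plain Python. This is a toy model, not Spark code: `LazyDataset` and `traced_square` are hypothetical names. Transformations only append a step to a list, and nothing is evaluated until an action such as `reduce` is called.

```python
from functools import reduce as fold


class LazyDataset:
    """Toy lazy dataset: transformations are recorded, actions execute them."""

    def __init__(self, data, steps=()):
        self._data = data
        self._steps = steps  # recorded transformations, not yet executed

    def map(self, f):
        # Transformation: returns a new dataset with one more recorded step.
        return LazyDataset(self._data, self._steps + (("map", f),))

    def filter(self, pred):
        return LazyDataset(self._data, self._steps + (("filter", pred),))

    def _run(self):
        # Build a pipeline of iterators; elements flow through it one by one.
        items = iter(self._data)
        for kind, f in self._steps:
            items = map(f, items) if kind == "map" else filter(f, items)
        return items

    def collect(self):
        # Action: materialize the whole result.
        return list(self._run())

    def reduce(self, f):
        # Action: only the single combined value is returned to the caller.
        return fold(f, self._run())


calls = []


def traced_square(x):
    calls.append(x)  # record every real invocation
    return x * x


squares = LazyDataset([1, 2, 3, 4]).map(traced_square)
calls_before_action = len(calls)             # still 0: nothing has run yet
total = squares.reduce(lambda a, b: a + b)   # the action triggers the work
calls_after_action = len(calls)              # now 4
```

Note that `reduce` hands back a single number rather than the list of squares, which mirrors the point that only the final result needs to reach the driver program.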
One of the key features of Spark is its ability to speed up iterative operations by caching (persisting) reusable data in main memory. When you persist an RDD, each node stores in memory any partitions of it that it computes and reuses them in other actions on that dataset. An RDD that has not been cached is recomputed each time it is used. So if you plan to use an RDD more than once, you should ask Spark to cache it with the cache() operation, which keeps the RDD in memory. However, if you cache too many RDDs and Spark runs out of memory, it evicts the least recently used (LRU) data first. The evicted data is automatically recomputed when it is accessed again, while the most recently cached data can still be read directly from memory. The following statistics show how useful caching can be. They also help to understand efficient recovery after a failure.
As we can see, the time for the first iteration is comparatively high (closer to what Hadoop MapReduce takes, since the data may be read from disk). From the second iteration onwards, the time per iteration drops sharply, by approximately 50%, thanks to caching. Consider the special case of the failure in iteration 6 pointed out in the graph: the lost data is recomputed, so that iteration takes a little longer than usual.
You can mark an RDD to be persisted using the persist() or cache() methods on it. In addition, each persisted RDD can be stored using a different storage level, which allows you, for example, to persist the dataset on disk, persist it in memory as serialized Java objects, replicate it across nodes, or store it off-heap in Tachyon. The full set of storage levels is:
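The least-recently-used eviction behaviour mentioned above can be illustrated with a small plain-Python sketch. It is a toy model under simplifying assumptions (capacity is counted in entries rather than bytes, and `ToyPartitionCache` is a hypothetical name), not a description of Spark’s memory manager. An evicted entry is simply recomputed the next time it is requested.

```python
from collections import OrderedDict


class ToyPartitionCache:
    """Toy LRU cache: holds at most `capacity` entries, recomputes on a miss."""

    def __init__(self, capacity, compute):
        self.capacity = capacity
        self.compute = compute      # function that (re)builds an entry from its key
        self.store = OrderedDict()  # ordered from least to most recently used
        self.computations = 0       # how many times data had to be (re)computed

    def get(self, key):
        if key in self.store:
            self.store.move_to_end(key)  # cache hit: mark as most recently used
            return self.store[key]
        # Cache miss: compute the entry, then evict the LRU entry if over capacity.
        self.computations += 1
        value = self.compute(key)
        self.store[key] = value
        if len(self.store) > self.capacity:
            self.store.popitem(last=False)
        return value


cache = ToyPartitionCache(capacity=2, compute=lambda key: key.upper())
for key in ["a", "b", "a", "c", "b"]:
    cache.get(key)

# "a" was served from the cache once; "b" was evicted by "c" and recomputed.
cached_keys = list(cache.store)   # ["c", "b"]
work_done = cache.computations    # 4 computations for 5 requests
```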
| Storage level | Meaning |
|---|---|
| MEMORY_ONLY (default level) | Store the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they’re needed. |
| MEMORY_AND_DISK | Store the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don’t fit on disk, and read them from there when they’re needed. |
| MEMORY_ONLY_SER | Store the RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read. |
| MEMORY_AND_DISK_SER | Similar to MEMORY_ONLY_SER, but spill partitions that don’t fit in memory to disk instead of recomputing them on the fly each time they’re needed. |
| DISK_ONLY | Store the RDD partitions only on disk. |
| MEMORY_ONLY_2, MEMORY_AND_DISK_2 etc. | Same as the levels above, but replicate each partition on two cluster nodes. |
| OFF_HEAP (experimental) | Store the RDD in serialized format in Tachyon. |
Selecting the Storage Level
The right storage level depends on the scenario you are working in, so analyze which trade-off between memory usage and CPU cost best fits your application. Here are a few rules of thumb for choosing a storage level.
1) If the RDD fits comfortably in memory with the default storage level, choose the default option. This takes maximum advantage of Spark’s design, allowing operations on the RDD to execute as fast as possible.
2) If not, use the MEMORY_ONLY_SER level, which keeps the objects in memory in serialized form. Choose a fast serialization library to make serialization both fast and space-efficient.
3) Don’t spill data to disk unless the operations that computed the dataset are expensive or they filter out a large amount of data. Otherwise, recomputing a partition may be as fast as reading it from disk.
4) Replicated storage levels are recommended if you need fast fault recovery. All the levels are fault-tolerant, but the replicated levels let you continue your task instead of waiting for the lost partitions to be recomputed.
5) In environments with large amounts of memory or multiple running applications, the experimental OFF_HEAP level can be used.
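The first four rules of thumb can be read as a small decision procedure. The sketch below is one possible interpretation in plain Python, not an official Spark utility: the function `choose_storage_level` and its boolean parameters are hypothetical, and the level names are returned as plain strings. The experimental OFF_HEAP case is deliberately left out.

```python
def choose_storage_level(fits_in_memory: bool,
                         fits_when_serialized: bool,
                         recompute_is_expensive: bool,
                         need_fast_recovery: bool = False) -> str:
    """Pick a storage level name by following the rules of thumb in order."""
    if fits_in_memory:
        # Rule 1: the default level is the fastest when everything fits.
        level = "MEMORY_ONLY"
    elif fits_when_serialized:
        # Rule 2: trade CPU time for space by keeping serialized objects.
        level = "MEMORY_ONLY_SER"
    elif recompute_is_expensive:
        # Rule 3: spill to disk only when recomputation would cost more.
        level = "MEMORY_AND_DISK_SER"
    else:
        # Cheap to recompute: let partitions that do not fit be rebuilt.
        level = "MEMORY_ONLY_SER"
    if need_fast_recovery:
        # Rule 4: the replicated variants carry a "_2" suffix.
        level += "_2"
    return level


small_dataset = choose_storage_level(True, True, False)
costly_join = choose_storage_level(False, False, True, need_fast_recovery=True)
```

In practice these inputs are judgments rather than exact measurements, so the result is best treated as a starting point to be validated against the application’s actual memory and timing behaviour.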
Here are the recommended readings on storage levels on the Spark website.