Categories
Computing

Apache Spark data structures

Apache Spark is a computation engine for large scale data processing. Over the past few months a couple of new data structures have been available. In this post I am going to review each data structure trying to highlight their forces and weaknesses.

I also compares how to express a basic word count example using each data structure.

RDD

Spark started with only one data structure: the RDD which stands for Resilient Distributed Dataset. This data structure has interesting properties (as indicated by its name):

  • Resilient: A RDD is resilient and can be recomputed in case of failure
  • Distributed: A RDD can be partitioned and distributed over several nodes.
  • Immutable: A RDD can’t be modified. Instead we apply a transformation to generate a new RDD.
  • Lazy: A RDD represents a computation result but doesn’t trigger any computation. That means one can describe a whole chain of computation (or DAG – Directed Acyclic Graph) by transforming a RDD into another RDD.
  • Statically typed: A RDD has a type (e.g. RDD[String] or RDD[Person])

Working with RDD comes naturally for anyone familiar with the Scala collection API. The whole computation is triggered when an action is applied to a RDD. The action triggers all the operations required to compute the final RDD.

sc.textFile("README.md")
   .flatMap(_.split(" "))
   .filter(_ != "")
   .groupBy(_.toLowerCase)
   .mapValues(_.size)
   .foreach { case (w,c) => println(s"$w: $c") }

RDDs are the lowest-level data structures in Spark. RDDs and their transformations describe how to compute things (similar to what a map-reduct job does) but doesn’t show what it does directly (similar to a definition language like SQL).

As a result there is no optimisations performed on RDD operations.
Each operation is executed where it appears in the DAG (i.e. the DAG is not optimised).

sc.textFile("README.md")
   .flatMap(_.split(" "))
   .filter(_ != "")
   .groupBy(_.toLowerCase)
   .mapValues(_.size)
   .filter(_ == "docker")
   .foreach { case (w,c) => println(s"$w: $c") }

In this example it would be more efficient to apply the last filter operation before the groupBy in order to reduce the amount of data to be moved around.

Dataframe

Dataframe was the second addition to Spark. It is a distributed collection of data organised into named columns.

It comes with a DSL that ressembles SQL. As a result it describes better the intent of a computation.

sc.textFile("README.md").toDF("line")
   .explode("line","word")((line: String) => line.split(" "))
   .groupBy("word")
   .count
   .show

Data frames take advantage of Catalyst to optimise their query plan  (e.g the filter operations are pushed closest to the source as possible) and dataframes often achieve better performance than RDD thanks to optimisation.

Dataframes do not run directly on the Spark context but on the SQL context (although in this example we load data into a RDD that we convert into a dataframe).

The main drawbacks is the lack of type safety. Data frames have an associated schema representing the data. However the schema just holds the column names and not the column types. Therefore the user needs to cast the values to the expected type when accessing the content of a dataframe which lead to runtime errors.

Dataset

As we’ve seen so far dataframes are kind of nice because they provide a declarative API along with query plan optimisation however they lack the type safety of the RDDs.

Dataset aims at adding type safety to the dataframes.

Datasets run on the SQL context and provide a syntax similar to RDDs with lambda expressions.

sqlContext.read.text("README.md").as[String]
   .flatMap(_.split(" "))
   .filter(_ != "")
   .groupBy(_.toLowerCase)
   .count
   .show

Dataset provides the best of both world but wait … there’s more …

Tungsten

As dataset knows which data types it holds it can provides hints to generate encoders to save and operate on data in tungsten format.

Tungsten is the optimised memory engine used by Spark. It manages direct access to off heap memory in order to improve the performances even further.

Tungsten uses less memory than POJO or serialised java object and the generated encoders are faster than java serialisation (including Kryo) to transform domain objects into tungsten memory format.

Knowing the datatypes is especially useful here because it allows to  perform the operations directly inside tungsten minimising the encoding/decoding of data.

That means we get better performance and better memory usage as well. Cached data in tungsten format uses approximately 4 to 5 times less space.

Graphframe

The last data structure I’m going to cover is the graphframe. Graphframes are dedicated to graph storage and manipulation.

It stores a graph data into 2 distinct dataframes:

  • one dataframe to store the graph vertices
  • one dataframe to store the graph edges

In addition to the dataframe API it provides graph operations (e.g. Breadth first search, shortest path, page rank, …).

The word count example isn’t a good fit to illustrate graph frame usage so I invite you to read the databricks blog post introducing graphframes.

Conclusion

As you’ve guessed datasets are becoming a central piece in the Spark engine.

It is already available in Spark streaming and will be available for Spark MLib in upcoming release (probably Spark 2.0).

As of Spark 2.0 datasets should also be available in all supported languages (Scala, Java, Python).

This is a big win for Python because RDD were especially slow because of the conversion to native collections.