Sunday, November 9, 2014

Spark computations

In this post we'll start to build an understanding of how Spark computations are represented and executed, and why. Once again, I'm assuming familiarity with Scala, and the code for the examples can be found at https://github.com/spirom/LearningSpark -- in this case Ex2_Computations.scala.

Peeking under the lid of a simple computation

In the previous post we saw how to establish a SparkContext, and we'll continue from there by setting up an RDD of 10 numbers with 4 partitions, and applying two functions in series to its elements: the first multiplying each element by 100 and the second adding 1 to the result.

val numbers = sc.parallelize(1 to 10, 4)
val bigger = numbers.map(n => n * 100)
val biggerStill = bigger.map(n => n + 1)

You might reasonably expect that each line of code would result in a new RDD being fully computed, but that's not the way it works. To allow the Spark system to execute computations efficiently (which mostly means with minimal communication), it defers the computation until it knows what the goal is -- in other words, until it has to. And it doesn't have to until it reaches code that pulls data out of the RDD: to print it, to save it externally, or to compute some other data structure. (You can also tell Spark to compute an RDD eagerly at some point and cache it, because you know you're going to use it a lot.) So the three lines of code above achieve no Spark computation on RDDs whatsoever. Instead, a chain of dependencies is created for later use in execution. We can get Spark to show us this with an additional line of code:

println(biggerStill.toDebugString)

This produces a readable representation of what other RDDs a given RDD depends on, even showing which line of code is involved at each step. We'll refer to this as a "debug string" since that's what the Spark API calls it. The (4) in the first line tells us that everything's happening across four partitions (since that's the way we set up the initial RDD); we'll look at partitions much more in later posts. Each RDD is shown with its ID, which you can access using, say, biggerStill.id.

(4) MappedRDD[2] at map at Ex2_Computations.scala:21
 |  MappedRDD[1] at map at Ex2_Computations.scala:20
 |  ParallelCollectionRDD[0] at parallelize at Ex2_Computations.scala:19
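
As a quick illustration of those IDs, a line like the following (not in the example file) prints them directly; given the output above they should come out as 0, 1 and 2. Note that reading the id field doesn't trigger any computation either.

println(numbers.id + " " + bigger.id + " " + biggerStill.id)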

But adding just one line of code changes this -- we'll use the reduce() method on RDD to sum the numbers:

val s = biggerStill.reduce(_ + _)

To execute this line the Spark system needs to run through the dependencies, apply any optimizations, and compute a final answer.
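
Printing the result, say with the line below, should show the sum of 101, 201, ..., 1001:

println(s)   // expect 5510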

You can actually observe this quite nicely when running the sample project. Start by emptying out the log4j.properties file (or just change log4j.rootCategory to INFO). Then comment out everything below the definition of biggerStill and run the program. You'll see lots of log chatter before the debug string, but none after. When you uncomment the definition of s, you'll get more than a dozen lines of extra chatter as the value is actually computed.
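
For reference, a minimal log4j.properties along the lines of the standard Spark template looks like this (the exact file in the sample project may differ):

# show INFO-level chatter on the console
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n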

A more complex example

You can make the dependencies more complex by defining another RDD in terms of two of the prior RDDs. There's an entire post coming up about operations on multiple RDDs, but for now we'll just use "++" (an alias for union()), which happens not to be a true set union since it retains duplicates.

val moreNumbers = bigger ++ biggerStill
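
As a quick check of that duplicate-keeping behavior (not something the example file does), uniting an RDD with itself doubles the element count:

println((bigger ++ bigger).count())   // 20, not 10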

At this point, printing the debug string yields some insight and, if you look carefully, a reminder of why you really want Spark to optimize computations -- you'd hate for it to compute 'bigger', alias MappedRDD[1], twice! Unfortunately, this output doesn't show the tree structure of the dependencies, so in the sample code on GitHub you'll see the definition and use of a Scala function called showDep that makes the transitive dependencies clearer; a rough sketch of such a helper appears after the output below.

(8) UnionRDD[3] at $plus$plus at Ex2_Computations.scala:34
 |  MappedRDD[1] at map at Ex2_Computations.scala:20
 |  ParallelCollectionRDD[0] at parallelize at Ex2_Computations.scala:19
 |  MappedRDD[2] at map at Ex2_Computations.scala:21
 |  MappedRDD[1] at map at Ex2_Computations.scala:20
 |  ParallelCollectionRDD[0] at parallelize at Ex2_Computations.scala:19
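
Here's a sketch of what such a helper might look like -- the version in the repository may differ, but the idea is the same: each RDD exposes its immediate parents through its dependencies method, so a simple recursive walk can print the tree with indentation.

// hypothetical sketch, not necessarily the repository's showDep
def showDep(rdd: org.apache.spark.rdd.RDD[_], indent: Int = 0): Unit = {
  println((" " * indent) + rdd)
  rdd.dependencies.foreach(dep => showDep(dep.rdd, indent + 2))
}

showDep(moreNumbers)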

The effect of caching and checkpointing

As mentioned earlier, you can ask Spark to compute an RDD and cache its value if you believe you'll be using it a lot and you don't want to recompute it each time.


moreNumbers.cache()

Caching an RDD doesn't allow the dependency tree to be discarded, because the cached data could be lost to a worker failure, in which case Spark would need the dependencies to recompute it.
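
Incidentally, cache() is just shorthand for persist() with the default memory-only storage level; instead of the cache() call above you could, say, allow partitions to spill to disk when memory runs out (not something the example does):

import org.apache.spark.storage.StorageLevel

// use this in place of the cache() call -- an RDD's storage level
// can only be assigned once
moreNumbers.persist(StorageLevel.MEMORY_AND_DISK)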

You can also specify that a particular RDD should be checkpointed, after specifying a checkpoint directory for the Spark context.

sc.setCheckpointDir("c:/temp/sparkcps")
moreNumbers.checkpoint()

But it's not enough to know we want it checkpointed: the values haven't been calculated yet, so the Spark system doesn't have anything to write into the checkpoint file. It's only when Spark has a reason to compute the RDD that a checkpoint file can be written.

moreNumbers.count()

This forces the value of moreNumbers to be calculated, so it can be checkpointed. Let's look at the debug string now:

(8) UnionRDD[3] at $plus$plus at Ex2_Computations.scala:34
 |  CheckpointRDD[4] at count at Ex2_Computations.scala:49

Cached RDDs are not represented in the debug string, but checkpoint files are, because they replace the corresponding subtree of the dependencies: 'moreNumbers' won't need to be recomputed even if the computation is restarted because of a failure.
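
You can also check this programmatically (these lines aren't in the example file): after the count() above, the RDD reports that it has been checkpointed and where the checkpoint data lives.

println(moreNumbers.isCheckpointed)     // true
println(moreNumbers.getCheckpointFile)  // Some(<path under the checkpoint directory>)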

Error Handling

The "lazy" way Spark computes RDDs has an impact on error exception generation, and hence error handling, that's useful to understand. Consider this code snippet:

val thisWillBlowUp = numbers map {
  case (7) => { throw new Exception }
  case (n) => n
}

You might expect it to throw an exception immediately, because 'numbers' contains 7, but it doesn't until 'thisWillBlowUp' is actually used in a computation. The next snippet does, indeed, cause the exception to be thrown. This can feel pretty strange because, if you think of RDD definitions like normal data structure definitions, you expect exceptions to be thrown where the definition appears. So, some of the time, it helps to think of RDDs as functions rather than data structures (or, equivalently for those familiar with the finer points of functional programming, as lazy definitions rather than eager ones).

try {
  // count() is an action, so this is the point where the map finally runs
  println(thisWillBlowUp.count())
  println("wait -- it should have blown up")
} catch {
  case (e: Exception) => println("Yep, it blew up now")
}

What you've learned

Now let's take stock of why you need to know all this. There are actually several reasons:
  1. It helps you understand how Spark achieves efficient computation, so you won't waste your time optimizing needlessly (save it for more advanced situations where Spark does need your help.)
  2. You'll be able to understand the logs better, and won't be surprised when things are "out of order". 
  3. Error handling will make more sense. 
I hesitated to take the lid off Spark mechanisms this early in a series of introductory posts, but you don't need to think about what I've told you here all the time when programming in Spark. It's something to put at the back of your mind and drag out as a tool when something is behaving strangely. When we continue this series we'll go back to using Spark for real work. Then, every once in a while, we'll peek underneath the mechanisms to help you stay out of trouble.
