Sunday, November 16, 2014

Notes from the second day of PNWScala 2014

Here are some notes from the second and last day of the very successful PNWScala 2014 conference.


Adding Tree and Tree: Distributed Decision Tree Learning - Avi Bryant (Stripe)

This was about the Brushfire framework for learning decision trees, based on Hadoop via Scalding and Algebird. Generality, modularity and composability seem to have been very carefully thought through. The high level approach is based on the PLANET paper. The code will soon be available at http://github.com/stripe/brushfire.


What's new since Programming in Scala - Marconi Lanna (Originate)

A guided tour of language features since the last edition of the book. Some notable ones, with a small sketch exercising a few of them after the list:
  • App trait (2.9)
  • Range foreach optimization (2.10)
  • Parallel collections (2.9)
  • Generalized try catch finally with reusable exception handling via PartialFunction
  • Try [almost] monad (2.10) 
  • Implicit classes (2.10)
  • Value classes (2.10)
  • Extension methods (2.10)
  • String interpolation (2.10) and custom interpolators
  • Futures and promises (2.10, 2.9.3)
  • Dynamic trait (2.10)
  • Akka actors
  • Modularization of advanced language features
  • Reflection, macros and quasiquotes
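
As a quick taste, here's a small sketch of my own (not from the talk) exercising a few of these: the App trait, an implicit value class providing an extension method, string interpolation, and Try.

import scala.util.{Try, Success, Failure}

object NewFeaturesSketch extends App {
  // implicit value class: an extension method with no runtime wrapper (2.10)
  implicit class RichInt(val n: Int) extends AnyVal {
    def squared: Int = n * n
  }

  // string interpolation with the s interpolator (2.10)
  val x = 4
  println(s"${x.squared} is the square of $x")

  // Try used in an "almost" monadic style (2.10)
  Try("42".toInt) match {
    case Success(v) => println(s"parsed $v")
    case Failure(e) => println(s"could not parse: $e")
  }
}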

One Year of Akka - Ryan Tanner (Conspire)

A "view from the trenches", describing adoption of Scala and Akka at an early stage startup, and dealing with scaling issues. We were reminded that "Akka won't save you from building a monolith" and that it's easy to end up with a tightly coupled system. Additional advice included "pull, don't push", as described in the Akka work pulling pattern and more specifically in a post on the Conspire blog. The latter is the last of a series of five posts on this whole effort, and all five seem very much worth reading. Like some other members of the audience I was surprised to hear that Conspire was in the process of making a turn away from Akka clustering (but not Akka actors) and planning to introduce Kafka.


Hands-on Scala.js - Li Haoyi (Dropbox)

Lots of live coding in this talk demonstrated that Scala.js appears to deliver on its promise of unifying server-side and portable browser-side programming in a single, strongly typed language with decent performance. The story gets even stronger when ScalaTags is included, providing an interface to the DOM. Example projects started very simple but got quite complex. There was also a project showing common code on the browser and server, and, for a grand finale, an example where communication between browser and server code was type-checked. A very engaging presentation.


Unruly Creatures: Strategies for dealing with Real Numbers - Erik Osheim (Typelevel)

Starting with a "primitive math blooper reel", this talk motivated and explained the Spire library, which provides various advanced and well-behaved representations of numbers.


What every (Scala) programmer should know about category theory - Gabriel Claramunt

I've watched people try to give variants of this talk for three decades and it hasn't gotten any easier, especially in front of an audience with varied interests and backgrounds. It's quite a bit more compelling with Scala than it was with Standard ML. The bigger problem is that Scala has been successful to a large degree because it hasn't just been pitched to people who have learned or are willing to learn category theory. Most Scala programmers will never learn it and that's mostly a good thing. But knowing it does yield some insight into Scala, so this talk remains worth giving, and perhaps the Scala variant is more relevant than those in the past. Best line: "I came for the abstraction, stayed for the composition."

It may be worth checking out "Category Theory Applied to Functional Programming."

I'm still very interested in the question "what shared conceptual model do all Scala programmers need?", but my starting point is that it probably isn't category theory. It may be a dumbed-down version that explains what a Monad is (and isn't) and why it matters; a minimal sketch of the flavor follows.
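
For what it's worth, here's roughly the kind of sketch I have in mind (mine, not the speaker's): the Monad interface stripped of category-theoretic vocabulary, with Option as one familiar instance.

import scala.language.higherKinds

// a deliberately simplified Monad abstraction
trait Monad[M[_]] {
  def pure[A](a: A): M[A]
  def flatMap[A, B](ma: M[A])(f: A => M[B]): M[B]
}

// Option is one familiar instance
val optionMonad = new Monad[Option] {
  def pure[A](a: A): Option[A] = Some(a)
  def flatMap[A, B](ma: Option[A])(f: A => Option[B]): Option[B] = ma.flatMap(f)
}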


Building a Better Future: Advanced Error Handling for Concurrent Programming with Scalaz and Shapeless -- Jean-Rémi Desjardins and Eddie Carlson (Whitepages)

The last of several good discussions of error handling, this time in the context of futures. Almost anybody who has used futures a lot has at some point needed to collect multiple futures into a single one. Then they learned the hard way that Future.sequence doesn't do quite what they want: it reports the first error in traversal order of the sequence rather than temporal order, and thus doesn't "fail fast" as is usually desired. See, for example, this discussion on StackOverflow. A lot of the solution was over my head as I haven't used either scalaz or shapeless, but the key ingredients were scalaz.Applicative, shapeless.HList and the HList sequencing features of shapeless-contrib. I'm hoping the slides get posted as this is an important problem.
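
A minimal sketch of the problem (my own illustration, not from the talk): with the standard library's Future.sequence, a failure later in the sequence isn't reported until the earlier futures complete.

import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global

val slow = Promise[Int]()     // deliberately never completed in this sketch
val failed = Future.failed[Int](new RuntimeException("boom"))

// `failed` has already failed, but the combined future won't report that
// failure until `slow.future` completes -- it doesn't "fail fast"
val combined = Future.sequence(Seq(slow.future, failed))
combined.onFailure { case e => println(s"failed with $e") }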


Composing Project Archetypes with SBT AutoPlugins - Mark Schaake (Allen Institute for Artificial Intelligence)

How to solve "Multiple Build Maintenance Hell" (MBNH) in an organization with lots of sbt projects. The solution described is essentially to define shared, versioned plugins based on the AutoPlugin concept introduced in sbt 0.13.5 -- described in this tutorial. Each plugin covers on facet of a project (a command line tool, a web service, ...) and plugins can depend on each-other using "requires". The specific plugins defined have been open sourced. The approach seems intuitively right, but somebody asked how a developer could be sure to avoid accidentally overriding plugin behavior. This seems like an interesting problem as sbt seems to be an area where developers often cudgel their code into submission without knowing what they're doing and the first thing that "works" tends to get checked in (and not looked at until something breaks.) 

Saturday, November 15, 2014

Notes from the first day of PNWScala 2014

Here are some of my impressions from the first day of PNWScala 2014. This is my first Scala conference and I'm delighted to see it well organized and well attended.

Rapture: The Art of the One-Liner -- Jon Pretty (Propensive) 

An introduction to the Rapture libraries for IO and related parsing tasks via a series of evocative and idiomatic "one-liners". While no responsible Scala programmer would actually write them as one-liners, the point was well made: the JSON parsing code was very elegant. Jon's remarks on error handling led me to posit a litmus test for this kind of code being "industrial strength".

I want to be able to write similarly idiomatic and elegant code that allows me to process a large number of documents and:

  • Return representations of the valid documents (say in terms of case classes)
  • Return representations of the invalid documents that:
    • Identify the invalid document by some identifier and/or content
    • Explain what's invalid about it
  • For bonus points, while processing, give incremental counts of valid/invalid documents so I can decide that my failure rate is unacceptable (and something fundamental has gone wrong) or is acceptable and I can either discard or subsequently fix and reprocess the failure cases.
In a follow-up conversation Jon showed me how to do this with Try, so I think it's plausible that Rapture actually meets my test.
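
To make the litmus test concrete, here's a rough sketch of the shape I have in mind (plain Try, not actual Rapture code; the document types are invented):

import scala.util.{Try, Success, Failure}

case class Doc(id: String, raw: String)
case class Parsed(id: String, value: Int)

// a stand-in for real document parsing
def parse(doc: Doc): Try[Parsed] = Try(Parsed(doc.id, doc.raw.trim.toInt))

val docs = Seq(Doc("a", "1"), Doc("b", "oops"), Doc("c", "3"))
val results = docs.map(d => (d, parse(d)))

val valid   = results.collect { case (_, Success(p)) => p }
val invalid = results.collect { case (d, Failure(e)) => (d.id, e.getMessage) }

println(s"valid: ${valid.size}, invalid: ${invalid.size}")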

The First Hit is Always Free: A Skeptic's Look at scalaz' "Gateway Drugs" -- Brendan McAdams (Netflix) 

This was an interesting view of scalaz "from the outside", doing a nice job of explaining why scalaz may be of interest to more than just functional programming researchers. It started with a tour of some of the usual Scala and scalaz approaches to error handling -- Option and Validation -- and why they're hard to work with (not monadic). Then Brendan explained scalaz's disjunction type (\/) and showed how it makes it easier to accumulate error information while processing data. (The timing of this was interesting, coming on the heels of my error processing concerns about Rapture as described above.)
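
As a rough illustration of the flavor (my own sketch, not Brendan's code), the disjunction type composes in for-comprehensions and carries an error value on the left:

import scalaz._
import Scalaz._

// a hypothetical parser returning either an error message or a value
def parseAge(s: String): String \/ Int =
  try s.trim.toInt.right[String]
  catch { case _: NumberFormatException => s"'$s' is not a number".left[Int] }

// \/ has flatMap, so it works in for-comprehensions, stopping at the first error
val result: String \/ Int = for {
  a <- parseAge("42")
  b <- parseAge("oops")
} yield a + b
// result is -\/("'oops' is not a number")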

Types out of patmat -- Stephen Compall (McGraw Hill Financial)

I think everybody in the audience who hasn't worked among type theory researchers found this one really tough. I have worked with type theory researchers, but that was 22 years ago, which seems to be about 21 years too long for understanding this talk. My takeaway is that I should learn more about the subtleties of Scala pattern matching, and that I shouldn't expect it to work very well if the type of what I'm matching is complex enough to be interesting to a type theory researcher. I suspect this was a really interesting and informative talk for people who had the necessary background. The high-level warnings are useful to every Scala programmer.

Don't Cross the Streams -- Marc Millstone (Socrata)

This was focused on aspects of counting records in a stream, with adequate or at least understood accuracy, while dealing with bounded memory. The approaches are embodied in the Tallyho project. Marc talked about three kinds of approaches, based respectively on engineering, math and ignorance. There were a number of interesting techniques, some based on hashing, and all essentially unfamiliar to me. Stream processing seems to be an increasingly important application of Scala, especially with the success of Spark streaming. This talk also left me wanting to learn more about stream-lib, Algebird and Shapeless, and to check out the Highly Scalable Blog.
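
To give a flavor of the hash-based family (a toy sketch of my own, far cruder than anything in Tallyho): track the maximum number of trailing zeros seen in hashed values and use 2^max as a very rough estimate of the distinct count, in constant memory (ignoring the usual corrections and multiple hash functions).

// toy Flajolet-Martin-style estimate: constant memory, very rough accuracy
def estimateDistinct(items: Iterator[String]): Long = {
  var maxZeros = 0
  for (item <- items) {
    val zeros = java.lang.Integer.numberOfTrailingZeros(item.hashCode)
    maxZeros = math.max(maxZeros, zeros)
  }
  1L << maxZeros
}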

Apache Spark I: From Scala Collections to Fast Interactive Big Data with Spark -- Evan Chan (Socrata) 

A nice overview of Apache Spark from a Scala perspective, emphasizing the smooth transition from Scala serial to Scala parallel collections (Scala 2.9), and then to the RDD as a distributed collection, and Spark laziness as a natural extension of Scala lazy collections (Streams and Iterators). This material was mostly very familiar to me, but Evan did a great job of emphasizing how natural it all is and of explaining it.
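
A rough sketch of that progression (mine, assuming an existing SparkContext called sc):

val xs = (1 to 10).toList      // ordinary (serial) Scala collection
val parXs = xs.par             // parallel collection: same API, local threads
val rdd = sc.parallelize(xs)   // RDD: a distributed collection with a similar API

// the same transformation reads nearly identically on all three
xs.map(_ * 2)
parXs.map(_ * 2)
rdd.map(_ * 2)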

It was also interesting to hear that the entire Socrata backend is implemented in Scala.

Apache Spark II: Streaming Big Data Analytics with Team Apache, Scala & Akka -- Helena Edelson (Datastax)

At first this seemed like an overview of Spark streaming, with which I was already quite familiar, but it turned out to be more about building quite complex streaming-oriented Spark applications that also used Akka. There were a number of interesting points:

  • I had never heard of the Lambda Architecture, of which this approach is an instance
  • A LOT of audience members were using Apache Kafka
  • A significant but much smaller number were using Apache Cassandra
  • An apparently well-known discussion on the apache-spark-user-list of why Spark is based on Scala was summarized as: functional programming, JVM leverage, function serializability, static typing and the REPL
  • The KillrWeather project is a reference application whose design is based on the ideas presented 
  • It's currently difficult to combine stream and historical data in a single application because SparkContext is not serializable  

Miniboxing: JVM Generics without the overhead  -- Vlad Ureche (EPFL) 

Miniboxing is essentially efficient specialization of generics for primitive types that fit in a long integer by actually fitting them in a long integer, with sometimes dramatic performance improvements and/or code size reductions.

It is available to use via a compiler plugin that is undergoing active development and improvement. A comment from the back of the room: "That is an insane amount of documentation for an in-development compiler plugin".
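
Usage looks much like standard specialization. Here is a hedged sketch (I haven't run the plugin myself), assuming the miniboxing compiler plugin and its runtime are on the classpath:

// standard specialization: the compiler duplicates bytecode per primitive type
class SpecializedBox[@specialized(Int, Long, Double) T](val value: T)

// miniboxing: primitives are encoded in a long, so far fewer variants are needed
class MiniBox[@miniboxed T](val value: T)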

The value of this was illustrated using an image processing example.

Some useful background reading is a post called "Quirks of Scala specialization" by Alex Prokopec.

Towards a Safer Scala -- Leif Wickland (Oracle) 

This talk explored the various approaches to helping ensure that a project's Scala code is safe and correct, including compiler parameters, Scalastyle, WartRemover and others.

My take-away was that the compiler parameters, Scalastyle and WartRemover were at the point where I should actually consider using them, and the remainder were worth watching.
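
As a starting point, this is the kind of compiler-parameter configuration I have in mind trying (a sketch for build.sbt; the exact flag set is a matter of taste):

scalacOptions ++= Seq(
  "-deprecation",        // warn on use of deprecated APIs
  "-unchecked",          // warn on unchecked type tests
  "-feature",            // require explicit enabling of advanced features
  "-Xlint",              // a broad set of additional lint warnings
  "-Ywarn-dead-code",
  "-Xfatal-warnings"     // turn all warnings into errors
)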

Sunday, November 9, 2014

Spark computations

In this post we'll start to build an understanding of how Spark computations are represented and executed, and why. Once again, I'm assuming familiarity with Scala, and the code for the examples can be found at https://github.com/spirom/LearningSpark -- in this case Ex2_Computations.scala.

Peeking under the lid of a simple computation

In the previous post we saw how to establish a SparkContext, and we'll continue from there by setting up an RDD of 10 numbers with 4 partitions, and applying two functions in series to its elements: the first one multiplying each element by 100 and the second adding 1 to each element.

val numbers = sc.parallelize(1 to 10, 4)
val bigger = numbers.map(n => n * 100)
val biggerStill = bigger.map(n => n + 1)

You might reasonably expect that each line of code would result in a new RDD being fully computed, but that's not the way it works. In order to allow the Spark system to execute computations efficiently (which mostly means with minimum communication) the system defers the computation until it knows what the goal is -- in other words, when it has to. And it doesn't have to until it reaches code that pulls data out of the RDD: to print it, to save it externally, or to compute some other data structure. Additionally, you can tell it to compute an RDD eagerly at some point, to cache it because you know you're going to use it a lot. So the three lines of code above achieve no Spark computation on RDDs whatsoever. Instead, a chain of dependencies is created for later use in execution. We can get Spark to show us this with an additional line of code:

println(biggerStill.toDebugString)

This produces a readable representation of what other RDDs a given RDD depends on, even showing which line of code is involved at each step. We'll refer to this as a "debug string" since that's what the Spark API calls it.  The (4) in the first line tells us that everything's happening across four partitions (since that's the way we set up the initial RDD). We'll look at partitions much more in later posts. Each RDD is shown with its ID which you can access using, say, biggerStill.id.

(4) MappedRDD[2] at map at Ex2_Computations.scala:21
 |  MappedRDD[1] at map at Ex2_Computations.scala:20
 |  ParallelCollectionRDD[0] at parallelize at Ex2_Computations.scala:19

But adding just one line of code changes this -- we'll use the reduce() method on RDD to sum the numbers:

val s = biggerStill.reduce(_ + _)

To execute this line the Spark system needs to run through the dependencies, apply any optimizations, and compute a final answer.

You can actually observe this quite nicely when running the sample project. Start by emptying out the log4j.properties file (or just change the log4j.rootCategory to INFO.) Then comment out everything below the definition of biggerStill and run the program. You'll see lots of log chatter before the debug string, but none after. When you uncomment the definition of s, you'll get more than a dozen lines of extra chatter as the value is actually computed.

A more complex example

You can make the dependencies more complex by defining another RDD in terms of two of the prior RDDs. There's an entire post coming up about operations on multiple RDDs, but we'll just use "++" or "union", which happens not to be a true set union as it retains duplicates.

val moreNumbers = bigger ++ biggerStill

At this point printing the debug string yields some insight, and, if you look carefully, a reminder of why you really want Spark to optimize computations -- you'd hate for it to compute 'bigger', alias MappedRDD[1], twice! Unfortunately this doesn't show the tree structure of the dependencies, so in the sample code on GitHub you'll see the definition and use of a Scala function called showDep that makes the transitive dependencies clearer.

(8) UnionRDD[3] at $plus$plus at Ex2_Computations.scala:34
 |  MappedRDD[1] at map at Ex2_Computations.scala:20
 |  ParallelCollectionRDD[0] at parallelize at Ex2_Computations.scala:19
 |  MappedRDD[2] at map at Ex2_Computations.scala:21
 |  MappedRDD[1] at map at Ex2_Computations.scala:20
 |  ParallelCollectionRDD[0] at parallelize at Ex2_Computations.scala:19

The effect of caching and checkpointing

As mentioned earlier, you can ask Spark to compute an RDD and cache its value if you believe you'll be using it a lot and you don't want to recompute it each time.


moreNumbers.cache()

Caching an RDD doesn't allow the dependency tree to be discarded, because the cache could be destroyed by a worker failure.

You can also specify that a particular RDD should be checkpointed, after specifying a checkpoint directory for the Spark context.

sc.setCheckpointDir("c:/temp/sparkcps")
moreNumbers.checkpoint()

But it's not enough to know we want it checkpointed: the values haven't been calculated, so the Spark system doesn't have anything to write into the checkpoint file. It's only when Spark has a reason to compute the RDD that a checkpoint file can be written.

moreNumbers.count()

This forces the value of moreNumbers to be calculated, and so the value can be checkpointed. Let's look at the debug string now:

(8) UnionRDD[3] at $plus$plus at Ex2_Computations.scala:34
 |  CheckpointRDD[4] at count at Ex2_Computations.scala:49

Cached RDDs are not represented in the debug string, but checkpoint files are represented, because they replace the corresponding subtree of the dependencies: 'moreNumbers' won't need to be recomputed even if the computation is restarted because of a failure.

Error Handling

The "lazy" way Spark computes RDDs has an impact on error exception generation, and hence error handling, that's useful to understand. Consider this code snippet:

val thisWillBlowUp = numbers map {
  case (7) => { throw new Exception }
  case (n) => n
}

You'd expect it to throw an exception because 'numbers' contains 7. But it doesn't until 'thisWillBlowUp' is actually used in a computation. The next snippet will, indeed, cause the exception to be thrown. This can feel pretty strange because, if you think of RDD definitions like normal data structure definitions, you expect exceptions to be thrown locally. So, some of the time, it helps to think of RDDs as functions rather than data structures (or, equivalently for those familiar with the finer points of functional programming, as lazy definitions rather than eager ones.)

try {
  println(thisWillBlowUp.count())
  println("wait -- it should have blown up")
} catch {
  case (e: Exception) => println("Yep, it blew up now")
}

What you've learned

Now let's take stock of why you need to know all this. There are actually several reasons:
  1. It helps you understand how Spark achieves efficient computation, so you won't waste your time optimizing needlessly (save it for more advanced situations where Spark does need your help.)
  2. You'll be able to understand the logs better, and won't be surprised when things are "out of order". 
  3. Error handling will make more sense. 
I hesitated to take the lid off Spark mechanisms this early in a series of introductory posts, but you don't need to think about what I've told you here all the time when programming in Spark. It's something to put at the back of your mind and drag out as a tool when something is behaving strangely. When we continue this series we'll go back to using Spark for real work. Then, every once in a while, we'll peek underneath the mechanisms to help you stay out of trouble.

Thursday, November 6, 2014

An easy way to start learning Spark


Getting started with Spark

Big data computing systems like Hadoop and Spark tend to be large and complex, making it quite hard to start learning how to use them. And, of course, to solve large problems you really do have to tackle this complexity, as you'll need a cluster large enough for your problem, and realistic enough to measure and tune the performance of your solution. But there's a lot to learn before taking on huge problems, and it's useful to have an easy "on ramp" to get started. Then, as you build skills and competence you can graduate either to using a cluster that somebody else has configured, or configuring one yourself.

Furthermore, the Hadoop and Spark projects have been rather Linux-centric, whereas most of us have easier access to machines running Windows and Mac OS. Again, configuring a production cluster will usually involve Linux, but it's helpful to get started on a system you know well.

In this, the first of a series of posts on learning Spark, I'm going to use an approach that doesn't require you to set up a cluster or build complex software from the sources. It should also work equally well on Windows, Mac OS and Linux -- and I am in fact developing the examples on Windows 8.

Installing the software

You need the following software to follow along:
  1. A Java Development Kit -- I'm using 1.7.0 
  2. A set of Scala binaries -- I'm using 2.10 
  3. IntelliJ IDEA development environment -- I'm using 13.1 Community Edition
  4. The IDEA Scala plugin -- instructions available here
Notice there's no need to download Spark itself.

The code for the examples is available at https://github.com/spirom/LearningSpark. (You will need to make minor adjustments for Mac OS or Linux.)

Creating a project

Create a Scala sbt project in IDEA. While the programs we write will be very simple, we will use sbt to manage our dependencies on the various Spark libraries.




After doing this, wait a couple of minutes for IDEA to create the folder structure of a standard project (it tends to behave like it's ready even when it isn't.)


Getting Spark

Once the folder structure has been created you can see that the build.sbt file exists at the top level. 

Edit it to create a very simple project that depends on the Spark core package, version 1.1.0, built for Scala 2.10.

name := "LearningSpark"

version := "1.0"

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.1.0"

When you save this, IDEA will prompt you to refresh it, and then spend some time downloading everything you need. That's all there is to it: no building, no cluster setup, and no daemon/service management. You're ready to write and run your first Spark program.


Your first Spark program

Right click on the src/main/scala node in your project and select New/Scala Class, and call your class Ex1_SimpleRDD. Now paste the following code into it, replacing anything IDEA already inserted.

import org.apache.spark.{SparkContext, SparkConf}

object Ex1_SimpleRDD {
  def main (args: Array[String]) {
    val conf = new SparkConf().setAppName("Ex1_SimpleRDD").setMaster("local[4]")
    val sc = new SparkContext(conf)

    val numbers = 1 to 10
    val numbersRDD = sc.parallelize(numbers, 4)
    println("Print each element of the original RDD")
    numbersRDD.foreach(println)
  }
}

Notice how we've defined an object rather than a class -- so we can define a main method and get running right away. Let's look at this method in some detail.

The first two lines set up a local Spark environment using four threads. This is all we need to get started. The next line defines a sequence of numbers called 'numbers'. Now we come to the essence of Spark: the Resilient Distributed Dataset, or RDD. This is the data structure on which all Spark computing takes place. In this case we ask Spark to take a regular Scala data structure and turn it into an RDD using the parallelize method on a SparkContext. We can default the number of partitions, or specify it as the second parameter.

To keep these programs brief, simple and as close to idiomatic Scala as we can, we won't put type declarations on our definitions. But it's helpful to know the types, and IDEA helps with this -- position your cursor in the middle of 'numbersRDD' as shown below and hit Alt-= (Windows) or Ctrl-Shift-P (Mac OS). You'll see that it is an RDD containing Scala Int elements.
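
For reference, the inferred type spelled out explicitly would look roughly like this:

val numbersRDD: org.apache.spark.rdd.RDD[Int] = sc.parallelize(numbers, 4)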


Finally, in typical Scala style, we can print each element of the RDD. 

Run it

You're now ready to run your first Spark program. Right click on the object you just created in the project explorer and choose Run 'Ex1_SimpleRDD'.

You should see something like the following: 



This is an awful lot of logging, and ultimately much more than we'll want to see, but it is interesting to see once. Notice that an awful lot of work seems to get done just to print the numbers; we'll delve into that more deeply later.

Also notice that the numbers appear out of order. It may be tempting to reach all sorts of conclusions about RDDs, the way they're stored, and what order information may be lost when they're created. But those conclusions aren't valid: the RDD is a parallel data structure, and the foreach method causes the println calls to be run in parallel on each of the four partitions. Not only will those four print loops start in random order, but their outputs will be interleaved with each other. Run this example a few more times, and you'll see that the order keeps changing. Later we'll see that the actual order of the elements in the RDD has NOT been lost.

Tune log verbosity

Now it's time to reduce the logging verbosity to the level of warnings and errors only. Create the file src/main/resources/log4j.properties.



Here is a reasonable starting point for the contents.

log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN

If you run the example again you'll see a much less cluttered response (with yet another order of numbers). 

Do some computing

Next you're ready to actually compute something. If you add the following few lines you can compute a transformed RDD, this time containing Scala Doubles (remember to check its type), where every element has been divided by 10. This transformation is run independently on each partition, without requiring data transfer between the partitions. The next line gathers all the results into a regular Scala array (remember to check its type too), so when we print its contents they come out in order -- the order wasn't lost after all.


val stillAnRDD = numbersRDD.map(n => n.toDouble / 10)
val nowAnArray = stillAnRDD.collect()
println("Now print each element of the transformed array")
nowAnArray.foreach(println)


Here's the output:


Now print each element of the transformed array
0.1
0.2
0.3
0.4
0.5
0.6
0.7
0.8
0.9
1.0

Start understanding RDDs and partitioning

To end your first encounter with Spark, let's use RDD's 'glom' method to see what's in the individual partitions of 'stillAnRDD' and print the results as in the following code. 

val partitions = stillAnRDD.glom()
println("We _should_ have 4 partitions")
println(partitions.count())
partitions.foreach(a => {
  println("Partition contents:" +
    a.foldLeft("")((s, e) => s + " " + e))
})

Hopefully, you've already checked the return type and seen that it's an RDD[Array[Double]], where each element of this new RDD is the entire contents of a partition, collected into an array. Here's the expected output:

We _should_ have 4 partitions
4
Partition contents: 0.6 0.7
Partition contents: 0.8 0.9 1.0
Partition contents: 0.3 0.4 0.5
Partition contents: 0.1 0.2

Notice how the elements of each partition are in order, but (except when you get lucky) the partitions themselves are not. Remember, 'partitions' is still an RDD, so the foreach loop runs in parallel. It's just the foldLeft that runs sequentially on the contents of each of the four arrays.

Also notice the distribution of elements across the partitions: not too bad at all -- this will become interesting when, in later posts, we look at partitioning and its effect on performance.

Finally, notice how using a single println for each partition keeps the outputs from getting interleaved.

What you've learned

Congratulations: without building anything from sources, setting up a cluster or installing and configuring anything more complex than a Java/Scala development environment you've written and run your first Spark program. Eventually, to solve real problems and diagnose the performance of your solution you'll need a cluster. But you may find an awful lot of your Spark development (and especially exploration and learning) gets done in this simplified environment from now on.