Sunday, December 28, 2014

Filtering and Projection in Spark SQL External Data Sources

In the previous post about the Spark external data source API, we looked at a simple way to register an external data source as a temporary table in Spark SQL. While very easy to use, that mechanism didn't allow Spark SQL to specify which data columns it was interested in, or to provide filters on the rows: the external data source had to produce all the data it had, and Spark SQL would filter the result records as needed. That's often very inefficient, especially when large amounts of data are involved. In this post we'll look at parts of the external data source API that solve this problem, allowing Spark SQL to push projection and filtering down to the external data source.

As with other posts in this series, I'm assuming familiarity with Scala, and the code for the examples can be found at https://github.com/spirom/LearningSpark -- in this case sql/RelationProviderFilterPushdown.scala.

As we discussed last time, the external data source API is located in the org.apache.spark.sql package, and consists of six abstract classes, six case classes and one trait. Projection pushdown is supported by the abstract class PrunedScan, but we'll cover its capabilities by discussing the more general abstract class PrunedFilteredScan, which also supports filter pushdown.

Much of this post is about how the external data source API gives you flexibility in deciding which of three layers of software filters the data that's being queried, so we need to agree on names for the layers. I'm going to name them as follows:

  1. External Data Engine: Some piece of (probably third party) software whose data you want to be able to query from Spark SQL. While you may be willing to make changes to this software to make this easier, I'm going to assume that it has a fixed set of capabilities that will influence some of the choices you make in implementing an adapter for it.
  2. External Data Source Adapter: This is the code we're learning how to implement in this post: an implementation of Spark SQL's external data source API that accepts calls from Spark SQL, turns them into calls to an external data engine, and then filters and transforms the results before returning them to Spark SQL across the API. When implementing the adapter, you can choose between three strategies: a simple one based on the TableScan abstract class (discussed in the previous post), an intermediate one based on the PrunedScan abstract class, or the most powerful (and complex) one, PrunedFilteredScan, which is the topic of this post.
  3. Spark SQL: This is Spark's SQL query compiler, optimizer and top level execution engine, layered on top of core Spark. Individual queries can combine internal and external data, and even external data from multiple sources, and Spark SQL must cleanly break out sub-queries to the individual external data source adapters. When querying external data it has a number of choices in exactly what it requests from the external data source adapter. It detects which of the three implementation strategies is being used for any given adapter, and conforms to it, but the adapter needs to support all the possibilities for its chosen strategy.

In case you're coding along, you'll need the following imports, but remember that I gave a link to the complete code at the beginning of this post.

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.sources._
import scala.collection.mutable.{ArrayBuffer, HashMap}

How to deal with filter "pushdown"

Filter "pushdown" is the general approach used by most database query systems for doing filtering as close as possible to the source of the data, based on the assumption that dealing with less data will almost always be faster. That isn't always true, especially if the filters are complex to evaluate, but that's not our problem here. Spark SQL's external data source API gives Spark SQL the option of pushing a set of simple filters (of the six kinds listed below) down through the API, leaving the external data source adapter three options for utilizing those filters:

  1. Keep pushing: If the external data engine has a powerful query language such as SQL, MDX or XQuery, the most efficient implementation of all may result from the filters being used to construct the query that's sent to that system.
  2. Evaluate: Don't push the filter through into the external data engine but instead interpret it and use it to filter the records returned, rather than just blindly returning all records to Spark SQL.
  3. Do nothing: Spark SQL doesn't know which filters you've utilized, and doesn't really care -- it reapplies all of them to the records your adapter returns to make absolutely sure that no spurious records get through.
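To make the first option concrete, here is a minimal sketch for a hypothetical adapter whose external data engine accepts SQL. It turns each filter it understands into a predicate of a WHERE clause and silently skips anything else (the third option), relying on Spark SQL to re-apply every filter. So that it runs without Spark, the filter case classes are local stand-ins that roughly mirror those in org.apache.spark.sql.sources; sqlLiteral, toSqlPredicate and whereClause are illustrative names, and the literal quoting is deliberately simplistic.

```scala
// Hypothetical stand-ins roughly mirroring the six filter case classes in
// org.apache.spark.sql.sources (Spark 1.2.0), so this sketch runs without Spark.
sealed trait Filter
case class EqualTo(attribute: String, value: Any) extends Filter
case class GreaterThan(attribute: String, value: Any) extends Filter
case class LessThan(attribute: String, value: Any) extends Filter
case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter
case class LessThanOrEqual(attribute: String, value: Any) extends Filter
case class In(attribute: String, values: Array[Any]) extends Filter

// Render a constant as a SQL literal; None means "type not handled here".
def sqlLiteral(v: Any): Option[String] = v match {
  case i: Int    => Some(i.toString)
  case l: Long   => Some(l.toString)
  case s: String => Some("'" + s.replace("'", "''") + "'") // double embedded quotes
  case _         => None
}

// Option 1 ("keep pushing"): translate a filter into a SQL predicate if we can.
def toSqlPredicate(f: Filter): Option[String] = f match {
  case EqualTo(a, v)            => sqlLiteral(v).map(l => s"$a = $l")
  case GreaterThan(a, v)        => sqlLiteral(v).map(l => s"$a > $l")
  case LessThan(a, v)           => sqlLiteral(v).map(l => s"$a < $l")
  case GreaterThanOrEqual(a, v) => sqlLiteral(v).map(l => s"$a >= $l")
  case LessThanOrEqual(a, v)    => sqlLiteral(v).map(l => s"$a <= $l")
  case In(a, vs) =>
    val lits = vs.toSeq.map(sqlLiteral)
    if (lits.nonEmpty && lits.forall(_.isDefined))
      Some(lits.flatten.mkString(s"$a IN (", ", ", ")"))
    else None
}

// Option 3 ("do nothing") for anything untranslatable: it is simply left out,
// which is safe because Spark SQL re-applies every filter to the returned rows.
def whereClause(filters: Seq[Filter]): String = {
  val preds = filters.flatMap(f => toSqlPredicate(f))
  if (preds.isEmpty) "" else preds.mkString(" WHERE ", " AND ", "")
}
```

The decision is made per filter, so a filter on an unsupported type (say, a Double constant) drops out of the generated query without affecting the others.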

Two pragmatic points are worth making here. First, any particular implementation of an external data source can start out being naive, perhaps ignoring most filters and pushing some critical ones through to the external database, and over time optimizations can be introduced when needed. This makes the external data source API particularly easy to adopt while also being quite powerful. Second, the decision about what to do with a set of filters passed by Spark SQL can be made by the implementation on a per-query, per-filter basis: for certain queries all filters may be ignored (perhaps because they are not critical to performance or they are hard to deal with), while for other queries either some or all filters can be used.

Finally, we should list the six kinds of filters that the external data source API supports:

Case Class            Filter
EqualTo               attr = const
GreaterThan           attr > const
LessThan              attr < const
GreaterThanOrEqual    attr >= const
LessThanOrEqual       attr <= const
In                    attr = const_1 or ... or attr = const_n

A back-end data engine with an impoverished query "language"

The power and flexibility of the external data source API is best illustrated with an adapter for an external data engine that provides some significant query capability, but in most requests is rather naive. We'll simulate this with the extremely simple RangeDB class below, which represents a database system with the following characteristics:

  1. Tables have a fixed number of records numRecords.
  2. The records have an integer primary key from 1 to numRecords, and two additional integer fields squared and cubed.
  3. The only kind of query supported (via the getRecords() method) is an inclusive range query on the primary key, where the bounds are optional. This returns an iterator on the native records of the database.

case class RangeDBRecord(key: Int, squared: Int, cubed: Int)

class RangeIterator(begin: Int, end: Int) extends Iterator[RangeDBRecord] {
  var pos: Int = begin

  def hasNext: Boolean = pos <= end

  def next(): RangeDBRecord = {
    val rec = RangeDBRecord(pos, pos*pos, pos*pos*pos)
    pos = pos + 1
    rec
  }
}

class RangeDB(numRecords: Int) {

  def getRecords(min: Option[Int], max: Option[Int]): RangeIterator = {
    new RangeIterator(min.getOrElse(1), max.getOrElse(numRecords))
  }
}
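A quick usage example of RangeDB, with the classes restated so it runs on its own (the particular bounds are arbitrary):

```scala
// Copies of RangeDBRecord, RangeIterator and RangeDB from above.
case class RangeDBRecord(key: Int, squared: Int, cubed: Int)

class RangeIterator(begin: Int, end: Int) extends Iterator[RangeDBRecord] {
  var pos: Int = begin
  def hasNext: Boolean = pos <= end
  def next(): RangeDBRecord = {
    val rec = RangeDBRecord(pos, pos * pos, pos * pos * pos)
    pos = pos + 1
    rec
  }
}

class RangeDB(numRecords: Int) {
  def getRecords(min: Option[Int], max: Option[Int]): RangeIterator =
    new RangeIterator(min.getOrElse(1), max.getOrElse(numRecords))
}

val db = new RangeDB(50)
// Both bounds are inclusive: keys 3, 4 and 5.
val middle = db.getRecords(Some(3), Some(5)).map(_.key).toList
// A missing bound defaults to the start or end of the table.
val tail = db.getRecords(Some(48), None).toList
// An inverted range (min > max) simply yields no records.
val empty = db.getRecords(Some(10), Some(5)).toList
```

Inclusive bounds and an empty result for an inverted range are what allow an adapter to fold any combination of key comparisons into a single getRecords() call.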

The point of this unusual design is to strongly suggest that certain filters really should be pushed down into back end queries, while others absolutely cannot. It provides an informative twist because only four of the six kinds of filters can be passed through if they apply to the primary key column, but none can be passed through if they apply to other columns.

Wrangling filters

We organize all our filter handling into a single class called FilterInterpreter. This class needs to understand several things: the six kinds of filters that Spark SQL can pass through the API and what they mean, the characteristics of the records we are dealing with (represented for convenience as a Map), and finally the exact circumstances under which a filter can be pushed through to the external data engine.

When a FilterInterpreter is created it needs to group the filters by the attribute they are filtering, and the result is represented as a Map from attribute names to arrays of filters. Then the filters that we want to use for the primary key range query are pulled out, and a Map that represents all the remaining filters is produced.

class FilterInterpreter(allFilters: Array[Filter]) {

  private val allAttrToFilters: Map[String, Array[Filter]] = allFilters
    .map(f => (getFilterAttribute(f), f))
    .groupBy(attrFilter => attrFilter._1)
    .mapValues(a => a.map(p => p._2))

  val (min, max, otherKeyFilters) = splitKeyFilter

  private val attrToFilters = 
    allAttrToFilters - "val" + ("val" -> otherKeyFilters)

This relies on a utility method for figuring out which attribute a filter applies to:

  private def getFilterAttribute(f: Filter): String = {
    f match {
      case EqualTo(attr, v) => attr
      case GreaterThan(attr, v) => attr
      case LessThan(attr, v) => attr
      case GreaterThanOrEqual(attr, v) => attr
      case LessThanOrEqual(attr, v) => attr
      case In(attr, vs) => attr
    }
  }

We also need a utility method for splitting out the primary key range filter -- notice it knows the name of the primary key attribute, but it could easily be generalized. It also knows that only four of the six kinds of supported filters are relevant, and leaves the others alone.

  private def splitKeyFilter: (Option[Int], Option[Int], Array[Filter]) = {
    val keyFilters = allAttrToFilters.getOrElse("val", new Array[Filter](0))
    var min: Option[Int] = None
    var max: Option[Int] = None
    val others = new ArrayBuffer[Filter](0)
    keyFilters.foreach({
      case GreaterThan(attr, v) => min = Some(v.asInstanceOf[Int] + 1)
      case LessThan(attr, v) => max = Some(v.asInstanceOf[Int] - 1)
      case GreaterThanOrEqual(attr, v) => min = Some(v.asInstanceOf[Int])
      case LessThanOrEqual(attr, v) => max = Some(v.asInstanceOf[Int])
      case f => others += f // EqualTo and In can't become a range query; keep them for apply()
    })
    (min, max, others.toArray)
  }

Finally, we need a way to apply the filters to a row, which relies on a helper method for applying the filters relevant to a single attribute to the value of that attribute.

  def apply(r: Map[String, Int]): Boolean = {
    r.forall({
      case (attr, v) => {
        val filters = attrToFilters.getOrElse(attr, new Array[Filter](0))
        satisfiesAll(v, filters)
      }
    })
  }

  private def satisfiesAll(value: Int, filters: Array[Filter]): Boolean = {
    filters.forall({
      case EqualTo(attr, v) => value == v.asInstanceOf[Int]
      case GreaterThan(attr, v) => value > v.asInstanceOf[Int]
      case LessThan(attr, v) => value < v.asInstanceOf[Int]
      case GreaterThanOrEqual(attr, v) => value >= v.asInstanceOf[Int]
      case LessThanOrEqual(attr, v) => value <= v.asInstanceOf[Int]
      case In(attr, vs) => vs.exists(v => value == v.asInstanceOf[Int])
    })
  }
}
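As written, splitKeyFilter keeps whichever lower or upper bound it sees last. If Spark SQL passes more than one bound on the key, as it might for WHERE val > 5 AND val >= 10, an earlier and tighter bound can be overwritten by a looser one. The answer is still correct, because Spark SQL re-applies every filter, but RangeDB returns more records than necessary. Here is a sketch of an alternative that keeps the tightest bounds; keyBounds and tighten are hypothetical names, and the filter classes are again local stand-ins for the Spark ones.

```scala
// Hypothetical stand-ins for filter case classes in org.apache.spark.sql.sources.
sealed trait Filter
case class EqualTo(attribute: String, value: Any) extends Filter
case class GreaterThan(attribute: String, value: Any) extends Filter
case class LessThan(attribute: String, value: Any) extends Filter
case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter
case class LessThanOrEqual(attribute: String, value: Any) extends Filter

// Combine an existing optional bound with a new candidate using `pick`.
def tighten(current: Option[Int], candidate: Int, pick: (Int, Int) => Int): Option[Int] =
  Some(current.fold(candidate)(c => pick(c, candidate)))

// Returns (min, max, other filters) for the key column: the largest lower bound
// and smallest upper bound seen, both inclusive, regardless of filter order.
def keyBounds(keyFilters: Seq[Filter]): (Option[Int], Option[Int], Seq[Filter]) =
  keyFilters.foldLeft((Option.empty[Int], Option.empty[Int], Vector.empty[Filter])) {
    case ((lo, hi, rest), f) => f match {
      case GreaterThan(_, v: Int)        => (tighten(lo, v + 1, _ max _), hi, rest)
      case GreaterThanOrEqual(_, v: Int) => (tighten(lo, v, _ max _), hi, rest)
      case LessThan(_, v: Int)           => (lo, tighten(hi, v - 1, _ min _), rest)
      case LessThanOrEqual(_, v: Int)    => (lo, tighten(hi, v, _ min _), rest)
      case other                         => (lo, hi, rest :+ other)
    }
  }
```

Because the fold takes a max or min at each step, the result no longer depends on the order in which Spark SQL happens to list the filters.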

Dealing with requiredColumns

Now that Spark SQL is able to specify which columns it wants returned, the situation is a little trickier than it may seem at first. When buildScan was returning the entire Row, Spark SQL could use the information it obtained from calling schema to understand each Row. When it requests only a subset of columns, it's essential not only that we return only the data from the requested columns in each row, but also that we return them in the order in which they were requested, which may well be (and frequently is) different from the order in which those columns appear in schema. You'll see a solution to this in the methods makeMap() and projectAndWrapRow() below.
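Here is the idea in isolation, as a tiny sketch with illustrative values: a record held as a Map from column name to value is projected by looking up each requested column in turn, so the output follows Spark SQL's requested order rather than the schema's. The project function is a hypothetical name.

```scala
// A record as a column-name -> value Map (values are illustrative).
val record: Map[String, Int] = Map("val" -> 3, "squared" -> 9, "cubed" -> 27)

// Requested order, which need not match the schema order (val, squared, cubed).
val requiredColumns = Array("cubed", "val")

// Projection and reordering in one step: look up each requested column in turn.
def project(m: Map[String, Int], cols: Array[String]): Seq[Int] = cols.toSeq.map(m)
```

If requiredColumns is ever empty, the projected rows are empty too, but each matching record must still produce a row so that counts come out right.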

Putting it all together

The core of our new implementation is the class MyPFTableScan, which this time extends PrunedFilteredScan and wraps the primitive RangeDB database above:

case class MyPFTableScan(count: Int, partitions: Int)
                      (@transient val sqlContext: SQLContext)
  extends PrunedFilteredScan {

  val db = new RangeDB(count)

We specify the schema by overriding schema as before.

  val schema: StructType = StructType(Seq(
    StructField("val", IntegerType, nullable = false),
    StructField("squared", IntegerType, nullable = false),
    StructField("cubed", IntegerType, nullable = false)
  ))

Now we need two convenience methods for constructing result rows. The first takes a result and converts it to a more flexible representation based on a Map, and the second projects out unneeded columns and puts the required ones in the right order as specified by the requiredColumns parameter.

  private def makeMap(rec: RangeDBRecord): Map[String, Int] = {
    val m = new HashMap[String, Int]()
    m += ("val" -> rec.key)
    m += ("squared" -> rec.squared)
    m += ("cubed" -> rec.cubed)
    m.toMap
  }

  private def projectAndWrapRow(m: Map[String, Int],
                                requiredColumns: Array[String]): Row = {
    val l = requiredColumns.map(c => m(c))
    val r = Row.fromSeq(l)
    r
  }

Finally, we put it all together in our more complex override of the buildScan method where we instantiate our FilterInterpreter from the filters passed in by Spark SQL, use the primary key bounds pulled out of it to query our primitive data engine, and iterate through the records returned, dealing with each record by:

  1. Converting it to a Map
  2. Applying the remaining filters
  3. Projecting out unwanted columns and wrapping the remainder in the right order.

Then we turn the resulting rows into an RDD.

  def buildScan(requiredColumns: Array[String], 
                filters: Array[Filter]): RDD[Row] = {
    val filterInterpreter = new FilterInterpreter(filters)
    val rowIterator = 
      db.getRecords(filterInterpreter.min, filterInterpreter.max)
    val rows = rowIterator
      .map(rec => makeMap(rec))
      .filter(r => filterInterpreter.apply(r))
      .map(r => projectAndWrapRow(r, requiredColumns))
    sqlContext.sparkContext.parallelize(rows.toSeq, partitions)
  }

}
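The three per-record steps of buildScan can also be tried out without Spark. The following sketch simulates the second example query below (WHERE val <= 40 AND squared >= 900, selecting val and cubed): the key bound goes into the simulated engine call, the filter on squared is evaluated by the adapter, and projection follows the requested columns. The stand-in filter classes, getRecords and scan are assumptions made for illustration, not part of the Spark API.

```scala
// Hypothetical stand-ins for two of Spark's filter case classes.
sealed trait Filter
case class LessThanOrEqual(attribute: String, value: Any) extends Filter
case class GreaterThanOrEqual(attribute: String, value: Any) extends Filter

case class RangeDBRecord(key: Int, squared: Int, cubed: Int)

// Simulates RangeDB.getRecords for keys lo..hi inclusive.
def getRecords(lo: Int, hi: Int): Iterator[RangeDBRecord] =
  (lo to hi).iterator.map(i => RangeDBRecord(i, i * i, i * i * i))

def makeMap(rec: RangeDBRecord): Map[String, Int] =
  Map("val" -> rec.key, "squared" -> rec.squared, "cubed" -> rec.cubed)

// Evaluates one residual (non-key) filter against a record.
def satisfies(m: Map[String, Int], f: Filter): Boolean = f match {
  case LessThanOrEqual(a, v: Int)    => m(a) <= v
  case GreaterThanOrEqual(a, v: Int) => m(a) >= v
  case _                             => true // not understood: leave it to Spark SQL
}

def scan(max: Int, residual: Seq[Filter], requiredColumns: Seq[String]): Vector[Seq[Int]] =
  getRecords(1, max)                                    // key bound pushed to the "engine"
    .map(makeMap)                                       // 1. convert to a Map
    .filter(m => residual.forall(f => satisfies(m, f))) // 2. apply remaining filters
    .map(m => requiredColumns.map(m))                   // 3. project in requested order
    .toVector                                           // materialize on the "driver"

// val <= 40 became the key bound; squared >= 900 is evaluated by the adapter.
val rows = scan(40, Seq(GreaterThanOrEqual("squared", 900)), Seq("val", "cubed"))
```

Note that the filter on squared works even though squared is not among the projected columns, because projection happens last.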

As before, we need to extend the RelationProvider trait to produce the class we'll actually register with Spark SQL.

class CustomPFRP extends RelationProvider {

  def createRelation(sqlContext: SQLContext, 
                     parameters: Map[String, String]) = {
    MyPFTableScan(parameters("rows").toInt,
      parameters("partitions").toInt)(sqlContext)
  }

}

Registration

The process for registering our external data source is also the same as it was in the previous post.

 sqlContext.sql(
      s"""
        |CREATE TEMPORARY TABLE dataTable
        |USING sql.CustomPFRP
        |OPTIONS (partitions '9', rows '50')
      """.stripMargin)

Querying

Here are a few queries that exercise the new external data source. I recommend stepping through the code we've discussed using a debugger to see how Spark SQL actually calls it. If the parameters passed to buildScan() surprise you, remember the following:

  1. Spark SQL is not relying on the external data source adapter or the external data engine to do the filtering; it is simply giving them the opportunity to do so. That means it must request from the data source not only the columns the query returns, but also all the columns needed to evaluate the filters.
  2. Spark SQL is not obliged to pass in all the filters it could pass in.
  3. The external data source API allows Spark SQL to send a conjunction of simple filters. If the filter is not conjunctive, Spark SQL will have to evaluate all or most of it by itself.
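A toy model of the third point (not Spark SQL's actual planner code): a WHERE clause is split into its top-level conjuncts, and only simple comparisons among them are candidates for pushdown, so anything under an OR stays with Spark SQL. The Pred, Cmp, And and Or types are hypothetical.

```scala
// A toy predicate tree; Spark SQL's real Catalyst expressions are much richer.
sealed trait Pred
case class Cmp(attr: String, op: String, value: Int) extends Pred
case class And(left: Pred, right: Pred) extends Pred
case class Or(left: Pred, right: Pred) extends Pred

// Split a predicate into its top-level conjuncts.
def conjuncts(p: Pred): List[Pred] = p match {
  case And(l, r) => conjuncts(l) ++ conjuncts(r)
  case other     => List(other)
}

// Only simple comparisons can cross the API; an Or conjunct is kept back.
def pushable(p: Pred): List[Cmp] = conjuncts(p).collect { case c: Cmp => c }

// The WHERE clauses of the second and third example queries, plus a mix of both.
val second = And(Cmp("val", "<=", 40), Cmp("squared", ">=", 900))
val third  = Or(Cmp("val", "<=", 10), Cmp("squared", ">=", 900))
val mixed  = And(Cmp("val", "<=", 40), third)
```

In the mixed case the simple conjunct can still be pushed down, while the disjunction is left entirely to Spark SQL.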

The first example will request only two columns and pass in a single filter.

val data =
      sqlContext.sql(
        s"""
          |SELECT val, cubed
          |FROM dataTable
          |WHERE val <= 40 
          |ORDER BY val
        """.stripMargin)
    data.collect().foreach(println) // print on the driver, in ORDER BY order

This one will request three columns (so that Spark SQL can re-apply the filters) and pass in two filters.

val data =
      sqlContext.sql(
        s"""
          |SELECT val, cubed
          |FROM dataTable
          |WHERE val <= 40 AND squared >= 900
          |ORDER BY val
        """.stripMargin)
    data.collect().foreach(println)

The third example involves a disjunctive filter, so Spark SQL will not pass any filters to the external data source adapter.

val data =
      sqlContext.sql(
        s"""
          |SELECT val, cubed
          |FROM dataTable
          |WHERE val <= 10 OR squared >= 900
          |ORDER BY val
        """.stripMargin)
    data.collect().foreach(println)

Non-rectangular data

Since Spark SQL can handle non-rectangular data quite elegantly, it's natural to wonder about the extent of support for it in external data sources. For example, what if we wanted to collect the "squared" and "cubed" columns together into a structure called "data"? It's pretty clear how we could define the schema in our PrunedFilteredScan implementation:

val schema: StructType = StructType(Seq(
    StructField("val", IntegerType, nullable = false),
    StructField("data", StructType(Seq(
      StructField("squared", IntegerType, nullable = false),
      StructField("cubed", IntegerType, nullable = false)
    )))
    ))

Then the query could look like this:

val data =
      sqlContext.sql(
        s"""
          |SELECT val, data.cubed
          |FROM dataTable
          |WHERE val <= 40 AND data.squared >= 900
          |ORDER BY val
        """.stripMargin)

Then to make it all work, at the very least you'd need to construct the nested row correctly. Ignoring projection, you'd need to start with something like:

val row = Row(m("val"), Row(m("squared"), m("cubed")))

But obviously we'd also have to adjust how we handle projection and filtering. How will Spark SQL express that it needs us to return the "cubed" sub-field of "data", but filter on the "squared" sub-field? Actually, it doesn't. In the call to buildScan() it simply asks for the "data" column whenever any of its sub-fields appears in the query, and pulls the column apart itself. Perhaps more disturbing is the fact that Spark SQL doesn't pass along filters on any of the sub-fields either, preferring to apply those filters itself, so our external data engine doesn't get the chance to take advantage of them at all.

All this just means that support for non-rectangular external data sources is present in Spark SQL as of Spark 1.2.0, and it works, but it doesn't really support sophisticated non-rectangular sources with any efficiency for the time being.

Sunday, December 21, 2014

External Data Sources in Spark 1.2.0

One of the most exciting aspects of the recent Spark 1.2.0 release is the Spark SQL API for external data sources. This is an API for mounting external data sources as temporary tables, which can then be queried through SQL. In this post we'll look at how you can define your own extremely simple external data source and query it. (Edit: In a followup post I've delved into what it takes to push filtering and projection into your external data source.)

As with other posts in this series, I'm assuming familiarity with Scala, and the code for the examples can be found at https://github.com/spirom/LearningSpark -- in this case sql/CustomRelationProvider.scala.

The external data source API is located in the org.apache.spark.sql package, and consists of six abstract classes, six case classes and one trait. However, to get started with a simple integration project all you need to understand is the trait RelationProvider and the abstract class TableScan.

In order to define the classes needed you'll need the following imports:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.sources.{TableScan, RelationProvider}

To keep this example simple, I won't involve any actual use of an external system. Instead I'll show how to create a table of synthetic data. Most of the work involves extending the abstract class TableScan, to provide (a) the table schema and (b) the row data. In this example, the table contains just three columns. The first is a unique integer and the second and third columns are that integer squared and cubed, respectively.

To understand this implementation you need to understand two aspects of Spark SQL that, mercifully, have recently been documented: StructType (and other type constructors) and Row in the org.apache.spark.sql package. In the Spark SQL Programming Guide, see the entries on "Programmatically Specifying the Schema" and "Spark SQL DataType Reference."

case class MyTableScan(count: Int, partitions: Int)
                      (@transient val sqlContext: SQLContext) 
  extends TableScan 
{

  val schema: StructType = StructType(Seq(
    StructField("val", IntegerType, nullable = false),
    StructField("squared", IntegerType, nullable = false),
    StructField("cubed", IntegerType, nullable = false)
  ))

  private def makeRow(i: Int): Row = Row(i, i*i, i*i*i)

  def buildScan: RDD[Row] = {
    val values = (1 to count).map(i => makeRow(i))
    sqlContext.sparkContext.parallelize(values, partitions)
  }
  
}

Now you need to extend the RelationProvider trait to provide the DDL interface for your data source. A data source is configured via a set of options, each of which is a key/value pair. By implementing the createRelation method we specify how to use these configuration parameters and the SQLContext to initialize an individual table. In the example, the only parameters are the number of rows in the table and the number of partitions for the resulting RDD.

class CustomRP extends RelationProvider {

  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]) = {
    MyTableScan(parameters("rows").toInt,
      parameters("partitions").toInt)(sqlContext)
  }
  
}
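The createRelation above throws a bare NoSuchElementException if an option is missing and a NumberFormatException if it isn't a number, neither of which points clearly back at the DDL. A sketch of a more forgiving helper, assuming you want defaults for missing options; intOption and the default values are hypothetical, not part of the API.

```scala
import scala.util.{Success, Try}

// Hypothetical helper: read a positive integer option, falling back to a default
// when it is absent and failing with a descriptive message when it is malformed.
def intOption(parameters: Map[String, String], key: String, default: Int): Int =
  parameters.get(key) match {
    case None => default
    case Some(s) =>
      Try(s.trim.toInt) match {
        case Success(n) if n > 0 => n
        case _ => throw new IllegalArgumentException(
          s"option '$key' must be a positive integer, got '$s'")
      }
  }

// In createRelation one might then write (defaults are arbitrary):
//   MyTableScan(intOption(parameters, "rows", 100),
//               intOption(parameters, "partitions", 1))(sqlContext)
```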

Now you need to register a temporary table based on your external source, using a new Spark SQL DDL command CREATE TEMPORARY TABLE. You need to provide the table name, the class that defines it (the one implementing RelationProvider) and the options list. Be very careful with your DDL syntax here, and if you get into trouble remember to check Spark's DEBUG logs. If Spark SQL can't parse your DDL, it will hide the detailed and informative error message in a log entry, try re-parsing your DDL as DML, and show you only the latter error message.

sqlContext.sql(
      s"""
        |CREATE TEMPORARY TABLE dataTable
        |USING sql.CustomRP
        |OPTIONS (partitions '9', rows '50')
      """.stripMargin)

Finally, you can query your temporary table like any other, referring to the attributes you specified as the "schema" when extending TableScan.

val data = sqlContext.sql("SELECT * FROM dataTable ORDER BY val")

That's all you need for a very simple external data source: all that's left is to write the code to interface to the "external" system of your choice. Keep in mind that the approach I've shown here will always fetch all the columns and all the rows. If your SQL query only requires, say, a few rows and a few columns, everything will still be materialized, and the unneeded data will be discarded by the rest of the query plan. Most of the rest of the external data source API is for dealing with this problem, but that's for a different post.

Sunday, November 16, 2014

Notes from the second day of PNWScala 2014

Here are some notes from the second and last day of the very successful PNWScala 2014 conference.


Adding Tree and Tree: Distributed Decision Tree Learning - Avi Bryant (Stripe)

This was about the Brushfire framework for learning decision trees, based on Hadoop via Scalding and Algebird. Generality, modularity and composability seem to have been very carefully thought through. The high level approach is based on the PLANET paper. The code will soon be available at http://github.com/stripe/brushfire.


What's new since Programming in Scala - Marconi Lanna (Originate)

A guided tour of language features since the last edition of the book. Some notable ones:
  • App trait (2.9)
  • Range foreach optimization (2.10)
  • Parallel collections (2.9)
  • Generalized try catch finally with reusable exception handling via PartialFunction
  • Try [almost] monad (2.10) 
  • Implicit classes (2.10)
  • Value classes (2.10)
  • Extension methods (2.10)
  • String interpolation (2.10) and custom interpolators
  • Futures and promises (2.10, 2.9.3)
  • Dynamic trait (2.10)
  • Akka actors
  • Modularization of advanced language features
  • Reflection, macros and quasiquotes

One Year of Akka - Ryan Tanner (Conspire)

A "view from the trenches", describing adoption of Scala and Akka at an early stage startup, and dealing with scaling issues. We were reminded that "Akka won't save you from building a monolith" and that it's easy to end up with a tightly coupled system. Additional advice included "pull, don't push", as described in the Akka work pulling pattern and more specifically in a post on the Conspire blog. The latter is the last of a series of five posts on this whole effort, and all five seem very much worth reading. Like some other members of the audience I was surprised to hear that Conspire was in the process of making a turn away from Akka clustering (but not Akka actors) and planning to introduce Kafka.


Hands-on Scala.js - Li Haoyi (DropBox)

Lots of live coding in this talk demonstrated that Scala.js appears to deliver on its promise of unifying server side and portable browser side programming in a single, strongly typed language with decent performance. The story gets even stronger when ScalaTags is included, providing an interface to the DOM. Example projects started very simple but got quite complex. There was also a project showing common code on the browser and server, and, for a grand finale, an example where communication between browser and server code was type checked. A very engaging presentation. 


Unruly Creatures: Strategies for dealing with Real Numbers - Erik Osheim (Typelevel)

Starting with a "primitive math blooper reel", this talk motivated and explained the Spire library providing various advanced and well behaved representations of numbers. 


What every (Scala) programmer should know about category theory - Gabriel Claramunt

I've watched people try to give variants of this talk for three decades and it hasn't gotten any easier, especially in front of an audience with varied interests and backgrounds. It's quite a bit more compelling with Scala than it was with Standard ML. The bigger problem is that Scala has been successful to a large degree because it hasn't just been pitched to people who have learned or are willing to learn category theory. Most Scala programmers will never learn it and that's mostly a good thing. But knowing it does yield some insight into Scala, so this talk remains worth giving, and perhaps the Scala variant is more relevant than those in the past. Best line: "I came for the abstraction, stayed for the composition."

It may be worth checking out "Category Theory Applied to Functional Programming."

I'm still very interested in the question "what shared conceptual model do all Scala programmers need?", but my starting point is that it probably isn't category theory. It may be a dumbed-down version that explains what a Monad is (and isn't) and why it matters.


Building a Better Future: Advanced Error Handling for Concurrent Programming with Scalaz and Shapeless -- Jean-Rémi Desjardins and Eddie Carlson (Whitepages)

The last of several good discussions of error handling, this time in the context of futures. Almost anybody who has used futures a lot has at some point needed to collect multiple futures into a single one. Then they learned the hard way that Future.sequence doesn't do quite what they want, returning the first error in traversal order of the sequence, rather than temporal order, and thus not "failing fast" as is usually desired. See, for example, this discussion on StackOverflow. A lot of this solution was over my head as I haven't used either scalaz or shapeless, but the key ingredients were scalaz.Applicative, shapeless.HList and HList sequencing features of shapeless-contrib. I'm hoping the slides get posted as this is an important problem. 
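For readers without scalaz or shapeless, a fail-fast combination of futures can be sketched with the standard library alone, by racing each individual failure against the ordinary Future.sequence result through a Promise. This is not the approach presented in the talk, failFastSequence is a hypothetical name, and newer Scala versions may have changed how Future.sequence reports failures, so check the one you use.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Completes with the first failure in *time* order, or with all results
// (in input order) once every future has succeeded.
def failFastSequence[T](fs: Seq[Future[T]])(implicit ec: ExecutionContext): Future[Seq[T]] = {
  val p = Promise[Seq[T]]()
  // Whichever future fails first wins; later attempts are ignored by tryFailure.
  fs.foreach(f => f.failed.foreach(e => p.tryFailure(e)))
  Future.sequence(fs).foreach(results => p.trySuccess(results))
  p.future
}

// `never` comes first, so waiting in traversal order would wait forever.
val never = Promise[Int]().future
val boom: Future[Int] = Future.failed(new RuntimeException("boom"))
val outcome = Await.ready(failFastSequence(Seq(never, boom)), 5.seconds)
```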


Composing Project Archetypes with SBT AutoPlugins - Mark Schaake (Allen Institute for Artificial Intelligence)

How to solve "Multiple Build Maintenance Hell" (MBMH) in an organization with lots of sbt projects. The solution described is essentially to define shared, versioned plugins based on the AutoPlugin concept introduced in sbt 0.13.5 -- described in this tutorial. Each plugin covers one facet of a project (a command line tool, a web service, ...) and plugins can depend on each other using "requires". The specific plugins defined have been open sourced. The approach seems intuitively right, but somebody asked how a developer could be sure to avoid accidentally overriding plugin behavior. This seems like an interesting problem as sbt seems to be an area where developers often cudgel their code into submission without knowing what they're doing and the first thing that "works" tends to get checked in (and not looked at until something breaks.) 

Saturday, November 15, 2014

Notes from the first day of PNWScala 2014

Here are some of my impressions from the 1st day of PNWScala 2014. This is my first Scala conference and I'm delighted to see it well organized and well attended.

Rapture: The Art of the One-Liner -- Jon Pretty (Propensive) 

An introduction to the Rapture libraries for IO and related parsing tasks via a series of evocative and idiomatic "one liners". While no responsible Scala programmer would actually write them as one liners the point was well made: the JSON parsing code was very elegant. Jon's remarks on error handling led me to posit a litmus test for this kind of code being "industrial strength".

I want to be able to write similarly idiomatic and elegant code that allows me to process a large number of documents and:

  • Return representations of the valid documents (say in terms of case classes)
  • Return representations of the invalid documents that:
    • Identify the invalid document by some identifier and/or content
    • Explain what's invalid about it
  • For bonus points, while processing, give incremental counts of valid/invalid documents so I can decide that my failure rate is unacceptable (and something fundamental has gone wrong) or is acceptable and I can either discard or subsequently fix and reprocess the failure cases.
In a followup conversation Jon showed me how to do this with Try so I think it's plausible that Rapture actually meets my test.
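For the record, here is a generic sketch of the kind of processing I mean, using only scala.util.Try rather than Rapture's own API (which I won't try to reproduce). The document format ("name,age"), the types and the process function are all hypothetical.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical input document and result types.
case class RawDoc(id: String, body: String)
case class Person(name: String, age: Int)
case class Invalid(id: String, reason: String)

// Parse "name,age"; any exception becomes a Failure explaining what went wrong.
def parse(doc: RawDoc): Try[Person] = Try {
  doc.body.split(",", -1) match {
    case Array(name, age) if name.trim.nonEmpty => Person(name.trim, age.trim.toInt)
    case _ => throw new IllegalArgumentException(s"expected 'name,age', got '${doc.body}'")
  }
}

// Splits documents into valid results and explained failures, reporting running
// (valid, invalid) counts every `every` documents so a bad batch shows up early.
def process(docs: Iterator[RawDoc], every: Int = 1000)
           (progress: (Int, Int) => Unit): (Vector[Person], Vector[Invalid]) = {
  var good = Vector.empty[Person]
  var bad = Vector.empty[Invalid]
  docs.zipWithIndex.foreach { case (doc, i) =>
    parse(doc) match {
      case Success(p) => good :+= p
      case Failure(e) => bad :+= Invalid(doc.id, e.getMessage)
    }
    if ((i + 1) % every == 0) progress(good.size, bad.size)
  }
  (good, bad)
}

val docs = Iterator(RawDoc("a", "Ann,34"), RawDoc("b", "Bob,x"), RawDoc("c", "no comma"))
var lastCounts = (0, 0)
val (valid, invalid) = process(docs, every = 1)((g, b) => lastCounts = (g, b))
```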

The First Hit is Always Free: A Skeptic's Look at scalaz' "Gateway Drugs" -- Brendan McAdams (Netflix) 

This was an interesting view of scalaz "from the outside", doing a nice job of explaining why scalaz may be of interest to more than just functional programming researchers. It started with a tour of some of the usual Scala and scalaz approaches to error handling -- Option and Validation -- and why they're hard to work with (not monadic). Then Brendan explained scalaz's disjunction operator and showed how it makes it easier to accumulate error information while processing data. (The timing of this was interesting, coming on the heels of my error processing concerns about Rapture as described above.)

Types out of patmat -- Stephen Compall (McGraw Hill Financial)

I think everybody in the audience who hasn't worked among type theory researchers found this one really tough. I have worked with type theory researchers, but that was 22 years ago, which seems to be about 21 years too long for understanding this talk. My takeaway is that I should learn more about the subtleties of Scala pattern matching, and that I shouldn't expect it to work very well if the type of what I'm matching is complex enough to be interesting to a type theory researcher. I suspect this was a really interesting and informative talk for people who had the necessary background. The high-level warnings are useful to every Scala programmer.

Don't Cross the Streams -- Marc Millstone (Socrata)

This was focused on aspects of counting records in a stream, with adequate or at least understood accuracy, while dealing with bounded memory. The approaches are embodied in the Tallyho project. Marc talked about three kinds of approaches, based respectively on engineering, math and ignorance. There were a number of interesting techniques, some based on hashing, and all essentially unfamiliar to me. Stream processing seems to be an increasingly important application of Scala, especially with the success of Spark streaming. This talk also left me wanting to learn more about stream-lib, Algebird and Shapeless, and to check out the Highly Scalable Blog.
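To give a flavor of the hashing-based, bounded-memory idea, here is an illustrative Count-Min sketch. This is a well-known technique that I'm using as an example. It is not taken from the talk or from the Tallyho project, and the class and parameter names are my own.

```scala
import scala.util.hashing.MurmurHash3

// Illustrative Count-Min sketch: memory is fixed at depth * width counters,
// no matter how long the stream is. Not taken from the Tallyho project.
class CountMinSketch(depth: Int, width: Int) {
  require(depth > 0 && width > 0, "depth and width must be positive")
  private val table = Array.ofDim[Long](depth, width)

  // Each row uses a different seed, giving `depth` different hash functions.
  private def bucket(key: String, row: Int): Int = {
    val h = MurmurHash3.stringHash(key, row)
    ((h % width) + width) % width // map a possibly negative hash into [0, width)
  }

  def add(key: String): Unit =
    for (row <- 0 until depth) table(row)(bucket(key, row)) += 1

  // Collisions can only inflate a counter, so the minimum over the rows is
  // an estimate that never undercounts.
  def estimate(key: String): Long =
    (0 until depth).map(row => table(row)(bucket(key, row))).min
}
```

The trade-off is the one described in the talk: you accept a bounded overcount, which shrinks as width grows, in exchange for memory that doesn't grow with the stream.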

Apache Spark I: From Scala Collections to Fast Interactive Big Data with Spark -- Evan Chan (Socrata) 

A nice overview of Apache Spark from a Scala perspective. It emphasized the smooth transition from Scala serial collections to Scala parallel collections (Scala 2.10), and then to the RDD as a distributed collection, presenting Spark's laziness as a natural extension of Scala's lazy collections (Streams and Iterators). This material was mostly very familiar to me, but Evan did a great job of explaining it and of emphasizing how natural it all is.

It was also interesting to hear that the entire Socrata backend is implemented in Scala.

Apache Spark II: Streaming Big Data Analytics with Team Apache, Scala & Akka -- Helena Edelson (Datastax)

At first this seemed like an overview of Spark streaming, with which I was already quite familiar, but it turned out to be more about building quite complex streaming-oriented Spark applications that also used Akka. There were a number of interesting points:

  • I had never heard of the Lambda Architecture, of which this approach is an instance
  • A LOT of audience members were using Apache Kafka
  • A significant but much smaller number were using Apache Cassandra
  • An apparently well-known discussion on the apache-spark-user-list of why Spark is based on Scala was summarized as: functional programming, JVM leverage, function serializability, static typing and the REPL
  • The KillrWeather project is a reference application whose design is based on the ideas presented 
  • It's currently difficult to combine stream and historical data in a single application because SparkContext is not serializable  

Miniboxing: JVM Generics without the overhead  -- Vlad Ureche (EPFL) 

Miniboxing is essentially efficient specialization of generics for primitive types that fit in a long integer by actually fitting them in a long integer, with sometimes dramatic performance improvements and/or code size reductions.

It is available to use via a compiler plugin that is undergoing active development and improvement. A comment from the back of the room: "That is an insane amount of documentation for an in-development compiler plugin".

The value of this was illustrated using an image processing example.

For some background reading, see the post "Quirks of Scala specialization" by Alex Prokopec.

Towards a Safer Scala -- Leif Wickland (Oracle) 

This talk explored the various approaches to helping ensure that a project's Scala code is safe and correct:

My take-away was that the compiler parameters, Scalastyle and WartRemover were at the point where I should actually consider using them, and that the remainder were worth watching.

Sunday, November 9, 2014

Spark computations

In this post we'll start to build an understanding of how Spark computations are represented and executed, and why. Once again, I'm assuming familiarity with Scala, and the code for the examples can be found at https://github.com/spirom/LearningSpark -- in this case Ex2_Computations.scala.

Peeking under the lid of a simple computation

In the previous post we saw how to establish a SparkContext. We'll continue from there by setting up an RDD of 10 numbers with 4 partitions and applying two functions in series to its elements: the first multiplies each element by 100 and the second adds 1 to each element.

val numbers = sc.parallelize(1 to 10, 4)
val bigger = numbers.map(n => n * 100)
val biggerStill = bigger.map(n => n + 1)

You might reasonably expect each line of code to result in a new RDD being fully computed, but that's not the way it works. To let the Spark system execute computations efficiently (which mostly means with minimum communication), the system defers computation until it knows what the goal is -- in other words, until it has to. And it doesn't have to until it reaches code that pulls data out of the RDD: to print it, to save it externally, or to compute some other data structure. You can also mark an RDD to be cached because you know you're going to use it a lot, so that its value is kept once it has been computed. So the three lines of code above achieve no Spark computation on RDDs whatsoever. Instead, a chain of dependencies is created for later use in execution. We can get Spark to show us this with an additional line of code:

println(biggerStill.toDebugString)

This produces a readable representation of what other RDDs a given RDD depends on, even showing which line of code is involved at each step. We'll refer to this as a "debug string" since that's what the Spark API calls it.  The (4) in the first line tells us that everything's happening across four partitions (since that's the way we set up the initial RDD). We'll look at partitions much more in later posts. Each RDD is shown with its ID which you can access using, say, biggerStill.id.

(4) MappedRDD[2] at map at Ex2_Computations.scala:21
 |  MappedRDD[1] at map at Ex2_Computations.scala:20
 |  ParallelCollectionRDD[0] at parallelize at Ex2_Computations.scala:19

But adding just one line of code changes this -- we'll use the reduce() method on RDD to sum the numbers:

val s = biggerStill.reduce(_ + _)

To execute this line the Spark system needs to run through the dependencies, apply any optimizations, and compute a final answer.

You can actually observe this quite nicely when running the sample project. Start by emptying out the log4j.properties file (or just change log4j.rootCategory to INFO). Then comment out everything below the definition of biggerStill and run the program. You'll see lots of log chatter before the debug string, but none after. When you uncomment the definition of s, you'll get more than a dozen lines of extra chatter as the value is actually computed.
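You can see the same deferral without Spark at all. Here is a plain-Scala analogy that builds the two map steps from above over an Iterator, with a hypothetical onCall hook that counts how often the mapping functions actually run. It isn't how Spark is implemented. It just shows the "describe now, compute when pulled" behavior.

```scala
object LazyPipeline {
  // Builds the post's two-step pipeline over an Iterator. `onCall` runs each
  // time a mapping function actually executes, so we can see when work happens.
  def build(onCall: () => Unit): Iterator[Int] = {
    val numbers = (1 to 10).iterator
    val bigger = numbers.map { n => onCall(); n * 100 }
    bigger.map { n => onCall(); n + 1 }
  }

  def main(args: Array[String]): Unit = {
    var calls = 0
    val biggerStill = build(() => calls += 1)
    println(s"mapping calls after defining the pipeline: $calls")
    val s = biggerStill.reduce(_ + _)
    println(s"s = $s, mapping calls after reduce: $calls")
  }
}
```

Defining the pipeline makes zero calls. The reduce then makes 20 (ten elements through two maps) and yields 5510. One difference worth remembering: an Iterator can only be consumed once, while an RDD can be recomputed from its dependencies as often as needed.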

A more complex example

You can make the dependencies more complex by defining another RDD in terms of two of the prior RDDs. There's an entire post coming up about operations on multiple RDDs, but for now we'll just use "++", also known as "union", which happens not to be a true set union, as it retains duplicates.

val moreNumbers = bigger ++ biggerStill

At this point printing the debug string yields some insight. If you look carefully, it also shows why you need to think about how Spark computes things: 'bigger', alias MappedRDD[1], appears twice in the lineage, and unless it's cached it can end up being computed once for each branch of the union. Unfortunately, the debug string doesn't show the tree structure of the dependencies. In the sample code on GitHub you'll see the definition and use of a Scala function called showDep that makes the transitive dependencies clearer.

(8) UnionRDD[3] at $plus$plus at Ex2_Computations.scala:34
 |  MappedRDD[1] at map at Ex2_Computations.scala:20
 |  ParallelCollectionRDD[0] at parallelize at Ex2_Computations.scala:19
 |  MappedRDD[2] at map at Ex2_Computations.scala:21
 |  MappedRDD[1] at map at Ex2_Computations.scala:20
 |  ParallelCollectionRDD[0] at parallelize at Ex2_Computations.scala:19
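The showDep function itself is in the sample code. As a hedged stand-in that runs without Spark, here is a toy model of the lineage above and a depth-first printer in the same spirit. ToyNode, Dep and ShowDeps are made-up names, not Spark classes.

```scala
// Toy lineage model (hypothetical, not Spark's classes). Real RDDs expose a
// `dependencies` sequence whose elements carry an `rdd`; this mirrors that shape.
case class Dep(rdd: ToyNode)
case class ToyNode(id: Int, name: String, dependencies: Seq[Dep]) {
  override def toString: String = s"$name[$id]"
}

object ShowDeps {
  // Depth-first walk that prints each node indented under the node that uses
  // it, in the spirit of the post's showDep (this is not that function).
  def render(n: ToyNode, depth: Int = 0): List[String] =
    ("  " * depth + n.toString) :: n.dependencies.toList.flatMap(d => render(d.rdd, depth + 1))
}

object LineageDemo {
  val numbers = ToyNode(0, "ParallelCollectionRDD", Nil)
  val bigger = ToyNode(1, "MappedRDD", Seq(Dep(numbers)))
  val biggerStill = ToyNode(2, "MappedRDD", Seq(Dep(bigger)))
  val moreNumbers = ToyNode(3, "UnionRDD", Seq(Dep(bigger), Dep(biggerStill)))

  def main(args: Array[String]): Unit =
    ShowDeps.render(moreNumbers).foreach(println)
}
```

The indentation makes it obvious that MappedRDD[1] sits under both branches of the union. With real Spark, the same recursion should work by walking rdd.dependencies and following each dependency's rdd.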

The effect of caching and checkpointing

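As mentioned earlier, you can ask Spark to cache an RDD if you believe you'll be using it a lot and you don't want to recompute it each time. Like everything else this is lazy: cache() only marks the RDD, and its value is kept the first time something forces it to be computed.
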
moreNumbers.cache()

Caching an RDD doesn't allow the dependency tree to be discarded, because cached data can be lost, for example when a worker fails, and Spark then needs the dependencies to recompute it.
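Here is a toy model of that behavior, a sketch under my own assumptions rather than Spark's implementation. ToyRDD, collect() and dropCache() are hypothetical names. cache() only sets a flag. The first "action" computes and stores the value, later actions reuse it, and losing the cache falls back on the original computation, which plays the role of the lineage.

```scala
// Toy model of caching (hypothetical, not Spark's implementation).
// cache() only marks the dataset; the value is kept the first time an
// action computes it, and later actions reuse it.
class ToyRDD[T](compute: () => Seq[T]) {
  private var cacheRequested = false
  private var cached: Option[Seq[T]] = None
  private var runs = 0

  def computations: Int = runs // how many times `compute` actually ran

  def cache(): this.type = { cacheRequested = true; this }

  // Plays the role of an action such as count() or collect().
  def collect(): Seq[T] = cached match {
    case Some(values) => values
    case None =>
      runs += 1
      val values = compute()
      if (cacheRequested) cached = Some(values)
      values
  }

  // Simulates losing cached data (e.g. a worker failure); `compute`,
  // standing in for the lineage, is still there to rebuild it.
  def dropCache(): Unit = { cached = None }
}
```

Without cache(), every action recomputes. With it, only the first one does, until the cached copy is lost.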

You can also specify that a particular RDD should be checkpointed, after specifying a checkpoint directory for the Spark context.

sc.setCheckpointDir("c:/temp/sparkcps")
moreNumbers.checkpoint()

But it's not enough to say we want it checkpointed: the values haven't been calculated, so the Spark system doesn't have anything to write into the checkpoint file. Only when Spark has a reason to compute the RDD can a checkpoint file be written.

moreNumbers.count()

This forces the value of moreNumbers to be calculated, so it can be checkpointed. Let's look at the debug string now:

(8) UnionRDD[3] at $plus$plus at Ex2_Computations.scala:34
 |  CheckpointRDD[4] at count at Ex2_Computations.scala:49

Cached RDDs are not represented in the debug string, but checkpoint files are represented, because they replace the corresponding subtree of the dependencies: 'moreNumbers' won't need to be recomputed even if the computation is restarted because of a failure.
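The contrast can be sketched with another toy lineage model. The Lineage class and the checkpointed method are hypothetical, not Spark API. Checkpointing swaps an RDD's whole parent subtree for a single node that reads the saved data, which is exactly why the debug string shrinks.

```scala
// Toy lineage model (hypothetical, not Spark's classes). Checkpointing
// replaces an RDD's parents with a single node that reads the saved data.
case class Lineage(label: String, parents: List[Lineage]) {
  // One line per node, indented by depth, like a simplified debug string.
  def debugLines(depth: Int = 0): List[String] =
    (" " * (2 * depth) + label) :: parents.flatMap(_.debugLines(depth + 1))

  // In Spark the swap happens only after an action has written the data
  // out (hence the count() above); this toy model just performs the swap.
  def checkpointed(checkpointId: Int): Lineage =
    copy(parents = List(Lineage(s"CheckpointRDD[$checkpointId]", Nil)))
}
```

Building moreNumbers as in the debug strings above gives six lines before the checkpoint and two afterwards: "UnionRDD[3]" and an indented "CheckpointRDD[4]".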

Error Handling

The "lazy" way Spark computes RDDs affects when exceptions are thrown, and hence how you handle errors, so it's useful to understand. Consider this code snippet:

val thisWillBlowUp = numbers map {
  case 7 => throw new Exception
  case n => n
}

You'd expect it to throw an exception because 'numbers' contains 7. But it doesn't throw until 'thisWillBlowUp' is actually used in a computation, as in the next snippet, which does get the exception thrown. This can feel pretty strange. If you think of RDD definitions like normal data structure definitions, you expect exceptions to be thrown where the structure is defined. So it sometimes helps to think of RDDs as functions rather than data structures (or, equivalently for those familiar with the finer points of functional programming, as lazy definitions rather than eager ones).

try {
  println(thisWillBlowUp.count())
  println("wait -- it should have blown up")
} catch {
  case e: Exception => println("Yep, it blew up now")
}
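The same effect shows up with plain Scala iterators, which makes for a quick experiment without Spark. In this sketch the LazyFailure object is a made-up name, and IllegalArgumentException is my choice of exception type. Defining the mapped iterator succeeds, and only consuming it throws.

```scala
import scala.util.{Failure, Success, Try}

object LazyFailure {
  // Like the RDD definition above, this only describes the computation:
  // nothing is mapped, and nothing is thrown, until the iterator is consumed.
  def thisWillBlowUp: Iterator[Int] = (1 to 10).iterator.map {
    case 7 => throw new IllegalArgumentException("7 is not allowed")
    case n => n
  }

  def main(args: Array[String]): Unit = {
    val pipeline = thisWillBlowUp // defining it succeeds
    Try(pipeline.size) match {    // consuming it is what fails
      case Success(n) => println(s"counted $n -- it should have blown up")
      case Failure(e) => println(s"Yep, it blew up now: ${e.getMessage}")
    }
  }
}
```

One difference to keep in mind: in Spark the exception is thrown inside a task. As far as I know, it reaches your driver code wrapped in a SparkException rather than as the original exception type. That's one reason the catch above matches on Exception.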

What you've learned

Now let's take stock of why you need to know all this. There are actually several reasons:
  1. It helps you understand how Spark achieves efficient computation, so you won't waste your time optimizing needlessly (save it for more advanced situations where Spark does need your help).
  2. You'll be able to understand the logs better, and won't be surprised when things are "out of order". 
  3. Error handling will make more sense. 
I hesitated to take the lid off Spark mechanisms this early in a series of introductory posts, but you don't need to think about what I've told you here all the time when programming in Spark. It's something to put at the back of your mind and drag out as a tool when something is behaving strangely. When we continue this series we'll go back to using Spark for real work. Then, every once in a while, we'll peek underneath the mechanisms to help you stay out of trouble.

Thursday, November 6, 2014

An easy way to start learning Spark


Getting started with Spark

Big data computing systems like Hadoop and Spark tend to be large and complex, making it quite hard to start learning how to use them. And, of course, to solve large problems you really do have to tackle this complexity, as you'll need a cluster large enough for your problem, and realistic enough to measure and tune the performance of your solution. But there's a lot to learn before taking on huge problems, and it's useful to have an easy "on ramp" to get started. Then, as you build skills and competence, you can graduate either to using a cluster that somebody else has configured, or to configuring one yourself.

Furthermore, the Hadoop and Spark projects have been rather Linux-centric, whereas most of us have easier access to machines running Windows and Mac OS. Again, configuring a production cluster will usually involve Linux, but it's helpful to get started on a system you know well.

In this, the first of a series of posts on learning Spark, I'm going to use an approach that doesn't require you to set up a cluster or build complex software from the sources. It should also work equally well on Windows, Mac OS and Linux -- and I am in fact developing the examples on Windows 8.

Installing the software

You need the following software to follow along:
  1. A Java Development Kit -- I'm using 1.7.0 
  2. A set of Scala binaries -- I'm using 2.10 
  3. IntelliJ IDEA development environment -- I'm using 13.1 Community Edition
  4. The IDEA Scala plugin -- instructions available here
Notice there's no need to download Spark itself.

The code for the examples is available at https://github.com/spirom/LearningSpark. (You will need to make minor adjustments for Mac OS or Linux.)

Creating a project

Create a Scala sbt project in IDEA. While the programs we write will be very simple, we will use sbt to manage our dependencies on the various Spark libraries.




After doing this wait a couple of minutes for IDEA to create the folder structure of a standard project (it tends to behave like it's ready even when it isn't.)


Getting Spark

Once the folder structure has been created you can see that the build.sbt file exists at the top level. 

Edit it to create a very simple project that depends on the Spark Core package, version 1.1.0, built for Scala 2.10.

name := "LearningSpark"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.1.0"

When you save this, IDEA will prompt you to refresh it, and then spend some time downloading everything you need. That's all there is to it: no building, no cluster setup, and no daemon/service management. You're ready to write and run your first Spark program.


Your first Spark program

Right-click on the src/main/scala node in your project, select New/Scala Class, and call your class Ex1_SimpleRDD. Now paste the following code into it, replacing anything IDEA already inserted.

import org.apache.spark.{SparkContext, SparkConf}

object Ex1_SimpleRDD {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Ex1_SimpleRDD").setMaster("local[4]")
    val sc = new SparkContext(conf)

    val numbers = 1 to 10
    val numbersRDD = sc.parallelize(numbers, 4)
    println("Print each element of the original RDD")
    numbersRDD.foreach(println)
  }
}

Notice how we've defined an object rather than a class -- so we can define a main method and get running right away. Let's look at this method in some detail.

The first two lines set up a local Spark environment using four threads. This is all we need to get started. The next line defines a sequence of numbers called 'numbers'. Now we come to the essence of Spark: the Resilient Distributed Dataset, or RDD. This is the data structure on which all Spark computing takes place. In this case we ask Spark to take a regular Scala data structure and turn it into an RDD using the parallelize method on a SparkContext. We can let the number of partitions default, or specify it as the second parameter.

To keep these programs brief, simple and as close to idiomatic Scala as we can, we won't put type declarations on our definitions. But it's helpful to know the types, and IDEA helps with this: position your cursor in the middle of 'numbersRDD' and hit Alt-= (Windows) or Ctrl-Shift-P (Mac OS). You'll see that it's an RDD containing Scala Int elements.


Finally, in typical Scala style, we can print each element of the RDD. 

Run it

You're now ready to run your first Spark program. Right-click on the object you just created in the project explorer and choose Run 'Ex1_SimpleRDD'.

You should see something like the following: 



This is an awful lot of logging, much more than we'll ultimately want to see, but it is interesting to see once. Notice that an awful lot of work seems to get done just to print the numbers; we'll delve into that more deeply later.

Also notice that the numbers appear out of order. It may be tempting to reach all sorts of conclusions about RDDs, the way they're stored, and what order information may be lost when they're created. But those conclusions aren't valid: the RDD is a parallel data structure, and the foreach method causes the println loops to run in parallel on each of the four partitions. Not only will those four print loops start in an unpredictable order, but their outputs will be interleaved with each other. Run this example a few more times, and you'll see that the order keeps changing. Later we'll see that the actual order of the elements in the RDD has NOT been lost.

Tune log verbosity

Now it's time to reduce the logging verbosity to the level of warnings and errors only. Create the file src/main/resources/log4j.properties.



Here is a reasonable starting point for the contents.

log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN

If you run the example again you'll see a much less cluttered response (with yet another order of numbers). 
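One more knob may come in handy later, sketched here on the assumption that the configuration above is in place. log4j 1.x lets you set a level per logger, so you can keep Spark quiet in general but let one component back through at INFO. The DAGScheduler logs when jobs and stages are submitted and finish, which gives a compact way to watch actions such as reduce() or count() trigger real computation. The exact messages depend on the Spark version.

```properties
# Everything else stays at WARN via log4j.rootCategory above;
# this one logger is allowed through at INFO.
log4j.logger.org.apache.spark.scheduler.DAGScheduler=INFO
```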

Do some computing

Next you're ready to actually compute something. If you add the following few lines, you can compute a transformed RDD, this time containing Scala Doubles (remember to check its type), where every element has been divided by 10. This transformation runs independently on each partition without requiring data transfer between the partitions. The next line gathers all the results into a regular Scala array (remember to check its type too), so when we print its contents they come out in order -- the order wasn't lost after all.


val stillAnRDD = numbersRDD.map(n => n.toDouble / 10)
val nowAnArray = stillAnRDD.collect()
println("Now print each element of the transformed array")
nowAnArray.foreach(println)


Here's the output:


Now print each element of the transformed array
0.1
0.2
0.3
0.4
0.5
0.6
0.7
0.8
0.9
1.0
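Here's a plain-Scala analogy for why foreach output comes out scrambled while collect() brings it back in order. No Spark is involved, and the PartitionOrder object is a made-up name. Four hard-coded "partitions", split the same way as the glom output later in this post, are mapped in parallel with Futures, logging as they go, and are then gathered partition by partition.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Plain-Scala analogy (no Spark): four "partitions" processed in parallel.
object PartitionOrder {
  val partitions: Seq[Seq[Int]] = Seq(1 to 2, 3 to 5, 6 to 7, 8 to 10)

  def run(): (Seq[String], Seq[Double]) = {
    val log = new java.util.concurrent.ConcurrentLinkedQueue[String]()
    // Each partition is mapped independently, like numbersRDD.map(...);
    // log lines are appended in whatever order the threads happen to run.
    val futures = partitions.zipWithIndex.map { case (part, i) =>
      Future {
        log.add(s"partition $i running")
        part.map(_.toDouble / 10)
      }
    }
    // Like collect(): results are gathered in partition order, not in
    // the order the partitions finished.
    val collected = Await.result(Future.sequence(futures), 10.seconds).flatten
    (log.toArray.toSeq.map(_.toString), collected)
  }

  def main(args: Array[String]): Unit = {
    val (log, collected) = run()
    log.foreach(println)       // order may differ from run to run
    collected.foreach(println) // always 0.1 through 1.0, in order
  }
}
```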

Start understanding RDDs and partitioning

To end your first encounter with Spark, let's use RDD's 'glom' method to see what's in the individual partitions of 'stillAnRDD' and print the results as in the following code. 

val partitions = stillAnRDD.glom()
println("We _should_ have 4 partitions")
println(partitions.count())
partitions.foreach(a => {
  println("Partition contents:" +
    a.foldLeft("")((s, e) => s + " " + e))
})

Hopefully, you've already checked the return type and seen that it's an RDD[Array[Double]], where each element of this new RDD is the entire contents of a partition, collected into an array. Here's the expected output:

We _should_ have 4 partitions
4
Partition contents: 0.6 0.7
Partition contents: 0.8 0.9 1.0
Partition contents: 0.3 0.4 0.5
Partition contents: 0.1 0.2

Notice how the elements of each partition are in order, but (except when you get lucky) the partitions themselves are not. Remember, 'partitions' is still an RDD, so the foreach loop runs in parallel. It's just the foldLeft that runs sequentially on the contents of each of the four arrays.

Also notice the distribution of elements across the partitions: not bad at all. This will become interesting when, in later posts, we look at partitioning and its effect on performance.
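If you're curious where the 2/3/2/3 split comes from, here is a sketch of one slicing rule that reproduces it. Each of numSlices contiguous slices runs from i * length / numSlices up to (i + 1) * length / numSlices, using integer division. I'm presenting this as an assumption chosen to match the output above, not as code copied from Spark, and the Slicing object is a made-up name.

```scala
// Hedged sketch: split `data` into `numSlices` contiguous slices whose
// boundaries are i * length / numSlices (integer division). This reproduces
// the partition contents shown above; it is not copied from Spark.
object Slicing {
  def slices[T](data: IndexedSeq[T], numSlices: Int): Seq[IndexedSeq[T]] = {
    require(numSlices > 0, "need at least one slice")
    val length = data.length.toLong // Long avoids overflow in i * length
    (0 until numSlices).map { i =>
      val start = (i * length / numSlices).toInt
      val end = ((i + 1) * length / numSlices).toInt
      data.slice(start, end)
    }
  }

  def main(args: Array[String]): Unit =
    slices((1 to 10).map(_.toDouble / 10), 4)
      .foreach(s => println("Partition contents: " + s.mkString(" ")))
}
```

With 10 elements and 4 slices the boundaries fall at 0, 2, 5, 7 and 10, giving slices of 2, 3, 2 and 3 elements, as in the glom output.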

Finally, notice how using a single println for each partition keeps the outputs from getting interleaved.

What you've learned

Congratulations: without building anything from sources, setting up a cluster or installing and configuring anything more complex than a Java/Scala development environment you've written and run your first Spark program. Eventually, to solve real problems and diagnose the performance of your solution you'll need a cluster. But you may find an awful lot of your Spark development (and especially exploration and learning) gets done in this simplified environment from now on.