Sunday, December 28, 2014

Filtering and Projection in Spark SQL External Data Sources

In the previous post about the Spark external data source API, we looked at a simple way to register an external data source as a temporary table in Spark SQL. While very easy to use, that mechanism didn't allow Spark SQL to specify which data columns it was interested in, or to provide filters on the rows: the external data source had to produce all the data it had, and Spark SQL would filter the result records as needed. That's often very inefficient, especially when large amounts of data are involved. In this post we'll look at parts of the external data source API that solve this problem, allowing Spark SQL to push projection and filtering down to the external data source.

As with other posts in this series, I'm assuming familiarity with Scala, and the code for the examples can be found at https://github.com/spirom/LearningSpark -- in this case sql/RelationProviderFilterPushdown.scala.

As we discussed last time, the external data source API is located in the org.apache.spark.sql package, and consists of six abstract classes, six case classes and one trait. Projection pushdown is supported by the abstract class PrunedScan, but we'll cover its capabilities by discussing the more general abstract class PrunedFilteredScan, which also supports filter pushdown.

Much of this post is about how the external data source API gives you flexibility in deciding which of three layers of software filters the data that's being queried, so we need to agree on names for the layers. I'm going to name them as follows:

  1. External Data Engine: Some piece of (probably third party) software whose data you want to be able to query from Spark SQL. While you may be willing to make changes to this software to make this easier, I'm going to assume that it has a fixed set of capabilities that will influence some of the choices you make in implementing an adapter for it.
  2. External Data Source Adapter: This is the code we're learning how to implement in this post -- an implementation of Spark SQL's external data source API that accepts calls from Spark SQL and turns them into calls to an external data engine, and then filters and transforms the results before returning them to Spark SQL across the API. When implementing the adapter, you can choose between three strategies: a simple one based on the TableScan abstract class, as discussed in the previous post, an intermediate one based on the PrunedScan abstract class, or the most powerful (and complex) one, PrunedFilteredScan, which is the topic of this post (the sketch after this list shows how the three differ).
  3. Spark SQL: This is Spark's SQL query compiler, optimizer and top level execution engine, layered on top of core Spark. Individual queries can combine internal and external data, and even external data from multiple sources, and Spark SQL must cleanly break out sub-queries to the individual external data source adapters. When querying external data it has a number of choices in exactly what it requests from the external data source adapter. It detects which of the three implementation strategies is being used for any given adapter, and conforms to it, but the adapter needs to support all the possibilities for its chosen strategy.
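
The three strategies differ only in the buildScan method the adapter has to implement. Here's a simplified sketch, paraphrased from the org.apache.spark.sql.sources package in Spark 1.2 (check the actual source for the definitive signatures):

// Paraphrased sketch of the three scan strategies, not the literal Spark source:
//
//   abstract class TableScan extends BaseRelation {
//     def buildScan(): RDD[Row]                                      // all rows, all columns
//   }
//
//   abstract class PrunedScan extends BaseRelation {
//     def buildScan(requiredColumns: Array[String]): RDD[Row]        // projection pushdown
//   }
//
//   abstract class PrunedFilteredScan extends BaseRelation {
//     def buildScan(requiredColumns: Array[String],
//                   filters: Array[Filter]): RDD[Row]                // projection + filter pushdown
//   }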

In case you're coding along, you'll need the following imports, but remember that I gave a link to the complete code at the beginning of this post.

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.sources._

// ArrayBuffer and HashMap are used by the filter-handling and row-building code below.
import scala.collection.mutable.{ArrayBuffer, HashMap}

How to deal with filter "pushdown"

Filter "pushdown" is the general approach used by most database query systems for doing filtering as close as possible to the source of the data, based on the assumption that dealing with less data will almost always be faster. That isn't always true, especially if the filters are complex to evaluate, but that's not our problem here. Spark SQL's external data source API provides SQL the option of pushing a set of size simple filters down through the API, leaving the external data source adapter three options for utilizing those filters:

  1. Keep pushing: If the external data engine has a powerful query language such as SQL, MDX or XQuery, the most efficient implementation of all may result from the filters being used to construct the query that's sent to that system.
  2. Evaluate: Don't push the filter through into the external data engine but instead interpret it and use it to filter the records returned, rather than just blindly returning all records to Spark SQL.
  3. Do nothing: Spark SQL doesn't know which filters you've utilized, and doesn't really care -- it reapplies all of them to the records your adapter returns to make absolutely sure that no spurious records get through.

Two pragmatic points are worth making here. First, any particular implementation of an external data source can start out naive, perhaps ignoring most filters and pushing some critical ones through to the external database, and optimizations can be introduced over time as needed. This makes the external data source API particularly easy to adopt while also being quite powerful. Second, the decision of what to do with a set of filters passed by Spark SQL can be made by the implementation on a per-query, per-filter basis: for certain queries all filters may be ignored (perhaps because they are not critical to performance or they are hard to deal with), while for other queries either some or all filters can be used.

Finally, we should list the six kinds of filters that the external data source API supports:

Case Class           Filter
EqualTo              attr = const
GreaterThan          attr > const
LessThan             attr < const
GreaterThanOrEqual   attr >= const
LessThanOrEqual      attr <= const
In                   attr = const_1 or ... or attr = const_n
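
To make this concrete, here's a hedged sketch of how the simplest WHERE clause used later in this post reaches the adapter (the order of entries in the array isn't something to rely on):

import org.apache.spark.sql.sources._

// SELECT val, cubed FROM dataTable WHERE val <= 40
// arrives at the adapter as roughly:
val pushedFilters: Array[Filter] = Array(LessThanOrEqual("val", 40))

// A conjunction such as WHERE val <= 40 AND squared >= 900 arrives as two
// entries in the array; a disjunction (OR) is not pushed down at all.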

A back-end data engine with an impoverished query "language"

The power and flexibility of the external data source API is best illustrated with an adapter for an external data engine that provides some significant query capability, but in most respects is rather naive. We'll simulate this with the extremely simple RangeDB class below, which represents a database system with the following characteristics:

  1. Tables have a fixed number of records numRecords.
  2. The records have an integer primary key from 1 to numRecords, and two additional integer fields squared and cubed.
  3. The only kind of query supported (via the getRecords() method) is an inclusive range query on the primary key, where the bounds are optional. This returns an iterator on the native records of the database.

case class RangeDBRecord(key: Int, squared: Int, cubed: Int)

class RangeIterator(begin: Int, end: Int) extends Iterator[RangeDBRecord] {
  var pos: Int = begin

  def hasNext: Boolean = pos <= end

  def next(): RangeDBRecord = {
    val rec = RangeDBRecord(pos, pos*pos, pos*pos*pos)
    pos = pos + 1
    rec
  }
}

class RangeDB(numRecords: Int) {

  def getRecords(min: Option[Int], max: Option[Int]): RangeIterator = {
    new RangeIterator(min.getOrElse(1), max.getOrElse(numRecords))
  }
}
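
Before wiring this into Spark SQL, it's worth sanity-checking the engine directly. Here's a quick, hypothetical session against a ten-record table:

val tinyDb = new RangeDB(10)

// Inclusive range query on the primary key; either bound may be omitted.
tinyDb.getRecords(Some(3), Some(5)).foreach(println)
// RangeDBRecord(3,9,27)
// RangeDBRecord(4,16,64)
// RangeDBRecord(5,25,125)

// Omitting the lower bound starts from the first record.
tinyDb.getRecords(None, Some(2)).foreach(println)
// RangeDBRecord(1,1,1)
// RangeDBRecord(2,4,8)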

The point of this unusual design is to strongly suggest that certain filters really should be pushed down into back end queries, while others absolutely cannot. It provides an informative twist because only four of the six kinds of filters can be passed through if they apply to the primary key column, but none can be passed through if they apply to other columns.

Wrangling filters

We organize all our filter handling into a single class called FilterInterpreter. This class needs to understand several things: the six kinds of filters that Spark SQL can pass through the API and what they mean; the characteristics of the records we are dealing with, represented for convenience as a Map; and finally the exact circumstances under which a filter can be pushed through to the external data engine.

When a FilterInterpreter is created it needs to group the filters by the attribute they are filtering, and the result is represented as a Map from attribute names to arrays of filters. Then the filters that we want to use for the primary key range query are pulled out, and a Map that represents all the remaining filters is produced.

class FilterInterpreter(allFilters: Array[Filter]) {

  private val allAttrToFilters: Map[String, Array[Filter]] = allFilters
    .map(f => (getFilterAttribute(f), f))
    .groupBy(attrFilter => attrFilter._1)
    .mapValues(a => a.map(p => p._2))

  val (min, max, otherKeyFilters) = splitKeyFilter

  private val attrToFilters = 
    allAttrToFilters - "val" + ("val" -> otherKeyFilters)

This relies on a utility method for figuring out which attribute a filter applies to:

  private def getFilterAttribute(f: Filter): String = {
    f match {
      case EqualTo(attr, v) => attr
      case GreaterThan(attr, v) => attr
      case LessThan(attr, v) => attr
      case GreaterThanOrEqual(attr, v) => attr
      case LessThanOrEqual(attr, v) => attr
      case In(attr, vs) => attr
    }
  }

We also need a utility method for splitting out the primary key range filter -- notice that it knows the name of the primary key attribute, but it could easily be generalized. It also knows that only four of the six kinds of supported filters can become range bounds, and it collects the rest so they can be applied later.

  private def splitKeyFilter: (Option[Int], Option[Int], Array[Filter]) = {
    val keyFilters = allAttrToFilters.getOrElse("val", new Array[Filter](0))
    var min: Option[Int] = None
    var max: Option[Int] = None
    val others = new ArrayBuffer[Filter](0)
    keyFilters.foreach({
      case GreaterThan(attr, v) => min = Some(v.asInstanceOf[Int] + 1)
      case LessThan(attr, v) => max = Some(v.asInstanceOf[Int] - 1)
      case GreaterThanOrEqual(attr, v) => min = Some(v.asInstanceOf[Int])
      case LessThanOrEqual(attr, v) => max = Some(v.asInstanceOf[Int])
      case f => others += f
    })
    (min, max, others.toArray)
  }

Finally, we need a way to apply the filters to a row, which relies on a helper method for applying the filters relevant to a single attribute to the value of that attribute.

  def apply(r: Map[String, Int]): Boolean = {
    r.forall({
      case (attr, v) => {
        val filters = attrToFilters.getOrElse(attr, new Array[Filter](0))
        satisfiesAll(v, filters)
      }
    })
  }

  private def satisfiesAll(value: Int, filters: Array[Filter]): Boolean = {
    filters.forall({
      case EqualTo(attr, v) => value == v.asInstanceOf[Int]
      case GreaterThan(attr, v) => value > v.asInstanceOf[Int]
      case LessThan(attr, v) => value < v.asInstanceOf[Int]
      case GreaterThanOrEqual(attr, v) => value >= v.asInstanceOf[Int]
      case LessThanOrEqual(attr, v) => value <= v.asInstanceOf[Int]
      case In(attr, vs) => vs.exists(v => value == v.asInstanceOf[Int])
    })
  }
}
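
Before wiring FilterInterpreter into the scan, here's a hedged sketch of how it behaves for a made-up set of pushed-down filters (the values are hypothetical, chosen purely for illustration):

val pushed: Array[Filter] = Array(
  GreaterThan("val", 5),              // strict bound on the key: becomes min = Some(6)
  LessThanOrEqual("val", 20),         // inclusive bound on the key: becomes max = Some(20)
  GreaterThanOrEqual("squared", 100)  // not on the key: must be evaluated by the adapter
)
val interpreter = new FilterInterpreter(pushed)
// interpreter.min == Some(6), interpreter.max == Some(20)

// apply() checks the remaining (non-range) filters against a record
// represented as a Map from column name to value:
interpreter.apply(Map("val" -> 12, "squared" -> 144, "cubed" -> 1728))  // true
interpreter.apply(Map("val" -> 8, "squared" -> 64, "cubed" -> 512))     // false: 64 < 100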

Dealing with requiredColumns

Now that Spark SQL is able to specify which columns it wants returned, the situation is a little more tricky than it may seem at first. When buildScan was returning the entire Row, Spark SQL could use the information it obtained from calling schema to understand each Row. When it requests only a subset of columns, it's not only essential that we return only the data from the requested columns in each row, but also that we return them in the order in which they were requested, which may well be (and frequently is) different from the order in which those columns were returned from schema. You'll see a solution to this in the methods makeMap() and projectAndWrapRow() below.
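
For example (the values here are hypothetical), if Spark SQL asks only for cubed and val, in that order, then the Row for the record with key 3 must contain exactly those two fields, cubed first:

// What buildScan might be handed:
val requiredColumns = Array("cubed", "val")

// What the adapter must return for the record with key 3:
val row = Row(27, 3)   // not Row(3, 9, 27), and not Row(3, 27)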

Putting it all together

The core of our new implementation is the class MyPFTableScan which this time extends PrunedFilteredScan and wraps our primitive database RangeDB above:

case class MyPFTableScan(count: Int, partitions: Int)
                      (@transient val sqlContext: SQLContext)
  extends PrunedFilteredScan {

  val db = new RangeDB(count)

We specify the schema by overriding schema as before.

  val schema: StructType = StructType(Seq(
    StructField("val", IntegerType, nullable = false),
    StructField("squared", IntegerType, nullable = false),
    StructField("cubed", IntegerType, nullable = false)
  ))

Now we need two convenience methods for constructing result rows. The first takes a result and converts it to a more flexible representation based on a Map, and the second projects out unneeded columns and puts the required ones in the right order as specified by the requiredColumns parameter.

  private def makeMap(rec: RangeDBRecord): Map[String, Int] = {
    val m = new HashMap[String, Int]()
    m += ("val" -> rec.key)
    m += ("squared" -> rec.squared)
    m += ("cubed" -> rec.cubed)
    m.toMap
  }

  private def projectAndWrapRow(m: Map[String, Int],
                                requiredColumns: Array[String]): Row = {
    val l = requiredColumns.map(c => m(c))
    val r = Row.fromSeq(l)
    r
  }

Finally, we put it all together in our more complex override of the buildScan method where we instantiate our FilterInterpreter from the filters passed in by Spark SQL, use the primary key bounds pulled out of it to query our primitive data engine, and iterate through the records returned, dealing with each record by:

  1. Converting it to a Map
  2. Applying the remaining filters
  3. Projecting out unwanted columns and wrapping the remainder in the right order.

Then we turn the resulting rows into an RDD.

  def buildScan(requiredColumns: Array[String], 
                filters: Array[Filter]): RDD[Row] = {
    val filterInterpreter = new FilterInterpreter(filters)
    val rowIterator = 
      db.getRecords(filterInterpreter.min, filterInterpreter.max)
    val rows = rowIterator
      .map(rec => makeMap(rec))
      .filter(r => filterInterpreter.apply(r))
      .map(r => projectAndWrapRow(r, requiredColumns))
    sqlContext.sparkContext.parallelize(rows.toSeq, partitions)
  }

}

As before, we need to extend the RelationProvider trait to produce the class we'll actually register with Spark SQL.

class CustomPFRP extends RelationProvider {

  def createRelation(sqlContext: SQLContext, 
                     parameters: Map[String, String]) = {
    MyPFTableScan(parameters("rows").toInt,
      parameters("partitions").toInt)(sqlContext)
  }

}

Registration

The process for registering our external data source is also the same as it was in the previous post.

sqlContext.sql(
      s"""
        |CREATE TEMPORARY TABLE dataTable
        |USING sql.CustomPFRP
        |OPTIONS (partitions '9', rows '50')
      """.stripMargin)

Querying

Here are a few queries that exercise the new external data source. I recommend stepping through the code we've discussed using a debugger to see how Spark SQL actually calls it. If the parameters passed to buildScan() surprise you, remind yourself of the following:

  1. Spark SQL is not relying on the external data source adapter or the external data engine to do the filtering; it is simply giving them the opportunity to do so. But that means it must request from the data source not only the data that must be returned by the query, but also all the data needed to evaluate the filters.
  2. Spark SQL is not obliged to pass in all the filters it could pass in.
  3. The external data source API allows Spark SQL to send a conjunction of simple filters. If the filter is not conjunctive, Spark SQL will have to evaluate all or most of it by itself.

The first example will request only two columns and pass in a single filter.

val data =
      sqlContext.sql(
        s"""
          |SELECT val, cubed
          |FROM dataTable
          |WHERE val <= 40 
          |ORDER BY val
        """.stripMargin)
    data.foreach(println)

This one will request three columns (so that Spark SQL can re-apply the filters) and pass in two filters.

val data =
      sqlContext.sql(
        s"""
          |SELECT val, cubed
          |FROM dataTable
          |WHERE val <= 40 AND squared >= 900
          |ORDER BY val
        """.stripMargin)
    data.foreach(println)
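
If you step through this one, buildScan() should receive something like the following (the ordering of columns and filters isn't guaranteed). Notice that squared is requested even though it isn't in the SELECT list: Spark SQL needs it to re-apply the second filter.

// Roughly what buildScan() receives for the query above:
//   requiredColumns = Array("val", "cubed", "squared")
//   filters         = Array(LessThanOrEqual("val", 40),
//                           GreaterThanOrEqual("squared", 900))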

The third example involves a disjunctive filter, so Spark SQL will not pass any filters to the external data source adapter.

val data =
      sqlContext.sql(
        s"""
          |SELECT val, cubed
          |FROM dataTable
          |WHERE val <= 10 OR squared >= 900
          |ORDER BY val
        """.stripMargin)
    data.foreach(println)
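
Here buildScan() should again be asked for all three columns (so the filters can be applied), but with an empty filter array, since the API can only express a conjunction of the simple filters:

// Roughly what buildScan() receives for the disjunctive query:
//   requiredColumns = Array("val", "cubed", "squared")
//   filters         = Array()   // nothing pushed down; Spark SQL evaluates the OR itself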

Non-rectangular data

Since Spark SQL can handle non-rectangular data quite elegantly, it's natural to wonder about the extent of support for it in external data sources. For example, what if we wanted to collect the "squared" and "cubed" columns together into a structure called "data"? It's pretty clear how we could define the schema in our PrunedFilteredScan override:

val schema: StructType = StructType(Seq(
  StructField("val", IntegerType, nullable = false),
  StructField("data", StructType(Seq(
    StructField("squared", IntegerType, nullable = false),
    StructField("cubed", IntegerType, nullable = false)
  )))
))

Then the query could look like this:

val data =
      sqlContext.sql(
        s"""
          |SELECT val, data.cubed
          |FROM dataTable
          |WHERE val <= 40 AND data.squared >= 900
          |ORDER BY val
        """.stripMargin)

Then to make it all work, at the very least you'd need to construct the nested row correctly. Ignoring projection, you'd need to start with something like:

val row = Row(m("val"), Row(m("squared"), m("cubed")))

But obviously we'd have to adjust the way we process the query: how we handle projection and filtering. How will Spark SQL express that it needs us to return the "cubed" sub-field of "data" but filter on the "squared" sub-field? Actually, it doesn't. In the call to buildScan() it simply asks for the "data" column whenever any of its sub-fields appears in the SELECT, and pulls the column apart for itself. Perhaps more disturbing is the fact that Spark SQL doesn't pass along filters on any of the sub-fields either, preferring to apply those filters itself, so our external data engine doesn't get the chance to take advantage of them at all.
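
In other words, for the nested query above you should expect buildScan() to receive something like this (a hedged sketch):

// Roughly what buildScan() receives for the nested query:
//   requiredColumns = Array("val", "data")             // the whole struct, not data.cubed
//   filters         = Array(LessThanOrEqual("val", 40))
// The data.squared >= 900 predicate is not pushed down at all;
// Spark SQL applies it itself after the rows come back.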

All this means that support for non-rectangular external data sources is present in Spark SQL as of Spark 1.2.0 and it works, but for the time being a sophisticated non-rectangular source can't be queried with much efficiency.

Related posts

For a more modern exploration of developing Spark external data sources (as of Spark 2.3.0) see Revisiting Spark External Data Sources (The V2 APIs).

Sunday, December 21, 2014

External Data Sources in Spark 1.2.0

One of the most exciting aspects of the recent Spark 1.2.0 release is the Spark SQL API for external data sources. This is an API for mounting external data sources as temporary tables, which can then be queried through SQL. In this post we'll look at how you can define your own extremely simple external data source and query it. (Edit: In a followup post I've delved into what it takes to push filtering and projection into your external data source.)

As with other posts in this series, I'm assuming familiarity with Scala, and the code for the examples can be found at https://github.com/spirom/LearningSpark -- in this case sql/CustomRelationProvider.scala.

The external data source API is located in the org.apache.spark.sql package, and consists of six abstract classes, six case classes and one trait. However, to get started with a simple integration project all you need to understand is the trait RelationProvider and the abstract class TableScan.

In order to define the classes needed you'll need the following imports:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.sources.{TableScan, RelationProvider}

To keep this example simple, I won't involve any actual use of an external system. Instead I'll show how to create a table of synthetic data. Most of the work involves extending the abstract class TableScan, to provide (a) the table schema and (b) the row data. In this example, the table contains just three columns. The first is a unique integer and the second and third columns are that integer squared and cubed, respectively.

To understand this implementation you need to understand two aspects of Spark SQL that, mercifully, have recently been documented: StructType (and other type constructors) and Row in the org.apache.spark.sql package. In the Spark SQL Programming Guide, see the entries on "Programmatically Specifying the Schema" and "Spark SQL DataType Reference."

case class MyTableScan(count: Int, partitions: Int)
                      (@transient val sqlContext: SQLContext) 
  extends TableScan 
{

  val schema: StructType = StructType(Seq(
    StructField("val", IntegerType, nullable = false),
    StructField("squared", IntegerType, nullable = false),
    StructField("cubed", IntegerType, nullable = false)
  ))

  private def makeRow(i: Int): Row = Row(i, i*i, i*i*i)

  def buildScan: RDD[Row] = {
    val values = (1 to count).map(i => makeRow(i))
    sqlContext.sparkContext.parallelize(values, partitions)
  }
  
}

Now you need to extend the RelationProvider trait to provide the DDL interface for your data source. A data source is configured via a set of options, each of which is a key/value pair. By implementing the createRelation method we specify how to use these configuration parameters and the SQLContext to initialize an individual table. In the example, the only parameters are the number of rows in the table and the number of partitions for the resulting RDD.

class CustomRP extends RelationProvider {

  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]) = {
    MyTableScan(parameters("rows").toInt,
      parameters("partitions").toInt)(sqlContext)
  }
  
}

Now you need to register a temporary table based on your external source, using a new Spark SQL DDL command CREATE TEMPORARY TABLE. You need to provide the table name, the class that defines it (the one implementing RelationProvider) and the options list. Be very careful with your DDL syntax here, and if you get into trouble remember to check Spark's DEBUG logs. If Spark SQL can't parse your DDL, it will hide the detailed and informative error message in a log entry, try re-parsing your DDL as DML, and show you only the latter error message.

sqlContext.sql(
      s"""
        |CREATE TEMPORARY TABLE dataTable
        |USING sql.CustomRP
        |OPTIONS (partitions '9', rows '50')
      """.stripMargin)

Finally, you can query your temporary table like any other, referring to the attributes you specified as the "schema" when extending TableScan.

val data = sqlContext.sql("SELECT * FROM dataTable ORDER BY val")
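
The result is an ordinary SchemaRDD, so you can inspect it in the usual ways -- a quick sketch (the exact printed form may differ slightly):

data.printSchema()               // val, squared and cubed as non-nullable integers
data.collect().foreach(println)  // [1,1,1], [2,4,8], ... up to [50,2500,125000]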

That's all you need for a very simple external data source -- all that's left is to write the code to interface to the "external" system of your choice. Keep in mind that the approach I've shown here will always fetch all the columns and all the rows -- if your SQL query only requires, say, a few rows and a few columns, the unneeded data will still be materialized and then discarded further up the query plan. Most of the rest of the external data source API is for dealing with this problem, but that's for a different post.