Sunday, December 21, 2014

External Data Sources in Spark 1.2.0

One of the most exciting aspects of the recent Spark 1.2.0 release is the Spark SQL API for external data sources. This is an API for mounting external data sources as temporary tables, which can then be queried through SQL. In this post we'll look at how you can define your own extremely simple external data source and query it. (Edit: In a followup post I've delved into what it takes to push filtering and projection into your external data source.)

As with other posts in this series, I'm assuming familiarity with Scala, and the code for the examples can be found at https://github.com/spirom/LearningSpark -- in this case sql/CustomRelationProvider.scala.

The external data source API lives in the org.apache.spark.sql.sources package, and consists of six abstract classes, six case classes and one trait. However, to get started with a simple integration project, all you need to understand is the trait RelationProvider and the abstract class TableScan.
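For orientation, here is roughly what those two abstractions (plus the BaseRelation class they both build on) look like in Spark 1.2.0 -- a paraphrased sketch, not the verbatim definitions, so consult the Spark source if you need the details:

// Paraphrased sketch of the core org.apache.spark.sql.sources abstractions (Spark 1.2.0).

// A RelationProvider turns a map of DDL options into a relation (a "table").
trait RelationProvider {
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}

// Every relation exposes its SQLContext and its schema ...
abstract class BaseRelation {
  def sqlContext: SQLContext
  def schema: StructType
}

// ... and a TableScan additionally knows how to produce all of its rows.
abstract class TableScan extends BaseRelation {
  def buildScan(): RDD[Row]
}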

In order to define the necessary classes, you'll need the following imports:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.sources.{TableScan, RelationProvider}

To keep this example simple, I won't involve any actual use of an external system. Instead I'll show how to create a table of synthetic data. Most of the work involves extending the abstract class TableScan, to provide (a) the table schema and (b) the row data. In this example, the table contains just three columns. The first is a unique integer and the second and third columns are that integer squared and cubed, respectively.

To understand this implementation you need to understand two aspects of Spark SQL that, mercifully, have recently been documented: StructType (and other type constructors) and Row in the org.apache.spark.sql package. In the Spark SQL Programming Guide, see the entries on "Programmatically Specifying the Schema" and "Spark SQL DataType Reference."

// A relation (table) of 'count' rows spread across 'partitions' partitions.
// The SQLContext arrives in a second parameter list and is marked @transient
// so it isn't serialized along with the relation.
case class MyTableScan(count: Int, partitions: Int)
                      (@transient val sqlContext: SQLContext)
  extends TableScan
{

  // Three integer columns: a value, its square, and its cube.
  val schema: StructType = StructType(Seq(
    StructField("val", IntegerType, nullable = false),
    StructField("squared", IntegerType, nullable = false),
    StructField("cubed", IntegerType, nullable = false)
  ))

  private def makeRow(i: Int): Row = Row(i, i*i, i*i*i)

  // Produce all the rows of the table as an RDD.
  def buildScan: RDD[Row] = {
    val values = (1 to count).map(i => makeRow(i))
    sqlContext.sparkContext.parallelize(values, partitions)
  }

}

Now you need to extend the RelationProvider trait to provide the DDL interface for your data source. A data source is configured via a set of options, each of which is a key/value pair. By implementing the createRelation method you specify how these configuration parameters and the SQLContext are used to initialize an individual table. In this example, the only parameters are the number of rows in the table and the number of partitions for the resulting RDD.

// This is the class you'll name in the DDL's USING clause. It needs a no-argument
// constructor because Spark SQL instantiates it reflectively.
class CustomRP extends RelationProvider {

  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]) = {
    MyTableScan(parameters("rows").toInt,
      parameters("partitions").toInt)(sqlContext)
  }

}

Now you need to register a temporary table based on your external source, using a new Spark SQL DDL command CREATE TEMPORARY TABLE. You need to provide the table name, the class that defines it (the one implementing RelationProvider) and the options list. Be very careful with your DDL syntax here, and if you get into trouble remember to check Spark's DEBUG logs. If Spark SQL can't parse your DDL, it will hide the detailed and informative error message in a log entry, try re-parsing your DDL as DML, and show you only the latter error message.

sqlContext.sql(
  s"""
    |CREATE TEMPORARY TABLE dataTable
    |USING sql.CustomRP
    |OPTIONS (partitions '9', rows '50')
  """.stripMargin)

Finally, you can query your temporary table like any other, referring to the attributes you specified as the "schema" when extending TableScan.

val data = sqlContext.sql("SELECT * FROM dataTable ORDER BY val")
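In Spark 1.2.0 the result is a SchemaRDD, so a quick way to sanity-check the data source is to print the schema and the first few rows. The output shown in the comments below is what I'd expect from the definitions above, not captured verbatim:

data.printSchema()
// root
//  |-- val: integer (nullable = false)
//  |-- squared: integer (nullable = false)
//  |-- cubed: integer (nullable = false)

data.take(3).foreach(println)
// [1,1,1]
// [2,4,8]
// [3,9,27]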

That's all you need for a very simple external data source -- all that's left is to write the code that interfaces to the "external" system of your choice. Keep in mind that the approach I've shown here will always fetch all the columns and all the rows: even if your SQL query only needs, say, a few rows and a few columns, everything gets materialized and then discarded by the rest of the query plan. Most of the rest of the external data source API is for dealing with this problem, but that's for a different post.
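As a preview of that followup, here is a paraphrased sketch (again, not the verbatim definitions) of the two scan variants in org.apache.spark.sql.sources that address this, by handing your relation the columns the query actually needs and, in the second case, simple filters that can be pushed down to the source:

// Receives only the required columns.
abstract class PrunedScan extends BaseRelation {
  def buildScan(requiredColumns: Array[String]): RDD[Row]
}

// Additionally receives filters that your source can evaluate itself.
abstract class PrunedFilteredScan extends BaseRelation {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}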

6 comments:

  1. Hello,
    Thanks for your post. One question I have is -
    Say I have 3 data sources which each have a column BC. I want to create partitions such that data from all the data sources with the same value for column BC is loaded onto the same machine. Any suggestion is most welcome.

    Replies
    1. Rajesh, sorry I missed your comment. Can you tell me more about your problem? Are you using an existing data source or developing your own? Are all three for the same external system? How is the data stored? Is it already partitioned on this column in the way you want? I'll be happy to try to help think through a solution.

  2. Hello, let's say I have 3 data sources that I want to load based on a query with a WHERE clause. I want to do a join on these 3 data sources, so the best approach would be if I can somehow ensure that data from all 3 data sources is partitioned in such a way that a given worker holds the rows from all 3 sources with matching keys. How can I ensure this?

    Replies
    1. Hi Rajesh, there are several possibilities for achieving this, but which ones are applicable depends on the answers to the questions I asked you in my reply (March 11) to your original comment (March 7). Without that information, your only option is to re-partition the RDDs _after_ they've been loaded, perhaps using a `RangePartitioner` or `HashPartitioner`, or implementing a custom `Partitioner`. This can get messy when you're trying to use Spark SQL, as you'll have to re-register the resulting RDDs as temporary tables, and it can also be slow, depending on the sizes. (A rough sketch of that re-partitioning approach appears at the end of this comment thread.) If you answer my questions about the external data source I may be able to give you a better option.

    2. Are all three for the same external system?
      Consider that one table is in Oracle, one is in SQL Server, and the third is a file.
      How is the data stored?
      The data is stored as a simple table. All the tables contain two columns (say colA and colB).
      Is it already partitioned on this column in the way you want?
      What I want is for the data to be queried from the 3 tables in such a way that a given worker holds the rows with the same values for colA and colB, so the join is more efficient.

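      For what it's worth, here is a minimal sketch of that re-partitioning idea. It assumes the three sources have already been loaded, however you choose to load them, as pair RDDs keyed on (colA, colB); the argument names and the partition count are made up for illustration:

      import org.apache.spark.HashPartitioner
      import org.apache.spark.SparkContext._   // pair-RDD operations (needed in Spark 1.2)
      import org.apache.spark.rdd.RDD

      // Hypothetical inputs: each source already loaded as a pair RDD keyed on (colA, colB).
      def coPartitionAndJoin(
          oracleRows: RDD[((String, String), String)],
          sqlServerRows: RDD[((String, String), String)],
          fileRows: RDD[((String, String), String)]) = {

        // One shared partitioner: rows with equal (colA, colB) land in the same
        // partition, and hence on the same worker, for all three RDDs.
        val partitioner = new HashPartitioner(16) // partition count chosen arbitrarily

        val a = oracleRows.partitionBy(partitioner)
        val b = sqlServerRows.partitionBy(partitioner)
        val c = fileRows.partitionBy(partitioner)

        // Because all three RDDs now share the same partitioner, these joins
        // don't need to re-shuffle the already-partitioned data.
        a.join(b).join(c)
      }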