Wednesday, February 18, 2015

Spark SQL Integration for MongoDB

In a previous post I described a native Spark connector for MongoDB (NSMC) and showed how to use it. That post described how to create RDDs in which each element is a MongoDB DBObject record. This time, I'll describe how the connector integrates with Spark SQL via Spark 1.2.0's external data source API. Since Spark SQL incorporates support for both nested record structures and arrays, it is naturally a good match for the rather freewheeling schema of MongoDB collections. As before, you can find the code on GitHub, use the library in your Scala code via sbt, and look at usage examples in a separate GitHub project. Note that Spark SQL integration was introduced, together with support for Spark 1.2.0, in release 0.4.0 of NSMC.

Related Posts

A number of follow-up posts take this work further:

  1. In Efficient Spark SQL Queries to MongoDB I describe how to make the Spark SQL integration more efficient, and
  2. in JDBC Access to MongoDB via Apache Spark I describe how to use NSMC's Spark SQL integration via JDBC.

Querying JSON from Spark SQL

Many of the complexities of querying MongoDB from Spark SQL are similar to those of querying JSON from Spark SQL, a capability that Spark SQL provides out of the box, so let's begin by taking some time to understand that mechanism. The main source of difficulty is that, on the one hand, MongoDB collections do not have a schema (or, rather, they have a de facto schema based on their current contents), while on the other hand Spark SQL depends on inferring a well-defined schema for the data being queried. More specifically:

  1. Different documents in a JSON collection frequently have differing structure, at the top level or in nested documents.
  2. Document fields can have array types, but elements of the same array may have different structures.
  3. The simplest differences in document structure are when one or more fields are present in some documents or array members but absent in others -- these are reconciled by treating the field as nullable.
  4. Atomic type conflicts can be easy to reconcile. For example, when a field is an Int32 in one document and an Int64 in another, it's simple enough to "relax" the type to Int64. It's a little more problematic when one instance is a string and the other is an integer. But the most problematic conflicts are structural: a scalar vs. array, scalar vs. document, or document vs. array.

Keeping this in mind, let's look at how Spark SQL's JSON support deals with inferring a schema, in a range of simple and not-so-simple examples. You can see the sample code for these in the LearningSpark project on GitHub. The first one is very straightforward, with scalars, nested documents, an array of scalars, and an array of documents. It might be tempting to conclude that with a single document there are no opportunities for conflicts, but that's not true: multiple elements of a single array can conflict with each other -- although in this case they don't.

{"a": 1, 
 "b": 2, 
 "c": ["x", "y"], 
 "d": [{"e": 1, "f": 2}, {"e": 11, "f": 12}], 
 "g": {"h": 7}}   

There shouldn't be any surprises in the schema, except perhaps for the fact that Spark SQL makes all the fields nullable -- needlessly so in this simple case.

root
 |-- a: integer (nullable = true)
 |-- b: integer (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- d: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- e: integer (nullable = true)
 |    |    |-- f: integer (nullable = true)
 |-- g: struct (nullable = true)
 |    |-- h: integer (nullable = true)
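
If you want to experiment with this kind of inference yourself, here's a minimal standalone sketch using Spark 1.2's built-in JSON support (this isn't the LearningSpark code itself, and the application name is arbitrary):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sc = new SparkContext(
  new SparkConf().setAppName("JsonSchemaDemo").setMaster("local[4]"))
val sqlContext = new SQLContext(sc)

// each element of the RDD is one complete JSON document
val docs = sc.parallelize(Seq(
  """{"a": 1, "b": 2, "c": ["x", "y"], "d": [{"e": 1, "f": 2}, {"e": 11, "f": 12}], "g": {"h": 7}}"""
))

// Spark SQL scans the documents and infers a schema for them
val inferred = sqlContext.jsonRDD(docs)
inferred.printSchema()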

The next example introduces minor conflicts by adding a second document and leaving out certain fields -- "b" and "g" only appear in the first document, and "d.u" only appears in the second document.

{"a": 1, 
 "b": 2, 
 "c": ["x", "y"], 
 "d": [{"e": 1, "f": 2}, {"e": 11, "f": 12}], 
 "g": {"h": 7}}
{"a": 1, 
 "c": ["x", "y"], 
 "d": [{"e": 1, "f": 2, "u":"ewe"}, {"e": 11, "f": 12}]}

The schema is simply the "union" of the two schemas. That's OK since each field is nullable.

root
 |-- a: integer (nullable = true)
 |-- b: integer (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- d: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- e: integer (nullable = true)
 |    |    |-- f: integer (nullable = true)
 |    |    |-- u: string (nullable = true)
 |-- g: struct (nullable = true)
 |    |-- h: integer (nullable = true)

The last example introduces more interesting conflicts. Field "a" is an integer in one document and a long in the other, "b" is an integer in one and a string in the other, "c" is an array in one and a scalar in the other, "d" is a document in one and an array of documents in the other, and "g" is a document in one and an array of integers in the other.

{"a": 1, 
 "b": 2, 
 "c": ["x", "y"], 
 "d": {"e": 1, "f": 2}, 
 "g": {"h": 7}}
{"a": 5000000000, 
 "b": "two", 
 "c": 12, 
 "d": [{"e": 1, "f": 2, "u":"ewe"}, {"e": 11, "f": 12}], 
 "g": [1, 2]}

Looking at the inferred schema, the treatment of "a" is quite simple, since an integer can be stored in a long. Every other conflict is dealt with by treating the field as a string.

root
 |-- a: long (nullable = true)
 |-- b: string (nullable = true)
 |-- c: string (nullable = true)
 |-- d: string (nullable = true)
 |-- g: string (nullable = true)

This "reversion" to strings has some interesting consequences. The fields can be used in a query, say in a SELECT clause, so you can obtain and manipulate the values. The values returned, however, are NOT strings. For example, if you SELECT g you will obtain a Row containing a single integer from the first document and a list of integers from the other. And, most importantly, you can't query sub-fields -- so you can't SELECT g.h or SELECT c[1], either of which would have been fine against the previous schema. I'm willing to accept this as a good set of compromises, and having NSMC's schema inference behave analogously seems entirely appropriate.

Integrating NSMC with Spark SQL

In two earlier posts I described how to integrate an external database engine with Spark SQL using the external data source API introduced in Spark 1.2.0. The first post described a simple and fairly naive integration approach where the external engine returned all the data in a table or collection, and Spark SQL took care of all filtering and projection. The second post showed how to push filtering and projection down to the external database engine. However, the second post also showed how the Spark SQL query compiler makes limited use of projection pushdown for queries against non-rectangular data. Furthermore, NSMC currently has no ability to push filtering out to MongoDB. As a result, I've opted for the simpler approach thus far. At some point it will make sense to revisit this decision in order to make the connector more efficient.
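
For readers who haven't seen the external data source API, here is roughly what a TableScan-based relation provider looks like in Spark 1.2.0. This is a simplified illustration of the approach, not NSMC's actual classes, and the MongoDB-specific pieces are elided:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext, StructType}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}

// referred to by class name in a CREATE TEMPORARY TABLE ... USING clause
class ExampleRelationProvider extends RelationProvider {
  def createRelation(sqlContext: SQLContext,
                     parameters: Map[String, String]): BaseRelation =
    new ExampleRelation(parameters("db"), parameters("collection"), sqlContext)
}

class ExampleRelation(db: String, collection: String, val sqlContext: SQLContext)
  extends TableScan {

  // a TableScan relation reports its schema up front ...
  def schema: StructType = ???  // e.g., inferred by sampling the collection

  // ... and returns every row; Spark SQL then takes care of all
  // filtering and projection
  def buildScan(): RDD[Row] = ???
}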

Schema Inference Issues

Spark SQL integration raised a number of design issues. Some of them may deserve their own posts in the future, but for now here's a summary of what the issues are and how I've resolved them (or, in many cases, found a "stopgap" solution).

  1. Sampling issues: reading an entire data collection to infer a schema is expensive, although sometimes necessary. There are a number of interesting decisions to make in this regard, and in a future release some or all of them may become configurable.
    • How many documents are sampled: currently it's all of them.
    • When to sample: currently they are sampled immediately when the temporary table is registered with Spark SQL.
    • How often to sample: currently they are only sampled once on registration, and then several queries may execute against the result.
  2. Type mapping and coverage: MongoDB supports many built-in types. For now, NSMC's Spark SQL integration supports only integers, longs, strings, booleans, doubles, binary data, timestamps and dates, as well as arrays and nested documents.
  3. Structure field ordering and merging: while the fields of JSON documents are nominally unordered, NSMC sorts them by name, as does Spark SQL's built-in JSON support (see the sketch after this list for the flavor of merge this implies).
  4. Handling atomic or structural type conflicts: these are not currently resolved by NSMC.
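
To give a flavor of the sorting and merging mentioned in item 3, here is a simplified sketch -- not NSMC's actual code -- that merges two document schemas field by field using Spark 1.2's schema types. The last case adopts the fall-back-to-string behavior of Spark SQL's JSON support, which, as item 4 notes, NSMC itself doesn't yet do:

import org.apache.spark.sql._  // StructType, StructField, StringType, etc. in Spark 1.2

// merge two struct schemas: shared fields are merged recursively, fields seen on
// only one side become nullable, and the result is sorted by field name
def merge(left: StructType, right: StructType): StructType = {
  val byName = (left.fields ++ right.fields).groupBy(_.name).toSeq.sortBy(_._1)
  StructType(byName.map {
    case (_, Seq(only)) => only.copy(nullable = true)
    case (name, Seq(l, r)) => (l.dataType, r.dataType) match {
      case (ls: StructType, rs: StructType) =>
        StructField(name, merge(ls, rs), nullable = true)
      case (lt, rt) if lt == rt =>
        StructField(name, lt, nullable = true)
      case _ =>
        // an atomic or structural type conflict: fall back to a string
        StructField(name, StringType, nullable = true)
    }
  })
}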

Configuration and Use

Perhaps the best way to learn how to use NSMC's Spark SQL integration is to look at the spark-mongodb-examples project on GitHub. The README.md there describes how to set up a sample MongoDB collection and the SQLQuery.scala file shows how to configure an appropriate SQLContext, register a MongoDB collection as a temporary table with Spark SQL, and then execute Spark SQL queries against it. We'll review the example code below.

In your build.sbt you'll need the following dependencies.

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.2.0"

libraryDependencies += 
    "com.github.spirom" %% "spark-mongodb-connector" % "0.4.0"

Only a few imports are needed.

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

The SparkContext is configured in the same way as when using the core connector, and then a SQLContext is wrapped around it.

val coreConf =
  new SparkConf()
    .setAppName("MongoReader").setMaster("local[4]")
    .set("nsmc.connection.host", DBConfig.host)
    .set("nsmc.connection.port", DBConfig.port)
val conf = (DBConfig.userName, DBConfig.password) match {
  case (Some(u), Some(pw)) => 
    coreConf.set("nsmc.user", u).set("nsmc.password", pw)
  case _ => coreConf
}
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

The MongoDB collection is registered as a temporary table -- you'll need to change the table name as well as the MongoDB database and collection names in your applications, but note that you must tell Spark SQL that you're registering the table via NSMC by referring to nsmc.sql.MongoRelationProvider.

sqlContext.sql(
  s"""
  |CREATE TEMPORARY TABLE dataTable
  |USING nsmc.sql.MongoRelationProvider
  |OPTIONS (db '${DBConfig.testDatabase}', 
  |         collection '${DBConfig.testCollection}')
""".stripMargin)

Now you can run your first query. It's useful to make the first one a SELECT * query so that you can see the entire Spark SQL schema that has been inferred for the sample collection.

val data =
  sqlContext.sql("SELECT * FROM dataTable")

We'll look at the schemas for the results of the sample queries in the next section. You can print the schema by first obtaining it via SchemaRDD's schema method, and then calling printTreeString().

data.schema.printTreeString()

Finally, you can access the actual data returned from the query.

data.foreach(println)
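
If you want to get at individual fields programmatically rather than just printing whole rows, you can pull them out of each Row by position. Here's a small sketch that isn't part of the example project; the positions follow the inferred schema shown for the SELECT * query later in this post, where custid is column 2 and discountCode is column 3.

data.collect().foreach { row =>
  val custid = row.getString(2)
  // missing fields come back as null, so check before extracting a primitive
  val discount = if (row.isNullAt(3)) None else Some(row.getInt(3))
  println(s"$custid -> $discount")
}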

Sample Data

Before we look at the four queries provided in the sample code, let's see what data we're querying from the MongoDB perspective using JSON notation. This is the data inserted by running the main() method of the PopulateTestCollection object in the sample project.

{ 
    "_id" : ObjectId("54e1586080ce44e72e07fe4c"), 
    "custid" : "1001", 
    "billingAddress" : {
        "state" : "NV"
    }, 
    "orders" : [
        {
            "orderid" : "1000001", 
            "itemid" : "A001", 
            "quantity" : NumberInt(175), 
            "returned" : true
        }, 
        {
            "orderid" : "1000002", 
            "itemid" : "A002", 
            "quantity" : NumberInt(20)
        }
    ]
}
{ 
    "_id" : ObjectId("54e1586080ce44e72e07fe4d"), 
    "custid" : "1002", 
    "billingAddress" : {
        "state" : "CA", 
        "zip" : "92093"
    }, 
    "shippingAddress" : {
        "state" : "CA", 
        "zip" : "92109"
    }, 
    "orders" : [
        {
            "orderid" : "1000002", 
            "itemid" : "B012", 
            "quantity" : NumberInt(200)
        }
    ]
}
{ 
    "_id" : ObjectId("54e1586080ce44e72e07fe4e"), 
    "custid" : "1003", 
    "billingAddress" : {
        "state" : "AZ"
    }, 
    "discountCode" : NumberInt(1), 
    "orders" : [
        {
            "orderid" : "1000003", 
            "itemid" : "A001", 
            "quantity" : NumberInt(175)
        }, 
        {
            "orderid" : "1000004", 
            "itemid" : "B001", 
            "quantity" : NumberInt(10)
        }, 
        {
            "orderid" : "1000005", 
            "itemid" : "A060", 
            "quantity" : NumberInt(12)
        }
    ]
}

Notice how "non-rectangular" this data is. Entire nested documents are missing in some documents, scalar fields are missing in others, and the arrays are of different lengths.

Example Queries

Now let's look at the queries in the sample project. To see how NSMC has interpreted the sample collection, it's useful to start with a query that returns all of it.

 SELECT * FROM dataTable

As you might hope, the inferred schema is general enough to account for every document in the collection. This is achieved by making every field nullable, whether or not it is actually missing from some document. At the level of each document or nested document, the fields are in alphabetical order. The types chosen are not surprising, except perhaps for the use of a string to represent the unique ID of each document.

root
 |-- _id: string (nullable = true)
 |-- billingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- custid: string (nullable = true)
 |-- discountCode: integer (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemid: string (nullable = true)
 |    |    |-- orderid: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |-- shippingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)

A couple of things are worth noticing at this stage.

  1. The data returned shows the missing fields as Scala nulls, which Spark SQL treats as NULLs -- soon we'll see how to exploit the latter fact in queries.
  2. Each Spark SQL result row is represented by an instance of org.apache.spark.sql.Row, and each nested document by a nested instance of Row. When these are printed they are surrounded by square brackets.
  3. The fields of documents and sub-documents appear in the same order as in the inferred schema.
  4. It's hard to tell from this output, but atomic data has the appropriate atomic type: Int, String and so on.

[54e1621480ce1641f05b1caf,
    [NV,89150],
    1001,
    null,
    List([A001,1000001,175], [A002,1000002,20]),
    null]
[54e1621480ce1641f05b1cb0,
    [CA,92093],
    1002,
    null,
    List([B012,1000002,200]),
    [CA,92109]]
[54e1621480ce1641f05b1cb1,
    [AZ,85014],
    1003,
    1,
    List([A001,1000003,175], [B001,1000004,10], [A060,1000005,12]),
    [AZ,85020]]
[54e1621480ce1641f05b1cb2,
    null,
    1004,
    null,
    null,
    [CA,92109]]

Since discountCode is often missing, it gives us a chance to see that Spark SQL treats the missing instances as NULL.

 SELECT custid FROM dataTable 
 WHERE discountCode IS NOT NULL

This query just returns a single column.

root
 |-- custid: string (nullable = true)
  

Only one document has an instance of discountCode.

[1003]

Since entire sub-documents are missing from some of the data, we may be curious about what happens when we use a field inside a sub-document in a query.

 SELECT custid, shippingAddress.zip 
 FROM dataTable

We expect two columns. Notice how the schema shows zip to have been pulled up to the top level.

root
 |-- custid: string (nullable = true)
 |-- zip: string (nullable = true)
  

It turns out not to be a problem: fields within a sub-document are considered to be NULL either when they themselves are missing or when the entire sub-document is missing. This is also true when we filter by such fields or group by them. Again, zip is indeed at the top level, not nested, and that's what you would expect from the schema.

[1001,null]
[1002,92109]
[1003,85020]
[1004,92109]

Naturally, we'd also like grouping to play well with nested fields.

 SELECT COUNT(custid), shippingAddress.zip 
 FROM dataTable 
 GROUP BY shippingAddress.zip

The schema shouldn't be surprising at this point.

root
 |-- c0: long (nullable = false)
 |-- zip: string (nullable = true)

Neither should the results.

[2,92109]
[1,null]
[1,85020]

Implementation

In subsequent posts I'll describe the implementation of NSMC's Spark SQL integration in some detail. Meanwhile, you can see it on GitHub in the nsmc.sql and nsmc.conversion packages.

Performance and Scalability

In order to infer the schema for a collection, NSMC reads through the entire collection, as I've already described. It materializes the entire collection in a MongoRDD, which was a good way to get something working quickly, but is wasteful of memory, and often also of time. Release 0.4.0 is a "proof of concept" with respect to Spark SQL integration -- improved performance will have to wait for subsequent releases.

Your Experiences

If you use Spark and MongoDB, please try using NSMC -- but remember it's not ready for production use. If you have trouble, feel free to use the comments section here or file an issue on GitHub.
