Saturday, May 16, 2015

Efficient Spark SQL Queries to MongoDB

In previous posts I've discussed a native Apache Spark connector for MongoDB (NSMC) and NSMC's integration with Spark SQL. The latter post described a naive implementation of Spark SQL integration, intended to demonstrate its value and the applicability of Spark SQL's recently introduced external data source API. In particular, I had then made no attempt to make the implementation efficient. In this post I will describe how to use some of the features of the external data source API to achieve a much more efficient Spark SQL integration.

General Approach

As discussed in earlier posts, the major challenges in implementing a Spark SQL external data source for MongoDB are:

  1. Efficient schema inference for the entire collection.
  2. Efficient use of MongoDB's query capabilities, based on Spark SQL's projection and filter pushdown mechanism, to obtain the data required for each Spark SQL query.

The NSMC project is hosted on GitHub, and the class nsmc.sql.MongoRelationProvider is a good starting point for reading the Spark SQL integration code.

Schema Inference

The Spark SQL schema for a MongoDB collection is computed when the collection is registered using, say, Spark SQL's CREATE TEMPORARY TABLE command. The goal of schema inference is to generate a Spark SQL schema (Seq[StructField]) to which each document in the collection conforms, so that each document can be converted to an org.apache.spark.sql.Row in order to produce an RDD[Row] as a query result. Some of the subtleties of what it means to infer such a schema were discussed in the original post on Spark SQL integration.

Schema inference itself makes use of Spark's parallel computation features. First, a set of partitions is computed for the collection. Then these are parallelized into an RDD. Next, each partition's collection data is read (in parallel) and a separate schema is computed for each partition. Finally, these schemas are all merged into a single schema that definitively represents the collection. This approach is summarized in Figure 1.

Figure 1: Schema inference

An important characteristic of this algorithm is that the collection is not stored -- the documents are analyzed "on the fly" to compute the schema. Obviously this can be a good thing, if the queries end up depending on small subsets of the collection, or a bad thing if the queries tend to re-scan the entire collection every time.
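To make the merging step concrete, here is a minimal sketch, written for this post rather than taken from NSMC, of how per-partition schemas can be combined; real conflict resolution (for example, reconciling an integer field in one partition with a long in another) is omitted.

import org.apache.spark.sql.types._

// Merge two partition schemas: take the union of their fields, sorted by name,
// and mark every field nullable. Resolution of conflicting types is omitted here.
def mergeSchemas(left: StructType, right: StructType): StructType = {
  val byName = (left.fields ++ right.fields).groupBy(_.name)
  StructType(byName.toSeq.sortBy(_._1).map { case (name, fields) =>
    StructField(name, fields.head.dataType, nullable = true)
  })
}

// Reduce the schema computed for each partition into a single schema for the collection.
def collectionSchema(perPartitionSchemas: Seq[StructType]): StructType =
  perPartitionSchemas.reduce(mergeSchemas)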

Collection Scan

A collection is scanned when Spark SQL calls the buildScan() method on the MongoTableScan class, which it does in order to process each query -- this interface was discussed in an earlier post on the external data source API. The method is passed the filters and projections that Spark SQL would benefit from having MongoDB execute. This information is first used to generate an appropriate MongoDB query, as discussed below. That query is executed to obtain an RDD[DBObject] (DBObject being the representation of a document in the MongoDB APIs), which is then converted to an RDD[Row] using the previously inferred schema. This means that the conversion, like schema inference, happens in parallel, although how much that helps depends on how skewed the query results are.

Figure 2: Table scan
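The conversion step can be illustrated with a simplified sketch, again not NSMC's actual code: each field in the inferred schema is looked up in the document, with missing fields becoming nulls, and nested documents and arrays are ignored here for brevity.

import com.mongodb.casbah.Imports._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

// Convert one MongoDB document to a Spark SQL Row, guided by the inferred schema.
// Fields absent from the document come back as null, matching the nullable schema.
def documentToRow(doc: DBObject, schema: StructType): Row = {
  val values = schema.fields.map(f => doc.get(f.name)) // null when the field is missing
  Row.fromSeq(values)
}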

Generating MongoDB queries from the parameters to the buildScan() method is quite straightforward. MongoDB's find() method happens to be conveniently factored for this, as it takes one parameter for filtering and one for projection. I've given a number of examples of how this conversion turns out below.
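A rough sketch of that factoring, not NSMC's actual code, might look like the following; it handles only a few of the filter types that Spark SQL can push down, and anything it cannot translate would simply be left for Spark SQL to re-apply.

import com.mongodb.casbah.Imports._
import org.apache.spark.sql.sources._

// Translate the pushed-down filters into MongoDB's first find() argument.
def filtersToQuery(filters: Array[Filter]): DBObject = {
  val clauses: Array[(String, Any)] = filters.collect {
    case EqualTo(attr, value)     => attr -> value
    case GreaterThan(attr, value) => attr -> MongoDBObject("$gt" -> value)
    case LessThan(attr, value)    => attr -> MongoDBObject("$lt" -> value)
  }
  MongoDBObject(clauses.toList)
}

// Translate the required columns into MongoDB's second find() argument,
// suppressing _id unless it was explicitly requested.
def columnsToProjection(requiredColumns: Array[String]): DBObject = {
  val builder = MongoDBObject.newBuilder
  requiredColumns.foreach(c => builder += (c -> 1))
  if (!requiredColumns.contains("_id")) builder += ("_id" -> 0)
  builder.result()
}

Applying the two functions to the pushed-down information yields exactly the kind of two-argument find() calls shown in the examples that follow.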

Example collection and queries

The best way to understand how NSMC's Spark SQL integration works is to look at some example Spark SQL queries together with the generated MongoDB queries.

The collection and its inferred schema

To understand how query evaluation works, we'll look at the following simple set of MongoDB documents:

{ 
  "_id" : ObjectId("554cc53280cecbc6a8579952"), 
   "item" : 1, 
   "quantity" : 5, 
   "price" : 12.5, 
   "category" : "A" 
}
{ 
  "_id" : ObjectId("554cc53280cecbc6a8579953"), 
  "item" : 2, 
  "quantity" : 10, 
  "price" : 12.5, 
  "location" : 
     { 
       "state" : "AZ", 
       "zip" : "85001" 
     }, 
  "category" : "A" 
}
{ 
  "_id" : ObjectId("554cc53280cecbc6a8579954"), 
  "item" : 3, 
  "quantity" : 15, 
  "price" : 12.5, 
  "category" : "B" 
}
{ 
  "_id" : ObjectId("554cc53280cecbc6a8579955"), 
  "item" : 4, 
  "quantity" : 20, 
  "price" : 12.5, 
  "location" : 
     { 
       "state" : "NV", 
       "zip" : "89101" 
     }, 
  "category" : "B" 
}

NSMC infers the following Spark SQL schema for these documents:

root
 |-- _id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- item: integer (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: integer (nullable = true)

Filter and Projection Pushdown

As I discussed in an earlier post, Spark SQL's external data source API allows simple cases of filtering and projection to be pushed down to the back-end database query, to make use of any indexing capabilities and to minimize the amount of data that has to be transferred back to Spark.

Let's start with the simplest of queries to see how this works, assuming we have registered the above collection with Spark SQL as the table mongoTable.

SELECT * FROM mongoTable

NSMC simply requests all the columns in the inferred schema from MongoDB. Since the always-present _id column is not explicitly suppressed, it will be present in the output. (MongoDB's convention for this field is that it is always returned unless explicitly suppressed.) Including it in this case was a design choice that seems to result in the most intuitive behavior for * queries. The important design choice was actually to include it as a column in the schema. Once that was done, Spark SQL would expect it to be returned in every * query.

Here is the generated MongoDB query, with no filtering and [effectively] no projection.

collection.find( 
  { }, 
  { "category" : 1 , "item" : 1 , "location" : 1 , 
    "price" : 1 , "quantity" : 1 }
)

This query happens to be equivalent to both of the following, but NSMC doesn't recognize the fact:

collection.find( { }, { } )

collection.find( )

A query that explicitly projects columns results in only those being requested from MongoDB.

SELECT item, quantity FROM mongoTable

In this case, Spark SQL would not know how to deal with the _id column even if it was returned, so NSMC explicitly suppresses it to save bandwidth. It's probably not a very important optimization.

collection.find( 
  { }, 
  { "item" : 1 , "quantity" : 1 , "_id" : 0 }
)

Now let's move on to simple filtering.

SELECT item, quantity 
FROM mongoTable 
WHERE quantity > 12

Indeed, the filter is pushed through to the MongoDB query.

collection.find( 
  { "quantity" : { "$gt" : 12}},
  { "item" : 1 , "quantity" : 1 , "_id" : 0}
)

A disjunctive query is not currently handled by NSMC, although the capability to push it through was introduced in Spark 1.3.0.

SELECT item, quantity 
FROM mongoTable 
WHERE quantity > 12 OR quantity < 8

So only projection is pushed through.

collection.find( 
  {},
  { "item" : 1 , "quantity" : 1 , "_id" : 0}
)

Accessing a nested document or array element doesn't disable projection pushdown.

SELECT item, quantity, location.zip 
FROM mongoTable

The projection is "widened" to the top level column, so somewhat too much data is pulled in from MongoDB.

collection.find( 
  {},
  { "item" : 1 , "quantity" : 1 , "location" : 1 , "_id" : 0}
)

Filtering on a nested document or array element is more problematic.

SELECT item, quantity, location.zip 
FROM mongoTable 
WHERE location.state = 'AZ' AND quantity = 10

There's no widening trick to play here, so the problematic conjunct is not pushed through.

collection.find( 
  { "quantity" : 10 }
  { "item" : 1 , "quantity" : 1 , "location" : 1 , "_id" : 0}
)

We should also check an aggregation query.

SELECT category, sum(quantity) FROM mongoTable GROUP BY category

The external data source API doesn't have a way to push grouping and aggregation through, so MongoDB is just asked to fetch the underlying data.

collection.find( 
  { },
  { "category" : 1 , "quantity" : 1 , "_id" : 0 }
)

Scope for Improvement

A number of changes either in NSMC's implementation or in Spark SQL's implementation could result in still more efficient processing of queries to MongoDB.

Sampling

Scanning the entire collection to compute a schema has the advantage of not missing documents that would have added valuable schema information. On the other hand, it can be very expensive. Inferring the schema from a sample of the collection would reduce that cost, but sampling clearly has to be optional, with a parameter allowing the user to specify how much of the collection to sample.

Narrower Schema Inference

Currently, a schema is always inferred for the entire collection, which can be expensive if queries will only use part of the collection. An obvious refinement would be to allow users to specify a projection when registering the collection. Being able to specify a filter would be even more powerful, but less obviously useful.

Caching

The inferred schema is cached for use by all queries. This has its dangers, which are mitigated by Spark SQL's ability to refresh a registered collection.

Since the results of a query conform to all the standard characteristics of an RDD, they benefit from Spark's caching capabilities. However, it is not yet clear to me how much they benefit across multiple queries. This needs some investigation and experimentation.
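As a small illustration of what is already possible, a query result can be cached explicitly like any other DataFrame; this assumes a SQLContext configured as in the earlier posts, with the collection registered as mongoTable.

val frequentlyUsed = sqlContext.sql("SELECT item, quantity, category FROM mongoTable")
frequentlyUsed.cache()           // keep the converted rows in memory after the first action
println(frequentlyUsed.count())  // the first action scans MongoDB and populates the cache
println(frequentlyUsed.count())  // later actions should be served from the cache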

Tighter Integration with Catalyst

Spark SQL's external data source API was designed with simplicity and ease of use in mind, and in my opinion it succeeds admirably in this regard. However, the API does provide for deeper (and harder to use) integration with Spark SQL's Catalyst query compiler. It may in the future be possible to improve NSMC's Spark SQL integration by using this feature.

Limitations of the External Data Source API

Spark SQL's external data source API essentially cannot express filtering or projection on nested structures or arrays, so such operations cannot be pushed down. This means certain queries against a MongoDB collection will always require more data than necessary to be returned from MongoDB to Spark. I don't know if this is at all on the radar of the Spark SQL developers.

Additionally, the external data source API does not support pushing joins or grouping and aggregation to the external database.

Integrating with Spark SQL's cost model

The external data source API allows an implementation to provide information about the size of a collection, but NSMC doesn't use this feature. It may allow the Catalyst query optimizer to make better query processing decisions.

Limitations of Catalyst

The Catalyst query optimizer currently has a limited cost model, and improvements in it may be helpful to NSMC's performance.

Improved Partitioning

NSMC's collection partitioning features are currently quite naive, and this may limit the efficiency gains of parallel schema inference and parallel query execution. This needs some experimentation.

Summary

In my original post about NSMC's Spark SQL integration, I described a very naive implementation, intended to show that such integration was possible and could be useful. In this post I've described more recent work to take advantage of Spark's parallelism for schema inference, to prevent schema inference from consuming large amounts of memory, and to take advantage of Spark SQL's filter and projection pushdown features to dramatically improve query performance in many cases. While there's a lot more to be done, these improvements in many cases give NSMC's Spark SQL integration quite realistic performance.

Thursday, May 14, 2015

JDBC Access to MongoDB via Apache Spark

In previous posts I've discussed a native Apache Spark connector for MongoDB (NSMC) and NSMC's integration with Spark SQL. The latter post described an example project that issued Spark SQL queries via Scala code. However, much of the value of Spark SQL integration comes from the possibility of it being used either by pre-existing tools or applications, or by end users who understand SQL but do not, in general, write code. In this post I'll describe how to set up a Spark Thrift server with NSMC, and query it via JDBC calls.

JDBC access to Spark SQL

To make JDBC connections to Spark SQL, you need to run the Spark Thrift server, for which I'll give instructions below. Writing a Java application that connects to the Thrift server requires the HiveServer2 JDBC connector. The link provides some general information together with a list of JARs needed in the CLASSPATH, but I prefer to use Maven dependencies. The example described here works with the following quite conservative dependencies:

<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-jdbc</artifactId>
  <version>0.13.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.2.0</version>
</dependency>

If you prefer to live on the edge, the following dependencies work too:

<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-jdbc</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.7.0</version>
</dependency>

Running a Spark Thrift server with NSMC

Let's begin by setting up a Spark Thrift server that has access to NSMC.

Prerequisites

In order to follow along with the setup and examples you will need to set up the following:

  1. A running instance of MongoDB.
  2. A running instance of Spark 1.3.1 or later that can communicate with the MongoDB instance.
  3. The NSMC Examples project, having run the main() method of the PopulateTestCollection class to set up the example collection in your MongoDB instance. (While the rest of the example classes need to be submitted as a Spark job, PopulateTestCollection connects directly to your MongoDB instance, and can be run stand-alone.)
  4. The NSMC JDBC Client Samples project, which I'll describe in some detail below.
  5. A configuration file that will be used to tell NSMC how to connect to your MongoDB instance -- we'll call it nsmc.conf.

The required contents of your nsmc.conf file are as follows:

spark.nsmc.connection.host      <your MongoDB host>
spark.nsmc.connection.port      <your MongoDB port>
# omit the next two lines if you're not using MongoDB authentication, 
# otherwise provide an appropriate username and password
spark.nsmc.user                 <your MongoDB user name>
spark.nsmc.password             <your MongoDB password>

Startup

Next you need to start a Spark Thrift server to act as a JDBC listener. You need to tell it:
  1. Where to find your Spark master.
  2. What packages it needs to download from the Maven Central Repository -- in this case a supported version of NSMC, v0.5.2 or later, compiled for Scala 2.10. (Earlier versions of NSMC cannot be used in this way.)
  3. Where to find NSMC configuration -- in this case your nsmc.conf file.

<your Spark installation>/sbin/start-thriftserver.sh \
    --master <your spark master URL> \
    --packages com.github.spirom:spark-mongodb-connector_2.10:0.5.2 \
    --properties-file <path to config files>/nsmc.conf

If you do not want Spark to download NSMC directly from the Maven Central Repository, perhaps because the servers do not have access to the Internet, you can download an assembly containing all the relevant packages -- look for spark-mongodb-connector-assembly-0.5.2.jar or a later version. If you take this approach you need to:

  • Save the assembly somewhere that is accessible on all servers in your Spark installation
  • Skip the --packages setting when starting the Thrift server and instead use --driver-class-path together with the path to where you saved the assembly.
  • Add an entry to your nsmc.conf file where the key is spark.executor.extraClassPath and the value is again the path to where you saved the assembly file.

Running the JDBC Examples

If you have cloned the NSMC JDBC Client Samples project, you can build and run it either through IntelliJ IDEA or through Apache Maven, using the commands:

mvn compile
mvn exec:java -Dexec.mainClass="Demo"

Most of the code is fairly mundane JDBC, and I won't attempt to write a JDBC tutorial. But I will step through some of the most interesting points. You may need to modify the connection string if you're running the Spark Thrift server on a remote host or a non-default port. Furthermore, if you've set up authentication you'll need to provide an appropriate user name and password.

Connection con = 
    DriverManager.getConnection("jdbc:hive2://localhost:10000/default", 
                                "hive", "");

The most important part of the example is registering a MongoDB collection with Spark SQL. There are a number of interesting aspects.

  • You need to register a temporary table, and thus you will need to re-register it if you restart the Spark Thrift server. Spark (as of 1.3.0) now supports persisting metadata for external resources like this in the Hive Metastore, but NSMC (as of v0.5.2) isn't yet capable of taking advantage of it.
  • Because all connections to a Spark Thrift server run in the same HiveContext, registering a MongoDB collection once like this makes it available to all users until the server is restarted.
  • It is the USING clause that tells Spark SQL to use NSMC for querying this resource. Since you told the Spark Thrift server to download an appropriate version of NSMC from the Maven Central Repository, NSMC will both set up the right metadata when a collection is registered, and then also process any queries to this table.
  • You can register several different MongoDB collections, potentially in different databases, through multiple invocations of CREATE TEMPORARY TABLE. Alas, all connections must use the same MongoDB endpoint (host/port) with the same NSMC configuration. See NSMC Issue #15 for the status of the needed enhancement.

CREATE TEMPORARY TABLE dataTable
USING nsmc.sql.MongoRelationProvider
OPTIONS (db 'test', collection 'scratch')

The remaining code executes some simple DDL and DML queries, and should be clear if you've looked at the MongoDB collection and queries in the NSMC Examples project. You will recognize the following Spark SQL schema as being the schema that NSMC infers from the MongoDB collection.

root
 |-- _id: string (nullable = true)
 |-- billingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- custid: string (nullable = true)
 |-- discountCode: integer (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemid: string (nullable = true)
 |    |    |-- orderid: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |-- shippingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)

Since this schema has both arrays and nested structures, I want to spend the rest of this post focusing on how JDBC handles returning query results from such an aggressively non-rectangular (or non-relational) schema. As you have seen in my original Spark SQL post, Spark SQL itself deals with this extraordinarily well, but alas the Hive JDBC connector does not.

The following simple query illustrates the issues, since billingAddress contains a nested document and orders contains an array.

SELECT custid, billingAddress, orders FROM dataTable

If you execute the query from Scala and print the schema of the result you will see:

root
 |-- custid: string (nullable = true)
 |-- billingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemid: string (nullable = true)
 |    |    |-- orderid: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)

And printing the data frame yields:

custid billingAddress orders               
1001   [NV,89150]     List([A001,100000... 
1002   [CA,92093]     List([B012,100000... 
1003   [AZ,85014]     List([A001,100000... 
1004   null           null                 

In JDBC we would expect the billingAddress to be modeled as java.sql.Struct and the orders to be a java.sql.Array (where the elements are also java.sql.Struct). Instead, we see that JDBC recognizes the original types, but transmits all the structured values as JSON strings. The following code can be used to show how JDBC perceives the column types:

ResultSetMetaData rsm = res.getMetaData();
System.out.println("*** Column metadata:");
int ncols = rsm.getColumnCount();
System.out.println("total of " + ncols + " columns");
for (int i = 1; i <= ncols; i++) {
    System.out.println("Column " + i + " : " + rsm.getColumnName(i));
    System.out.println(" Label : " + rsm.getColumnLabel(i));
    System.out.println(" Class Name : " + rsm.getColumnClassName(i));
    System.out.println(" Type : " + rsm.getColumnType(i));
    System.out.println(" Type Name : " + rsm.getColumnTypeName(i));
}

Here is the output:

*** Column metadata:
total of 3 columns
Column 1 : custid
  Label : custid
  Class Name : java.lang.String
  Type : 12
  Type Name : string
Column 2 : billingAddress
  Label : billingAddress
  Class Name : java.lang.String
  Type : 2002
  Type Name : struct
Column 3 : orders
  Label : orders
  Class Name : java.lang.String
  Type : 2003
  Type Name : array

The following code allows us to see the values. Since all of the structured columns have been converted to strings, it's worth noting that the call to res.getObject(i) below could just as well have been replaced with res.getString(i).

System.out.println("*** Data:");
while (res.next()) {
    System.out.println("Row " + res.getRow());
    for (int i = 1; i <= ncols; i++) {
        System.out.println(" Column " + i + " : " + res.getObject(i));
    }
}

Here is the output. Notice that it is quite readable, and the JSON strings could be parsed by your client code if needed, especially since you can use the ResultSetMetaData to determine what to expect in the String. Whether this is "good enough" really depends on your needs.

*** Data:
Row 1
  Column 1 : 1001
  Column 2 : {"state":"NV","zip":"89150"}
  Column 3 : [{"itemid":"A001","orderid":"1000001","quantity":175},
              {"itemid":"A002","orderid":"1000002","quantity":20}]
Row 2
  Column 1 : 1002
  Column 2 : {"state":"CA","zip":"92093"}
  Column 3 : [{"itemid":"B012","orderid":"1000002","quantity":200}]
Row 3
  Column 1 : 1003
  Column 2 : {"state":"AZ","zip":"85014"}
  Column 3 : [{"itemid":"A001","orderid":"1000003","quantity":175},
              {"itemid":"B001","orderid":"1000004","quantity":10},
              {"itemid":"A060","orderid":"1000005","quantity":12}]
Row 4
  Column 1 : 1004
  Column 2 : null
  Column 3 : null

Advantages of using HiveQL in queries

If Hive's compromise for returning semi-structured data doesn't meet your needs, you may want to consider doing more of the work in the query. For example, Spark SQL supports drilling into nested structures as follows:

SELECT custid, billingAddress.zip FROM dataTable

This actually works remarkably well. For example, if billingAddress happens to be null in some document, asking for billingAddress.zip also returns NULL, instead of generating an error. However, Spark SQL doesn't give you a good way to unpack arrays -- and yet there is a coping strategy for those too.

Requests to the Thrift JDBC server are executed in a HiveContext, so in addition to having access to Spark SQL as a query language, you can write queries in HiveQL. More than just being convenient for experienced Hive users, this allows you to work around some of the shortcomings of the Hive JDBC driver for dealing with the "non-rectangular" aspects of MongoDB collections. For example, the "LATERAL VIEW" feature of HiveQL allows you to effectively "denormalize" the data your query returns:

SELECT custid, o.orderid, o.itemid, o.quantity 
FROM dataTable LATERAL VIEW explode(orders) t AS o

Instead of returning one result row for each matching document, it returns one row for each order, with the custid joined on. The result is completely rectangular, and so is easy to deal with in your client code. First, you can see this in the "Type Name" of the metadata.

*** Column metadata:
total of 4 columns
Column 1 : custid
  Label : custid
  Class Name : java.lang.String
  Type : 12
  Type Name : string
Column 2 : orderid
  Label : orderid
  Class Name : java.lang.String
  Type : 12
  Type Name : string
Column 3 : itemid
  Label : itemid
  Class Name : java.lang.String
  Type : 12
  Type Name : string
Column 4 : quantity
  Label : quantity
  Class Name : java.lang.Integer
  Type : 4
  Type Name : int

Second, you can see it when you print the results.

*** Data:
Row 1
  Column 1 : 1001
  Column 2 : 1000001
  Column 3 : A001
  Column 4 : 175
Row 2
  Column 1 : 1001
  Column 2 : 1000002
  Column 3 : A002
  Column 4 : 20
Row 3
  Column 1 : 1002
  Column 2 : 1000002
  Column 3 : B012
  Column 4 : 200
Row 4
  Column 1 : 1003
  Column 2 : 1000003
  Column 3 : A001
  Column 4 : 175
Row 5
  Column 1 : 1003
  Column 2 : 1000004
  Column 3 : B001
  Column 4 : 10
Row 6
  Column 1 : 1003
  Column 2 : 1000005
  Column 3 : A060
  Column 4 : 12

You can get more information about this technique in the relevant section of the Hive Language Manual.

Summary

Using Apache Spark's Thrift server and NSMC's integration with Spark SQL, you can run highly distributed SQL queries against MongoDB collections, perhaps even performing joins between MongoDB collections and data stored elsewhere.

Wednesday, February 18, 2015

Spark SQL Integration for MongoDB

In a previous post I described a native Spark connector for MongoDB (NSMC) and showed how to use it. That post described how to create RDDs where each of the elements were MongoDB's DBObject records. This time, I'll describe how the connector integrates with Spark SQL via Spark 1.2.0's external data source API. Since Spark SQL incorporates support for both nested record structures and arrays, it is naturally a good match for the rather freewheeling schemas of MongoDB collections. As before you can find the code on GitHub, use the library in your Scala code via sbt, and look at usage examples in a separate GitHub project. Note that Spark SQL integration was introduced, together with support for Spark 1.2.0, in release 0.4.0 of NSMC.

Related posts

A number of followup posts take this work further:

  1. In Efficient Spark SQL Queries to MongoDB I describe how to make the Spark SQL integration more efficient, and
  2. in JDBC Access to MongoDB via Apache Spark I describe how to use NSMC's Spark SQL integration via JDBC.

Querying JSON from Spark SQL

Many of the complexities of querying MongoDB from Spark SQL are similar to those of querying JSON from Spark SQL, a capability that Spark SQL provides out of the box, so let's begin by taking some time to understand that mechanism. The main source of difficulty is that, on the one hand, MongoDB collections do not have a schema (or they have a de facto schema based on their current contents), while on the other hand Spark SQL depends on inferring a well-defined schema for the data being queried. More specifically:

  1. Different documents in a JSON collection frequently have differing structure, at the top level or in nested documents.
  2. Document fields can have array types, but elements of the same array may have different structures.
  3. The simplest differences in document structure are when one or more fields are present in some documents or array members but absent in others -- these are reconciled by treating the field as nullable.
  4. Atomic type conflicts can be easy to reconcile. For example, when a field is an Int32 in one document and an Int64 in another, it's simple enough to "relax" the type to Int64. It's a little more problematic when one instance is a string and the other is an integer. But the most problematic conflicts are structural: a scalar vs. array, scalar vs. document, or document vs. array.

Keeping this in mind, let's look at how Spark SQL's JSON support deals with inferring a schema, in a range of simple and not-so-simple examples. You can see the sample code for these at the LearningSpark project on GitHub. The first one is very straightforward, with scalars, nested documents, an array of scalars and an array of documents. It might be tempting to conclude that with a single document there are no opportunities for conflicts, but that's not true: multiple elements of a single array can conflict with each other -- although in this case they don't.

{"a": 1, 
 "b": 2, 
 "c": ["x", "y"], 
 "d": [{"e": 1, "f": 2}, {"e": 11, "f": 12}], 
 "g": {"h": 7}}   

There shouldn't be any surprises in the schema, except perhaps for the fact that Spark SQL makes all the fields nullable, needlessly so in this simple case.

root
 |-- a: integer (nullable = true)
 |-- b: integer (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- d: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- e: integer (nullable = true)
 |    |    |-- f: integer (nullable = true)
 |-- g: struct (nullable = true)
 |    |-- h: integer (nullable = true)
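
For reference, schemas like this one can be inspected using Spark SQL's built-in JSON support along roughly the following lines; the file path is just a placeholder, and sc is assumed to be an existing SparkContext.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
// jsonFile scans the documents in the file and infers a schema for them
val docs = sqlContext.jsonFile("/path/to/sample-documents.json")
docs.printSchema()   // prints a "root |-- ..." tree like the one above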

The next example introduces minor conflicts by adding a second document and leaving out certain fields -- "b" and "g" only appear in the first document, and "d.u" only appears in the second document.

{"a": 1, 
 "b": 2, 
 "c": ["x", "y"], 
 "d": [{"e": 1, "f": 2}, {"e": 11, "f": 12}], 
 "g": {"h": 7}}
{"a": 1, 
 "c": ["x", "y"], 
 "d": [{"e": 1, "f": 2, "u":"ewe"}, {"e": 11, "f": 12}]}

The schema is simply the "union" of the two schemas. That's OK since each field is nullable.

root
 |-- a: integer (nullable = true)
 |-- b: integer (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- d: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- e: integer (nullable = true)
 |    |    |-- f: integer (nullable = true)
 |    |    |-- u: string (nullable = true)
 |-- g: struct (nullable = true)
 |    |-- h: integer (nullable = true)

The last example introduces more interesting conflicts. Field "a" is an integer in one document and a long in the other, "b" is an integer in one and a string in the other, "c" is an array in one and a scalar in the other, "d" is a document in one and an array of documents in the other, and "g" is a document in one and an array of integers in the other.

{"a": 1, 
 "b": 2, 
 "c": ["x", "y"], 
 "d": {"e": 1, "f": 2}, 
 "g": {"h": 7}}
{"a": 5000000000, 
 "b": "two", 
 "c": 12, 
 "d": [{"e": 1, "f": 2, "u":"ewe"}, {"e": 11, "f": 12}], 
 "g": [1, 2]}

Looking at the inferred schema, the treatment of "a" is quite simple, since an integer can be stored in a long. Every other conflict is dealt with by treating the field as a string.

root
 |-- a: long (nullable = true)
 |-- b: string (nullable = true)
 |-- c: string (nullable = true)
 |-- d: string (nullable = true)
 |-- g: string (nullable = true)

This "reversion" to strings has some interesting consequences. The fields can be used in a query, say in a SELECT clause, so you can obtain and manipulate the values. The values returned, however, are NOT strings. For example, if you SELECT g you will obtain a Row containing a single integer from the first document and a list of integers from the other. And, most importantly, you can't query sub-fields -- so you can't SELECT g.h or SELECT c[1], either of which would have been fine against the previous schema. I'm willing to accept this as a good set of compromises, and having NSMC's schema inference behave analogously seems entirely appropriate.

Integrating NSMC with Spark SQL

In two earlier posts I described how to integrate an external database engine with Spark SQL using the external data source API introduced in Spark 1.2.0. The first post described a simple and fairly naive integration approach where the external engine returned all the data in a table or collection, and Spark SQL took care of all filtering and projection. The second post showed how to push filtering and projection down to the external database engine. However, the second post also showed how the Spark SQL query compiler makes limited use of projection pushdown for queries against non-rectangular data. Furthermore, NSMC currently has no ability to push filtering out to MongoDB. As a result, I've opted for the simpler approach thus far. At some point it will make sense to revisit this decision in order to make the connector more efficient.

Schema Inference Issues

Spark SQL integration raised a number of design issues. Some of them may deserve their own posts in the future, but for now here's a summary of what the issues are and how I've resolved them (or, in many cases, found a "stop gap" solution).

  1. Sampling issues: reading an entire data collection to infer a schema is expensive, although sometimes necessary. There are a number of interesting decisions to make in this regard, and in a future release some or all of them may become configurable.
    • How many documents are sampled: currently it's all of them.
    • When to sample: currently they are sampled immediately when the temporary table is registered with Spark SQL.
    • How often to sample: currently they are only sampled once on registration, and then several queries may execute against the result.
  2. Type mapping and coverage: MongoDB supports many built-in types. For now, NSMC's Spark SQL integration supports only integers, longs, strings, booleans, doubles, binary, timestamps and dates, as well as arrays and nested documents.
  3. Structure field ordering and merging: while the fields of JSON documents are nominally unordered, NSMC sorts them by name, as does Spark SQL's built in JSON support.
  4. Handling atomic or structural type conflicts: these are not currently resolved by NSMC.

Configuration and Use

Perhaps the best way to learn how to use NSMC's Spark SQL integration is to look at the spark-mongodb-examples project on GitHub. The README.md there describes how to set up a sample MongoDB collection and the SQLQuery.scala file shows how to configure an appropriate SQLContext, register a MongoDB collection as a temporary table with Spark SQL, and then execute Spark SQL queries against it. We'll review the example code below.

In your build.sbt you'll need the following dependencies.

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.2.0"

libraryDependencies += 
    "com.github.spirom" %% "spark-mongodb-connector" % "0.4.0"

Only a few imports are needed.

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}   
  

The SparkContext is configured in the same way as when using the core connector, and then a SQLContext is wrapped around it.

val coreConf =
  new SparkConf()
    .setAppName("MongoReader").setMaster("local[4]")
    .set("nsmc.connection.host", DBConfig.host)
    .set("nsmc.connection.port", DBConfig.port)
val conf = (DBConfig.userName, DBConfig.password) match {
  case (Some(u), Some(pw)) => 
    coreConf.set("nsmc.user", u).set("nsmc.password", pw)
  case _ => coreConf
}
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

The MongoDB collection is registered as a temporary table -- you'll need to change the table name as well as the MongoDB database and collection names in your applications, but note that you must tell Spark SQL that you're registering the table via NSMC by referring to nsmc.sql.MongoRelationProvider.

sqlContext.sql(
  s"""
  |CREATE TEMPORARY TABLE dataTable
  |USING nsmc.sql.MongoRelationProvider
  |OPTIONS (db '${DBConfig.testDatabase}', 
  |         collection '${DBConfig.testCollection}')
""".stripMargin)

Now you can run your first query. It's useful to make the first one a SELECT * query so that you can see the entire Spark SQL schema that has been inferred for the sample collection.

val data =
  sqlContext.sql("SELECT * FROM dataTable")

We'll look at the schemas for the results of the sample queries in the next section. You can print the schema by first obtaining it via SchemaRDD's schema method, and then calling printTreeString().

data.schema.printTreeString()

Finally, you can access the actual data returned from the query.

data.foreach(println)

Sample Data

Before we look at the four queries provided in the sample code, let's see what data we're querying from the MongoDB perspective using JSON notation. This is the data inserted by running the main() method of the PopulateTestCollection object in the sample project.

{ 
    "_id" : ObjectId("54e1586080ce44e72e07fe4c"), 
    "custid" : "1001", 
    "billingAddress" : {
        "state" : "NV"
    }, 
    "orders" : [
        {
            "orderid" : "1000001", 
            "itemid" : "A001", 
            "quantity" : NumberInt(175), 
            "returned" : true
        }, 
        {
            "orderid" : "1000002", 
            "itemid" : "A002", 
            "quantity" : NumberInt(20)
        }
    ]
}
{ 
    "_id" : ObjectId("54e1586080ce44e72e07fe4d"), 
    "custid" : "1002", 
    "billingAddress" : {
        "state" : "CA", 
        "zip" : "92093"
    }, 
    "shippingAddress" : {
        "state" : "CA", 
        "zip" : "92109"
    }, 
    "orders" : [
        {
            "orderid" : "1000002", 
            "itemid" : "B012", 
            "quantity" : NumberInt(200)
        }
    ]
}
{ 
    "_id" : ObjectId("54e1586080ce44e72e07fe4e"), 
    "custid" : "1003", 
    "billingAddress" : {
        "state" : "AZ"
    }, 
    "discountCode" : NumberInt(1), 
    "orders" : [
        {
            "orderid" : "1000003", 
            "itemid" : "A001", 
            "quantity" : NumberInt(175)
        }, 
        {
            "orderid" : "1000004", 
            "itemid" : "B001", 
            "quantity" : NumberInt(10)
        }, 
        {
            "orderid" : "1000005", 
            "itemid" : "A060", 
            "quantity" : NumberInt(12)
        }
    ]
}

Notice how "non-rectangular" this data is. Entire nested documents are missing in some documents, scalar fields are missing in others, and the arrays are of different lengths.

Example queries

Now let's look at the queries in the sample project. To see how NSMC has interpreted the sample collection, it's useful to start with a query that returns all of it.

 SELECT * FROM dataTable

As you might hope, the inferred schema is general enough to account for every document in the collection. This is achieved by making every field nullable -- regardless of whether it is missing from any document. At the level of each document or nested document, the fields are in alphabetical order. The types chosen are not surprising, except perhaps for the use of a string to represent the unique ID of each document.

root
 |-- _id: string (nullable = true)
 |-- billingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- custid: string (nullable = true)
 |-- discountCode: integer (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemid: string (nullable = true)
 |    |    |-- orderid: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |-- shippingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)

A couple of things are worth noticing at this stage.

  1. The data returned shows the missing fields as Scala nulls, which are considered by Spark SQL as NULLs, and soon we'll see how to exploit the latter fact in queries.
  2. Each Spark SQL result row is represented using an instance of org.apache.spark.sql.Row, and each nested document by a nested instance of Row. When these are printed they are surrounded by square brackets.
  3. The fields of documents and sub-documents are in the same order as in the inferred schema.
  4. It's hard to tell from this output, but atomic data has the appropriate atomic type: Int, String and so on.

[54e1621480ce1641f05b1caf,
    [NV,89150],
    1001,
    null,
    List([A001,1000001,175], [A002,1000002,20]),
    null]
[54e1621480ce1641f05b1cb0,
    [CA,92093],
    1002,
    null,
    List([B012,1000002,200]),
    [CA,92109]]
[54e1621480ce1641f05b1cb1,
    [AZ,85014],
    1003,
    1,
    List([A001,1000003,175], [B001,1000004,10], [A060,1000005,12]),
    [AZ,85020]]
[54e1621480ce1641f05b1cb2,
    null,
    1004,
    null,
    null,
    [CA,92109]]

Since the discountCode is often missing, we can see that Spark SQL considers the missing instances to be NULL.

 SELECT custid FROM dataTable 
 WHERE discountCode IS NOT NULL

This query just returns a single column.

root
 |-- custid: string (nullable = true)
  

Only one document has an instance of discountCode.

[1003]

Since the data contains entire sub-documents that are missing, we may be curious about what happens when we use a field inside a sub-document in a query.

 SELECT custid, shippingAddress.zip 
 FROM dataTable

We expect two columns. Notice how the schema shows zip to have been pulled up to the top level.

root
 |-- custid: string (nullable = true)
 |-- zip: string (nullable = true)
  

It turns out not to be a problem: the fields within the sub-document are considered to be NULL when either they are missing or when the entire sub-document is missing. This is also true when we filter by such fields or group by them. Again, zip is indeed at the top level, not nested, and that's what you would expect from the schema.

[1001,null]
[1002,92109]
[1003,85020]
[1004,92109]

Naturally, we'd also like grouping to play well with nested fields.

 SELECT COUNT(custid), shippingAddress.zip 
 FROM dataTable 
 GROUP BY shippingAddress.zip

The schema shouldn't be surprising at this point.

root
 |-- c0: long (nullable = false)
 |-- zip: string (nullable = true)

Neither should the results.

[2,92109]
[1,null]
[1,85020]

Implementation

In subsequent posts I'll describe the implementation of NSMC's Spark SQL integration in some detail. Meanwhile, you can see it on GitHub in the nsmc.sql and nsmc.conversion packages.

Performance and Scalability

In order to infer the schema for a collection, NSMC reads through the entire collection, as I've already described. It materializes the entire collection in a MongoRDD, which was a good way to get something working quickly, but is wasteful of memory, and often also of time. Release 0.4.0 is a "proof of concept" with respect to Spark SQL integration -- improved performance will have to wait for subsequent releases.

Your Experiences

If you use Spark and MongoDB, please try using NSMC -- but remember it's not ready for production use. If you have trouble, feel free to use the comments section here or file an issue on GitHub.

Sunday, January 25, 2015

NSMC: A Native MongoDB Connector for Apache Spark

Both core Spark and Spark SQL provide ways to neatly plug in external database engines as a source of data. In this post I'm going to describe an experimental MongoDB connector for core Spark, called NSMC (for "Native Spark MongoDB Connector"). It's a "native" connector in the sense that it connects Spark directly to MongoDB, without involving anything else like Hadoop. For me, this was largely an exercise in learning how to integrate an external data engine with core Spark, and more generally, an exercise in learning about Spark, but over time it became about a number of other things, including MongoDB, distributed databases and semi-structured data. I hope to delve into the various areas of learning in future posts, but here I'll simply introduce the project and describe how you can experiment with it.

MongoDB makes an interesting case study as an external Spark data source for a number of reasons:

  1. It has become the most popular DBMS in a number of "non-traditional" categories, including non-relational and NoSQL, being one of the few remaining systems where the "NoSQL" label still means something.
  2. The data model, based on collections of JSON-like documents, is both deeply ad-hoc (i.e., collections have no a priori schema whatsoever) and deeply non-rectangular, making it an interesting and (as it turns out) challenging test case for integration with virtually any system.
  3. It is not directly supported by Spark, although it can be used from Spark via an official Hadoop connector, and Spark SQL also provides indirect support through its ability to read and write JSON text files.

NSMC is hosted on GitHub under an Apache 2.0 license, but I'm not going to discuss the implementation in this post at all.

Related posts

A number of followup posts take this work further:
  1. In Spark SQL Integration for MongoDB I discuss how NSMC can now be used from Spark SQL.
  2. In Efficient Spark SQL Queries to MongoDB I describe how to make the Spark SQL integration more efficient, and
  3. in JDBC Access to MongoDB via Apache Spark I describe how to use NSMC's Spark SQL integration via JDBC.

Trying it out

To try the connector out in your system you need a Spark 1.1.0 instance and a MongoDB instance (clustered or not). You can find a complete example to play with on GitHub. Your Spark code will need to be written in Scala, as part of an SBT project, and you need to include the following in your build.sbt file.

scalaVersion := "2.10.4" // any 2.10 is OK 

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"

libraryDependencies += "com.github.spirom" %% "spark-mongodb-connector" % "0.3.0"

Note that the connector itself depends on Spark 1.1.0 and Casbah. The connector API is in the nsmc namespace. You'll also need Casbah's imports in order to get access to the DBObject class.

import nsmc._

import com.mongodb.casbah.Imports._

Then you need to configure a SparkContext in the usual way, and add some extra configuration to enable the connector to communicate with MongoDB. In this code snippet, and the others below, the lines you need to add for NSMC are the nsmc.* settings.

val conf = new SparkConf()
    .setAppName("My MongoApp").setMaster("local[4]") // or whatever
    .set("nsmc.connection.host", "myMongoHost")
    .set("nsmc.connection.port", "myMongoPort")
    .set("nsmc.user", "yourUsernameHere")
    .set("nsmc.password", "yourPasswordHere")
val sc = new SparkContext(conf)

Finally, you can call the mongoCollection() method on the context with the names of your favourite MongoDB database and collection.

val data = sc.mongoCollection[DBObject]("myDB", "myCollection")

Then the value of data will be an RDD[DBObject], and you deal with it the way you would deal with any RDD, and with the elements like any Casbah DBObject. This means that you're unlikely to get very far unless you already know how to read from MongoDB using Casbah -- in fact this may be a good time to learn the Casbah API by writing a simple Scala example that doesn't involve Spark at all.

As you might expect if you are familiar with Spark RDDs, the above code doesn't actually load any data from MongoDB -- it's only when you use the RDD in a computation that its partitions are populated lazily.
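
For example, the first action you run against the RDD is the point at which the collection is actually read:

val count = data.count()   // this triggers the scan of the MongoDB collection
println(s"myCollection contains $count documents")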

Partitioning

By default, the RDD created has only one partition, which can create a performance bottleneck in the case of a large collection. If the collection you need to load is indexed in MongoDB, NSMC can ask MongoDB to tell it how to create partitions in a reasonable way. The following configuration will allow you to use that feature. Notice that you have to enable it and set the maximum size (in megabytes) of a Spark partition. You also have to say which indexed fields you want to partition on when you create the RDD.

val conf = new SparkConf()
    .setAppName("My MongoApp").setMaster("local[4]")
    .set("nsmc.connection.host", "myMongoHost")
    .set("nsmc.connection.port", "myMongoPort")
    .set("nsmc.user", "yourUsernameHere")
    .set("nsmc.password", "yourPasswordHere")
    .set("nsmc.split.indexed.collections", "true")
    .set("nsmc.split.chunk.size", "4")
val sc = new SparkContext(conf)
val data = sc.mongoCollection[DBObject]("myDB", "myCollection", Seq("key"))

If you have sharded collections, you can simply turn each shard into a Spark partition. You can even tell NSMC to bypass mongos and connect directly to the shards -- although this setting is best avoided unless you understand MongoDB sharding really well. The following snippet enables both of these features.

val conf = new SparkConf()
    .setAppName("My MongoApp").setMaster("local[4]")
    .set("nsmc.connection.host", "myMongoHost")
    .set("nsmc.connection.port", "myMongoPort")
    .set("nsmc.user", "yourUsernameHere")
    .set("nsmc.password", "yourPasswordHere")
    .set("nsmc.partition.on.shard.chunks", "true")
    .set("nsmc.direct.to.shards", "true")
val sc = new SparkContext(conf)
val data = sc.mongoCollection[DBObject]("myDB", "myCollection")

If you enable partitioning for both unsharded and sharded collections (and this may make sense if you will read from multiple collections), the shards take precedence for sharded collections. That is, a sharded collection will then always be partitioned according to its shards.

Things to note:

  1. All of the properties nsmc.split.indexed.collections, nsmc.partition.on.shard.chunks and nsmc.direct.to.shards default to false, which means that if you don't set them you'll get an unpartitioned RDD.
  2. These settings are global to a SparkContext, which may cause problems in some applications and perhaps provides an interesting design challenge for future versions of this connector.
  3. The setting of nsmc.direct.to.shards is only used if you set nsmc.partition.on.shard.chunks to true.

Configuration overview

Configuration for the connector is picked up from your SparkContext. Here is an overview of all the relevant settings.

  • nsmc.connection.host: MongoDB host or IP address (default: localhost)
  • nsmc.connection.port: MongoDB port (default: 27017)
  • nsmc.user: MongoDB user name (default: no authentication)
  • nsmc.password: MongoDB password (default: no authentication)
  • nsmc.split.indexed.collections: Should indexed collections be partitioned using MongoDB's [internal] splitVector command? (boolean, default: false)
  • nsmc.split.chunk.size: Maximum chunk size, in megabytes, passed to MongoDB's splitVector command, if used. (MB, default: 4)
  • nsmc.partition.on.shard.chunks: Should collections that are already sharded in MongoDB retain this as their partitioning in Spark? If not, the entire collection will be read as a single Spark partition. (boolean, default: false)
  • nsmc.direct.to.shards: If sharding of collections is being observed, should the mongos server be bypassed? Don't do this unless you understand MongoDB really well, or you may obtain incorrect results if MongoDB is rebalancing the shards when your query executes. (boolean, default: false)

Limitations of the connector

NSMC is strictly experimental, and not suitable for production use. Use it if you have a strong stomach and enjoy experimenting. You can get an overview of its current limitations at any time by checking the Issues page on GitHub but because it's so important to realize just how experimental the connector is currently, I'll list the most important limitations.

  • While, in spirit, NSMC is similar to MongoDB's Hadoop connector, it is much less sophisticated and less tested.
  • There are no Java or Python APIs: it's Scala only
  • There's no integration with Spark SQL
  • Writing data to MongoDB is not supported
  • You can't get MongoDB to filter the collection before loading it into Spark -- the entire collection is loaded, no matter how large.
  • There's no way to take advantage of MongoDB replication
  • Advanced MongoDB authentication is not supported
  • Neither Spark 1.2.0 nor Scala 2.11 is supported
  • Some of the MongoDB commands used in the implementation, also used in the MongoDB connector for Hadoop, are not really public interfaces.

My limitations

I hope people will try the connector out and share their experiences, but I won't actually be able to give anything approaching professional support: this is a self-education project and I'm doing it in my spare time. If the project turns out to be broadly useful, I'm also happy to help turn it into a collaborative one, or to get it incorporated into a larger pre-existing project. I haven't discussed it with the developers of MongoDB, and don't know whether they're at all interested in building a native Spark connector.

It's also important to understand that I don't have a large MongoDB cluster and I'm not going to build one. I've taken some care to get decent logging in place right from the start, and will be happy to look at log files.

What's Next?

As there are a lot of interesting and potentially difficult issues to address before the connector can be considered complete, in future posts (and code commits) I'll address design issues and implementation choices.