In previous posts I've discussed a native Apache Spark connector for MongoDB (NSMC) and NSMC's integration with Spark SQL. The latter post described a naive implementation of Spark SQL integration, intended to demonstrate its value and the applicability of Spark SQL's recently introduced external data source API. In particular, I had then made no attempt to make the implementation efficient. In this post I will describe how to use some of the features of the external data source API to achieve a much more efficient Spark SQL integration.
General Approach
As discussed in earlier posts, the major challenges in implementing a Spark SQL external data source for MongoDB are:
- Efficient schema inference for the entire collection.
- Efficient use of MongoDB's query capabilities, based on Spark SQL's projection and filter pushdown mechanism, to obtain the data required for each Spark SQL query.
The NSMC project is hosted on GitHub, and the class nsmc.sql.MongoRelationProvider is a good starting point for reading the Spark SQL integration code.
Schema Inference
The Spark SQL schema for a MongoDB collection is computed when the collection is registered using, say, Spark SQL's CREATE TEMPORARY TABLE command. The goal of schema inference is to generate a Spark SQL schema (Seq[StructField]) to which each document in the collection conforms, so that each document can be converted to an org.apache.spark.sql.Row in order to produce an RDD[Row] as a query result. Some of the subtleties of what it means to infer such a schema were discussed in the original post on Spark SQL integration.
Schema inference itself makes use of Spark's parallel computation features. First, a set of partitions is computed for the collection. Then these are parallelized into an RDD. Next, each partition's collection data is read (in parallel) and a separate schema is computed for each partition. Finally, these schemas are all merged into a single schema that definitively represents the collection. This approach is summarized in Figure 1.
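The per-partition inference and merge can be sketched in Python as follows. This is an illustration under simplifying assumptions, not NSMC's Scala code. A schema is modeled as a plain dict from field name to a type name, or to a nested dict for a struct, standing in for Spark SQL's StructType. ObjectId values are assumed to have already been converted to strings. The conflict rules (integer and double widen to double, anything else falls back to string) are my own choice for the sketch. A thread pool stands in for Spark's parallel map over partitions, and reduce() stands in for the final merge.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def infer_type(value):
    """Map a document value to a simplified Spark SQL-like type name."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "integer"
    if isinstance(value, float):
        return "double"
    if isinstance(value, dict):  # nested document -> struct
        return infer_schema([value])
    return "string"  # assumption: ObjectId and everything else become strings


def merge_types(a, b):
    """Combine two types inferred for the same field."""
    if isinstance(a, dict) and isinstance(b, dict):
        return merge_schemas(a, b)
    if isinstance(a, dict) or isinstance(b, dict):
        return "string"  # struct vs. scalar conflict (assumed fallback)
    if a == b:
        return a
    if {a, b} == {"integer", "double"}:
        return "double"
    return "string"  # any other conflict (assumed fallback)


def merge_schemas(s1, s2):
    """Union of two schemas; a field present on only one side is kept as-is."""
    merged = dict(s1)
    for name, field_type in s2.items():
        merged[name] = merge_types(merged[name], field_type) if name in merged else field_type
    return merged


def infer_schema(documents):
    """Schema for one partition: merge the schemas of all its documents."""
    schema = {}
    for doc in documents:
        schema = merge_schemas(schema, {k: infer_type(v) for k, v in doc.items()})
    return schema


def infer_collection_schema(partitions):
    """Infer each partition's schema in parallel, then merge them into one."""
    with ThreadPoolExecutor() as pool:
        partition_schemas = list(pool.map(infer_schema, partitions))
    return reduce(merge_schemas, partition_schemas, {})
```

In Spark the last function would be roughly a map of infer_schema over an RDD of partitions followed by a reduce with merge_schemas. Because merging is associative, the order in which partition schemas are combined doesn't matter.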
An important characteristic of this algorithm is that the collection is not stored in Spark: the documents are analyzed "on the fly" to compute the schema. This can be a good thing if the queries end up depending on only small subsets of the collection, or a bad thing if the queries tend to re-scan the entire collection every time.
Collection Scan
A collection is scanned when Spark SQL calls the buildScan() method on the MongoTableScan class, which it does in order to process each query; this interface was discussed in an earlier post on the external data source API. The method is passed the filters and projections that Spark SQL would benefit from having MongoDB execute. NSMC first uses this information to generate an appropriate MongoDB query, as discussed below. That query is executed to obtain an RDD[DBObject] (the representation of a document in the MongoDB APIs), which is then converted to an RDD[Row] using the previously inferred schema. This means that the conversion, like schema inference, occurs in parallel, although how much this helps depends on how skewed the query results are across partitions.
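Converting each returned document into a row can be sketched as below. This is a Python illustration, not NSMC's implementation. The schema is assumed to be an ordered list of (name, type) pairs, with a nested list describing a struct. Rows are plain tuples standing in for org.apache.spark.sql.Row. A field missing from a document becomes None, consistent with every column in the inferred schema being nullable.

```python
# Ordered schema: (field name, type) pairs; a nested list describes a struct.
SCHEMA = [
    ("_id", "string"),
    ("category", "string"),
    ("item", "integer"),
    ("location", [("state", "string"), ("zip", "string")]),
    ("price", "double"),
    ("quantity", "integer"),
]


def to_row(doc, schema):
    """Convert one document (a dict) into a tuple ordered by the schema.

    Fields missing from the document become None; nested documents are
    converted recursively into nested tuples.
    """
    values = []
    for name, field_type in schema:
        value = doc.get(name)
        if isinstance(field_type, list) and value is not None:
            value = to_row(value, field_type)
        values.append(value)
    return tuple(values)


def convert_partition(docs, schema):
    """Convert one partition's documents; Spark would run this per partition."""
    return [to_row(doc, schema) for doc in docs]
```

For example, a document with item 2 and location {"state": "AZ", "zip": "85001"} becomes a tuple whose fourth element is the nested tuple ("AZ", "85001"). A document without a location gets None in that position.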
Generating MongoDB queries from the parameters to the buildScan() method is quite straightforward. MongoDB's find() method happens to be conveniently factored for this, as it takes one parameter for filtering and one for projection. I've given a number of examples of how this conversion turns out below.
Example collection and queries
The best way to understand how NSMC's Spark SQL integration works is to look at some example Spark SQL queries together with the generated MongoDB queries.
The collection and its inferred schema
To understand how query evaluation works, we'll look at the following simple set of MongoDB documents:
{ "_id" : ObjectId("554cc53280cecbc6a8579952"), "item" : 1, "quantity" : 5, "price" : 12.5, "category" : "A" }
{ "_id" : ObjectId("554cc53280cecbc6a8579953"), "item" : 2, "quantity" : 10, "price" : 12.5, "location" : { "state" : "AZ", "zip" : "85001" }, "category" : "A" }
{ "_id" : ObjectId("554cc53280cecbc6a8579954"), "item" : 3, "quantity" : 15, "price" : 12.5, "category" : "B" }
{ "_id" : ObjectId("554cc53280cecbc6a8579955"), "item" : 4, "quantity" : 20, "price" : 12.5, "location" : { "state" : "NV", "zip" : "89101" }, "category" : "B" }
NSMC infers the following Spark SQL schema for these documents:
root
 |-- _id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- item: integer (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: integer (nullable = true)
Filter and Projection Pushdown
As I discussed in an earlier post, Spark SQL's external data source API allows simple cases of filtering and projection to be pushed down to the back end database query, to make use of any indexing capabilities and to minimize the amount of data that has to be transferred back to Spark.
Let's start with the simplest of queries to see how this works, assuming we have registered the above collection with Spark SQL as the table mongoTable.
SELECT * FROM mongoTable
NSMC simply requests all the columns in the inferred schema from MongoDB. The _id column is not listed in the projection, but since it is not explicitly suppressed either, it will be present in the output. (MongoDB's convention for this field is that it is always returned unless explicitly suppressed.) Returning it seems to give the most intuitive behavior for * queries, but the more fundamental design choice was to include _id as a column in the schema at all. Once that was done, Spark SQL would expect it to be returned in every * query.
Here is the generated MongoDB query, with no filtering and [effectively] no projection.
collection.find( { }, { "category" : 1 , "item" : 1 , "location" : 1 , "price" : 1 , "quantity" : 1 } )
This query happens to be equivalent to both of the following, but NSMC doesn't recognize the fact:
collection.find( { }, { } )
collection.find( )
A query that explicitly projects columns results in only those being requested from MongoDB.
SELECT item, quantity FROM mongoTable
In this case, Spark SQL would not know what to do with the _id column even if it were returned, so NSMC explicitly suppresses it to save bandwidth. It's probably not a very important optimization.
collection.find( { }, { "item" : 1 , "quantity" : 1 , "_id" : 0 } )
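The _id handling in the two projections above can be captured in a few lines. This Python sketch is my illustration of the rule, not NSMC's code. build_projection is a hypothetical name, and required_columns plays the role of the column list Spark SQL passes to buildScan().

```python
def build_projection(required_columns):
    """Build a MongoDB projection document for the columns Spark SQL needs.

    MongoDB returns _id unless it is explicitly suppressed, so _id is not
    listed when it is required, and is suppressed with 0 when it isn't.
    """
    projection = {name: 1 for name in required_columns if name != "_id"}
    if "_id" not in required_columns:
        projection["_id"] = 0
    elif not projection:
        # Only _id is needed: an empty projection would return every field.
        projection["_id"] = 1
    return projection
```

For the SELECT * query the required columns are the six schema columns, and the result is the projection shown earlier with _id omitted. For SELECT item, quantity it is {"item": 1, "quantity": 1, "_id": 0}. With PyMongo, the result would be passed as the second argument of collection.find().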
Now let's move on to simple filtering.
SELECT item, quantity FROM mongoTable WHERE quantity > 12
Indeed, the filter is pushed through to the MongoDB query.
collection.find( { "quantity" : { "$gt" : 12}}, { "item" : 1 , "quantity" : 1 , "_id" : 0} )
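The translation of simple comparison filters can be sketched as below. In this Python illustration, filters are modeled as tuples named after Spark SQL's filter classes, for example ("GreaterThan", "quantity", 12). That modeling and the function names are assumptions, not NSMC's API. A filter the translator doesn't understand is simply left out of the MongoDB query. My understanding from the Spark 1.3 documentation for PrunedFilteredScan is that this is safe, because pushed-down filters are purely an optimization and Spark SQL evaluates them all again.

```python
# Mapping from (modeled) Spark SQL comparison filters to MongoDB operators.
COMPARISONS = {
    "GreaterThan": "$gt",
    "GreaterThanOrEqual": "$gte",
    "LessThan": "$lt",
    "LessThanOrEqual": "$lte",
}


def translate_filter(f):
    """Translate one filter tuple to a MongoDB condition, or None if unsupported."""
    kind = f[0]
    if kind == "EqualTo":
        return {f[1]: f[2]}
    if kind in COMPARISONS:
        return {f[1]: {COMPARISONS[kind]: f[2]}}
    return None  # e.g. Or: left for Spark SQL to evaluate


def translate_filters(filters):
    """AND together every filter that can be translated."""
    clauses = [c for c in map(translate_filter, filters) if c is not None]
    if not clauses:
        return {}
    if len(clauses) == 1:
        return clauses[0]
    return {"$and": clauses}
```

Using $and for several conjuncts is one choice. Merging them into a single document also works when they name different fields.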
A disjunctive query is not currently handled by NSMC, although the capability to push it through was introduced in Spark 1.3.0.
SELECT item, quantity FROM mongoTable WHERE quantity > 12 OR quantity < 8
So only projection is pushed through.
collection.find( {}, { "item" : 1 , "quantity" : 1 , "_id" : 0} )
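For illustration, here is how the disjunction might be pushed down now that Spark SQL 1.3.0 can pass it to the data source. This is a hypothetical extension, not something NSMC currently does. Filters are again modeled as tuples loosely based on Spark's filter classes, which is an assumption. An Or is translated into MongoDB's $or only when both sides are translatable; otherwise the whole disjunction is left to Spark, since pushing just one side would wrongly drop matching documents. For And, keeping whichever sides translate returns a superset of the matching documents, which is acceptable only because Spark SQL re-applies the filters.

```python
OPERATORS = {
    "GreaterThan": "$gt",
    "GreaterThanOrEqual": "$gte",
    "LessThan": "$lt",
    "LessThanOrEqual": "$lte",
}


def translate_pushable(f):
    """Recursively translate a filter tuple; None means 'cannot push down'."""
    kind = f[0]
    if kind == "EqualTo":
        return {f[1]: f[2]}
    if kind in OPERATORS:
        return {f[1]: {OPERATORS[kind]: f[2]}}
    if kind in ("And", "Or"):
        left, right = translate_pushable(f[1]), translate_pushable(f[2])
        if kind == "Or":
            # Both sides are needed: pushing only one would drop matching rows.
            return None if left is None or right is None else {"$or": [left, right]}
        # For And, any translatable side narrows the scan to a superset.
        parts = [p for p in (left, right) if p is not None]
        if not parts:
            return None
        return parts[0] if len(parts) == 1 else {"$and": parts}
    return None
```

For the query above this yields {"$or": [{"quantity": {"$gt": 12}}, {"quantity": {"$lt": 8}}]}, a valid MongoDB filter document.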
Accessing a nested document or array element doesn't disable projection pushdown.
SELECT item, quantity, location.zip
FROM mongoTable
The projection is "widened" to the top-level column, so somewhat more data than necessary is pulled in from MongoDB.
collection.find(
{},
{ "item" : 1 , "quantity" : 1 , "location" : 1 , "_id" : 0}
)
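The effect of widening can be illustrated with a small Python sketch. Nested column references are reduced to the distinct top-level fields that MongoDB must return, and the nested value is then picked out on the Spark side. Both function names are hypothetical, and the sketch only illustrates the effect. My understanding is that the external data source API passes only top-level column names to buildScan(), so the widening effectively happens before NSMC sees the request.

```python
def widen_to_top_level(column_paths):
    """Reduce dotted column paths to distinct top-level fields, in first-seen order."""
    fields = []
    for path in column_paths:
        top = path.split(".", 1)[0]
        if top not in fields:
            fields.append(top)
    return fields


def extract(doc, path):
    """Follow a dotted path into a fetched document; None if any step is missing."""
    value = doc
    for key in path.split("."):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value
```

Here widen_to_top_level(["item", "quantity", "location.zip"]) returns ["item", "quantity", "location"], matching the projection above. extract(doc, "location.zip") then gives "85001" for the second document and None for the first.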
Filtering on a nested document or array element is more problematic.
SELECT item, quantity, location.zip FROM mongoTable WHERE location.state = 'AZ' AND quantity = 10
A filter can't be widened the way a projection can, so the problematic conjunct is not pushed through and only the condition on quantity reaches MongoDB.
collection.find( { "quantity" : 10 }, { "item" : 1 , "quantity" : 1 , "location" : 1 , "_id" : 0} )
We should also check an aggregation query.
SELECT category, sum(quantity) FROM mongoTable GROUP BY category
The external data source API doesn't have a way to push grouping and aggregation through, so MongoDB is just asked to fetch the underlying data.
collection.find( { }, { "category" : 1 , "quantity" : 1 , "_id" : 0 } )
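To make the division of labor concrete, here is a Python sketch of the work left to Spark SQL for this query: summing quantity per category over the (category, quantity) pairs that the find() above returns. For comparison, it also builds the MongoDB aggregation pipeline that could compute the same result on the server if the API offered a way to push it down. Neither function is part of NSMC, and both names are hypothetical.

```python
from collections import defaultdict


def sum_quantity_by_category(rows):
    """What Spark SQL does after the scan: GROUP BY category, SUM(quantity)."""
    totals = defaultdict(int)
    for category, quantity in rows:
        totals[category] += quantity
    return dict(totals)


def group_pipeline():
    """An equivalent MongoDB aggregation pipeline (not generated by NSMC)."""
    return [{"$group": {"_id": "$category", "total": {"$sum": "$quantity"}}}]
```

For the example collection, sum_quantity_by_category([("A", 5), ("A", 10), ("B", 15), ("B", 20)]) returns {"A": 15, "B": 35}. With PyMongo, the pipeline would be passed to collection.aggregate().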
Scope for Improvement
A number of changes either in NSMC's implementation or in Spark SQL's implementation could result in still more efficient processing of queries to MongoDB.
Sampling
Scanning the entire collection to compute a schema has the advantage of not missing documents that would have added valuable schema information. On the other hand, it can be very expensive. Inferring the schema from a sample of the collection would be much cheaper, at the risk of missing fields that appear only in unsampled documents. So sampling clearly has to be optional, with a parameter allowing the user to specify how much of the collection to sample.
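A per-partition sampling step could look like the following Python sketch. The fraction argument stands in for the user-specified sampling parameter suggested above. That argument, the function names, and the choice to keep at least one document per non-empty partition are all assumptions. The second function shows the risk: fields that occur only in unsampled documents never reach the schema.

```python
import random


def sample_partition(docs, fraction, seed=None):
    """Randomly keep about `fraction` of a partition's documents (at least one)."""
    if not 0 < fraction <= 1:
        raise ValueError("fraction must be in (0, 1]")
    if not docs:
        return []
    k = max(1, round(len(docs) * fraction))
    return random.Random(seed).sample(docs, k)


def top_level_fields(docs):
    """Top-level field names seen in a set of documents."""
    return set().union(*(doc.keys() for doc in docs))
```

On the example collection, a small sample may or may not contain a document with a location field. That is exactly the trade-off described above.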
Narrower Schema Inference
Currently, a schema is always inferred for the entire collection, which can be expensive if queries will only use part of the collection. An obvious refinement would be to allow users to specify a projection when registering the collection. Being able to specify a filter would be even more powerful, but less obviously useful.
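Restricting inference to a user-supplied projection, and optionally a filter, might look like this Python sketch. The fields and where arguments are hypothetical registration options. The type summary, a set of Python type names per field, is deliberately simpler than a real Spark SQL schema. In a real implementation the projection and filter would be sent to MongoDB in the find() call, so that unneeded data is never transferred.

```python
def infer_field_types(docs, fields, where=None):
    """Map each requested field to the set of Python type names seen for it,
    considering only documents accepted by the optional `where` predicate."""
    types = {}
    for doc in docs:
        if where is not None and not where(doc):
            continue
        for name in fields:
            if name in doc:
                types.setdefault(name, set()).add(type(doc[name]).__name__)
    return types
```

Fields outside the projection are never examined, so their presence or absence in the collection no longer affects the cost or the result of inference.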
Caching
The inferred schema is cached for use by all queries. This has its dangers, which are mitigated by Spark SQL's ability to refresh a registered collection.
Since the results of a query conform to all the standard characteristics of an RDD, they benefit from Spark's caching capabilities. However, it is not yet clear to me how much they benefit across multiple queries. This needs some investigation and experimentation.
Tighter Integration with Catalyst
Spark SQL's external data source API was designed with simplicity and ease of use in mind, and in my opinion it succeeds admirably in this regard. However, the API does provide for deeper (and harder to use) integration with Spark SQL's Catalyst query compiler. It may in the future be possible to improve NSMC's Spark SQL integration by using this feature.
Limitations of the External Data Source API
Spark SQL's external data source API basically can't handle nested structures or arrays. This means certain queries against a MongoDB collection will always require that too much data be returned from MongoDB to Spark. I don't know if this is at all on the radar of the Spark SQL developers.
Additionally, the external data source API does not support pushing joins or grouping and aggregation to the external database.
Integrating with Spark SQL's Cost Model
The external data source API allows an implementation to provide information about the size of a collection, but NSMC doesn't use this feature yet. Doing so might allow the Catalyst query optimizer to make better query processing decisions.
Limitations of Catalyst
The Catalyst query optimizer currently has a limited cost model, and improvements in it may be helpful to NSMC's performance.
Improved Partitioning
NSMC's collection partitioning features are currently quite naive, and this may limit the efficiency gains of parallel schema inference and parallel query execution. This needs some experimentation.
Summary
In my original post about NSMC's Spark SQL integration, I described a very naive implementation, intended to show that such integration was possible and could be useful. In this post I've described more recent work to take advantage of Spark's parallelism for schema inference, to prevent schema inference from consuming large amounts of memory, and to take advantage of Spark SQL's filter and projection pushdown features to dramatically improve query performance in many cases. While there's a lot more to be done, these improvements often give NSMC's Spark SQL integration quite realistic performance.