Saturday, May 16, 2015

Efficient Spark SQL Queries to MongoDB

In previous posts I've discussed a native Apache Spark connector for MongoDB (NSMC) and NSMC's integration with Spark SQL. The latter post described a naive implementation of Spark SQL integration, intended to demonstrate its value and the applicability of Spark SQL's recently introduced external data source API. In particular, I had then made no attempt to make the implementation efficient. In this post I will describe how to use some of the features of the external data source API to achieve a much more efficient Spark SQL integration.

General Approach

As discussed in earlier posts, the major challenges in implementing a Spark SQL external data source for MongoDB are:

  1. Efficient schema inference for the entire collection.
  2. Efficient use of MongoDB's query capabilities, based on Spark SQL's projection and filter pushdown mechanism, to obtain the data required for each Spark SQL query.

The NSMC project is hosted on GitHub, and the class nsmc.sql.MongoRelationProvider is a good starting point for reading the Spark SQL integration code.

Schema Inference

The Spark SQL schema for a MongoDB collection is computed when the collection is registered using, say, Spark SQL's CREATE TEMPORARY TABLE command. The goal of schema inference is to generate a Spark SQL schema (Seq[StructField]) to which each document in the collection conforms, so that each document can be converted to an org.apache.spark.sql.Row in order to produce an RDD[Row] as a query result. Some of the subtleties of what it means to infer such a schema were discussed in the original post on Spark SQL integration.

Schema inference itself makes use of Spark's parallel computation features. First, a set of partitions is computed for the collection. Then these are parallelized into an RDD. Next, each partition's collection data is read (in parallel) and a separate schema is computed for each partition. Finally, these schemas are all merged into a single schema that definitively represents the collection. This approach is summarized in Figure 1.

Figure 1: Schema inference

An important characteristic of this algorithm is that the collection is not stored -- the documents are analyzed "on the fly" to compute the schema. Obviously this can be a good thing, if the queries end up depending on small subsets of the collection, or a bad thing if the queries tend to re-scan the entire collection every time.

Collection Scan

A collection is scanned when Spark SQL calls the buildScan() method on the MongoTableScan class, which it does in order to process each query -- this interface was discussed in an earlier post on the external data source API. This method is passed the filters and projections that Spark SQL can benefit from being executed by MongoDB. This information is first used to generate an appropriate MongoDB query as discussed below. That query is executed to obtain an RDD[DBObject] (the representation of a document in the MongoDB APIs), which is then converted to an RDD[Row] using the schema information computed previously. This means that the conversion, like schema inference, also occurs in parallel, although the performance benefit of this is dependent on how skewed the query results are.

Figure 2: Table scan

Generating MongoDB queries from the parameters to the buildScan() method is quite straightforward. MongoDB's find() method happens to be conveniently factored for this, as it takes one parameter for filtering and one for projection. I've given a number of examples of how this conversion turns out below.

Example collection and queries

The best way to understand how NSMC's Spark SQL integration works is to look at some example Spark SQL queries together with the generated MongoDB queries.

The collection and its inferred schema

To understand how query evaluation works, we'll look at the following simple set of MongoDB documents:

{ 
  "_id" : ObjectId("554cc53280cecbc6a8579952"), 
   "item" : 1, 
   "quantity" : 5, 
   "price" : 12.5, 
   "category" : "A" 
}
{ 
  "_id" : ObjectId("554cc53280cecbc6a8579953"), 
  "item" : 2, 
  "quantity" : 10, 
  "price" : 12.5, 
  "location" : 
     { 
       "state" : "AZ", 
       "zip" : "85001" 
     }, 
  "category" : "A" 
}
{ 
  "_id" : ObjectId("554cc53280cecbc6a8579954"), 
  "item" : 3, 
  "quantity" : 15, 
  "price" : 12.5, 
  "category" : "B" 
}
{ 
  "_id" : ObjectId("554cc53280cecbc6a8579955"), 
  "item" : 4, 
  "quantity" : 20, 
  "price" : 12.5, 
  "location" : 
     { 
       "state" : "NV", 
       "zip" : "89101" 
     }, 
  "category" : "B" 
}

NSMC infers the following Spark SQL schema for these documents:

root
 |-- _id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- item: integer (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: integer (nullable = true)

Filter and Projection Pushdown

As I discussed in an earlier post, Spark SQL's external data source API allows simple cases of filtering and projection to be pushed down to the back end database query, to make use of any indexing capabilities and to minimize the amount of data that has to be transfered back to Spark.

Let's start with the simplest of queries to see how this works, assuming we have registered the above collection with Spark SQL as the table mongoTable.

SELECT * FROM mongoTable

NSMC simply requests all the columns in the inferred schema from MongoDB. Since the always-present _id column is not explicitly suppressed, it will be present in the output. (MongoDB's convention for this field is that it is always returned unless explicitly suppressed.) Including it in this case was a design choice that seems to result in the most intuitive behavior for * queries. The important design choice was actually to include it as a column in the schema. Once that was done, Spark SQL would expect it to be returned in every * query.

Here is the generated MongoDB query, with no filtering and [effectively] no projection.

collection.find( 
  { }, 
  { "category" : 1 , "item" : 1 , "location" : 1 , 
    "price" : 1 , "quantity" : 1 }
)

This query happens to be equivalent to both of the following, but NSMC doesn't recognize the fact:

collection.find( { }, { } )

collection.find( )

A query that explicitly projects columns results in only those being requested from MongoDB.

SELECT item, quantity FROM mongoTable

In this case, Spark SQL would not know how to deal with the _id column even if it was returned, so NSMC explicitly suppresses it to save bandwidth. It's probably not a very important optimization.

collection.find( 
  { }, 
  { "item" : 1 , "quantity" : 1 , "_id" : 0 }
)

Now let's move on to simple filtering.

SELECT item, quantity 
FROM mongoTable 
WHERE quantity > 12

Indeed, the filter is pushed through to the MongoDB query.

collection.find( 
  { "quantity" : { "$gt" : 12}},
  { "item" : 1 , "quantity" : 1 , "_id" : 0}
)

A disjunctive query is not currently handled by NSMC, although the capability to push it through was introduced in Spark 1.3.0.

SELECT item, quantity 
FROM mongoTable 
WHERE quantity > 12 OR quantity < 8

So only projection is pushed through.

collection.find( 
  {},
  { "item" : 1 , "quantity" : 1 , "_id" : 0}
)

Accessing a nested document or array element doesn't disable projection pushdown.

SELECT item, quantity, location.zip 
FROM mongoTable

The projection is "widened" to the top level column, so somewhat too much data is pulled in from MongoDB.

collection.find( 
  {},
  { "item" : 1 , "quantity" : 1 , "location" : 1 , "_id" : 0}
)

Filtering on a nested document or array element is more problematic.

SELECT item, quantity, location.zip 
FROM mongoTable 
WHERE location.state = 'AZ' AND quantity = 10

There's no widening trick to play here, so the problematic conjunct is not pushed through.

collection.find( 
  { "quantity" : 10 }
  { "item" : 1 , "quantity" : 1 , "location" : 1 , "_id" : 0}
)

We should also check an aggregation query.

SELECT category, sum(quantity) FROM mongoTable GROUP BY category

The external data source API doesn't have a way to push grouping and aggregation through, so MongoDB is just asked to fetch the underlying data.

collection.find( 
  { },
  { "category" : 1 , "quantity" : 1 , "_id" : 0 }
)

Scope for Improvement

A number of changes either in NSMC's implementation or in Spark SQL's implementation could result in still more efficient processing of queries to MongoDB.

Sampling

Scanning the entire collection to compute a schema has the advantage of not missing documents that would have added valuable schema information. On the other hand, it can be very expensive. Sampling clearly has to be optional, with a parameter allowing the user to specify how much of the collection to sample.

Narrower Schema Inference

Currently, a schema is always inferred for the entire collection, which can be expensive if queries will only use part of the collection. An obvious refinement would be to allow users to specify a projection when registering the collection. Being able to specify a filter would be even more powerful, but less obviously useful.

Caching

The inferred schema is cached for use by all queries. This has its dangers, which are mitigated by Spark SQL's ability to refresh a registered collection.

Since the results of a query conform to all the standard characteristics of an RDD, they benefit from Spark's caching capabilities. However, it is not yet clear to me how much they benefit across multiple queries. This needs some investigation and experimentation.

Tighter Integration with Catalyst

Spark SQL's external data source API was designed with simplicity and ease of use in mind, and in my opinion it succeeds admirably in this regard. However, the API does provide for deeper (and harder to use) integration with Spark SQL's Catalyst query compiler. It may in the future be possible to improve NSMC's Spark SQL integration by using this feature.

Limitations of the External Data Source API

Spark SQL's external data source API basically can't handle nested structures or arrays. This means certain queries against a MongoDB collection will always require that too much data be returned from MongoDB to Spark. I don't know if this is at all on the radar of the Spark SQL developers.

Additionally, the external data source API does not support pushing joins or grouping and aggregation to the external database.

Integrating with Spark SQL's cost model

The external data source API allows an implementation to provide information about the size of a collection, but NSMC doesn't use this feature. It may allow the Catalyst query optimizer to make better query processing decisions.

Limitations of Catalyst

The Catalyst query optimizer currently has a limited cost model, and improvements in it may be helpful to NSMC's performance.

Improved Partitioning

NSMC's collection partitioning features are currently quite naive, and this may limit the efficiency gains of parallel schema inference and parallel query execution. This needs some experimentation.

Summary

In my original post about NSMC's Spark SQL integration, I described a very naive implementation, intended to show that such integration was possible and could be useful. In this post I've described more recent work to take advantage of Spark's parallelism for schema inference, to prevent schema inference from consuming large amounts of memory, and to take advantage of Spark SQL's filter and projection pushdown features to dramatically improve query performance in many cases. While there's a lot more to be done, these improvements in many cases give NSMC's Spark SQL integration quite realistic performance.

Thursday, May 14, 2015

JDBC Access to MongoDB via Apache Spark

In previous posts I've discussed a native Apache Spark connector for MongoDB (NSMC) and NSMC's integration with Spark SQL. The latter post described an example project that issued Spark SQL queries via Scala code. However, much of the value of Spark SQL integration comes from the possibility of it being used either by pre-existing tools or applications, or by end users who understand SQL but do not, in general, write code. In this post I'll describe how to set up a Spark Thrift server with NSMC, and query it via JDBC calls.

JDBC access to Spark SQL

To make JDBC connections to Spark SQL, you need to run the Spark Thrift server, for which I'll give instructions below. Writing a Java application that connects to the Thrift server requires the HiveServer2 JDBC connector. The link provides some general information together with a list of JARs needed in the CLASSPATH, but I prefer to use Maven dependencies. The example described here works with the following quite conservative dependencies:

<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-jdbc</artifactId>
  <version>0.13.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.2.0</version>
</dependency>

If you prefer to live on the edge, the following dependencies work too:

<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-jdbc</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.7.0</version>
</dependency>

Running a Spark Thrift server with NSMC

Let's begin by setting up a Spark Thrift server that has access to NSMC.

Prerequisites

In order to follow along with the setup and examples you will need to set up the following:

  1. A running instance of MongoDB.
  2. A running instance of Spark 1.3.1 or later that can communicate with the MongoDB instance.
  3. The NSMC Examples project, having run the main() method of the PopulateTestCollection class to set up the example collection in your MongoDB instance. (While the rest of the example classes need to be submitted as a Spark job, PopulateTestCollection connects directly to your MongoDB instance, and can be run stand-alone.)
  4. The NSMC JDBC Client Samples project, which I'll describe in some detail below.
  5. A configuration file that will be used to tell NSMC how to connect to your MongoDB instance -- we'll call it nsmc.conf.
The required contents of your nsmc.conf file are as follows:
spark.nsmc.connection.host      <your MongoDB host>
spark.nsmc.connection.port      <your MongoDB port>
# omit the next two lines if you're not using MongoDB authentication, 
# otherwise provide an appropriate username and password
spark.nsmc.user                 <your MongoDB user name>
spark.nsmc.password             <your MongoDB password>

Startup

Next you need to start a Spark Thrift server to act as a JDBC listener. You need to tell it:
  1. Where to find your Spark master.
  2. What packages it needs to download from the Maven Central Repository -- in this case a supported version of NSMC, v0.5.2 or later, compiled for Scala 2.10. (Earlier versions of NSMC cannot be used in this way.)
  3. Where to find NSMC configuration -- in this case your nsmc.conf file.
<your Spark installation>/sbin/start-thriftserver.sh \
    --master <your spark master URL> \
    --packages com.github.spirom:spark-mongodb-connector_2.10:0.5.2 \
    --properties-file <path to config files>/nsmc.conf

If you do not want Spark to download NSMC directly from the Maven Central Repository, perhaps because the servers do not have access to the Internet, you can download an assembly containing all the relevant packages -- look for spark-mongodb-connector-assembly-0.5.2.jar or a later version. If you take this approach you need to:

  • Save the assembly somewhere that is accessible on all servers in your Spark installation
  • Skip the --packages setting when starting the Thrift server and instead use --driver-class-path together with the path to where you saved the assembly.
  • Add an entry to your nsmc.conf file where the key is spark.executor.extraClassPath and the value is again the path to where you saved the assembly file.

Running the JDBC Examples

If you have cloned the NSMC JDBC Client Samples project, you can build and run it either through IntelliJ Idea, or through Apache Maven by using the commands:

mvn compile
mvn exec:java -Dexec.mainClass="Demo"

Most of the code is fairly mundane JDBC, and I won't attempt to write a JDBC tutorial. But I will step through some of the most interesting points. You may need to modify the connection string if you're running the Spark Thrift server on a remote host or a non-default port. Furthermore, if you've set up authentication you'll need to provide an appropriate user name and password.

Connection con = 
    DriverManager.getConnection("jdbc:hive2://localhost:10000/default", 
                                "hive", "");

The most important part of the example is registering a MongoDB collection with Spark SQL. There are a number of interesting aspects.

  • You need to register a temporary table, and thus you will need to re-register it if you restart the Spark Thrift server. Spark (as of 1.3.0) now supports persisting metadata for external resources like this in the Hive Metastore, but NSMC (as of v0.5.2) isn't yet capable of taking advantage of it.
  • Because all connections to a Spark Thrift server run in the same HiveContext, registering a MongoDB collection once like this makes it available to all users until the server is restarted.
  • It is the USING clause that tells Spark SQL to use NSMC for querying this resource. Since you told the Spark Thrift server to download an appropriate version of NSMC from the Maven Central Repository, NSMC will both set up the right metadata when a collection is registered, and then also process any queries to this table.
  • You can register several different MongoDB collections, potentially in different databases, through multiple invocations of CREATE TEMPORARY TABLE. Alas, all connections must use the same MongoDB endpoint (host/port) with the same NSMC configuration. See NSMC Issue #15 for the status of the needed enhancement.
CREATE TEMPORARY TABLE dataTable
USING nsmc.sql.MongoRelationProvider
OPTIONS (db 'test', collection 'scratch')

The remaining code executes some simple DDL and DML queries, and should be clear if you've looked at the MongoDB collection and queries in the NSMC Examples project. You will recognize the following Spark SQL schema as being the schema that NSMC infers from the MongoDB collection.

root
 |-- _id: string (nullable = true)
 |-- billingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- custid: string (nullable = true)
 |-- discountCode: integer (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemid: string (nullable = true)
 |    |    |-- orderid: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |-- shippingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)

Since this schema has both arrays and nested structures, I want to spend the rest of this post focusing on how JDBC handles returning query results from such an aggressively non-rectangular (or non-relational) schema. As you have seen in my original Spark SQL post, Spark SQL itself deals with this extraordinarily well, but alas the Hive JDBC connector does not.

The following simple query illustrates the issues, since billingAddress contains a nested document and orders contains an array.

SELECT custid, billingAddress, orders FROM dataTable

If you execute the query from Scala and print the schema of the result you will see:

root
 |-- custid: string (nullable = true)
 |-- billingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemid: string (nullable = true)
 |    |    |-- orderid: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)

And printing the data frame yields:

custid billingAddress orders               
1001   [NV,89150]     List([A001,100000... 
1002   [CA,92093]     List([B012,100000... 
1003   [AZ,85014]     List([A001,100000... 
1004   null           null                 

In JDBC we would expect the billingAddress to be modeled as java.sql.Struct and the orders to be a java.sql.Array (where the elements are also java.sql.Struct). Instead, we see that JDBC recognizes the original types, but transmits all the structured values as JSON strings. The following code can be used to show how JDBC perceives the column types:

ResultSetMetaData rsm = res.getMetaData();
System.out.println("*** Column metadata:");
int ncols = rsm.getColumnCount();
System.out.println("total of " + ncols + " columns");
for (int i = 1; i <= ncols; i++) {
    System.out.println("Column " + i + " : " + rsm.getColumnName(i));
    System.out.println(" Label : " + rsm.getColumnLabel(i));
    System.out.println(" Class Name : " + rsm.getColumnClassName(i));
    System.out.println(" Type : " + rsm.getColumnType(i));
    System.out.println(" Type Name : " + rsm.getColumnTypeName(i));
}

Here is the output:

*** Column metadata:
total of 3 columns
Column 1 : custid
  Label : custid
  Class Name : java.lang.String
  Type : 12
  Type Name : string
Column 2 : billingAddress
  Label : billingAddress
  Class Name : java.lang.String
  Type : 2002
  Type Name : struct
Column 3 : orders
  Label : orders
  Class Name : java.lang.String
  Type : 2003
  Type Name : array

The following code allows us to see the values. Since all of the columns have been converted to strings, it's worth noting that the highlighted code could just as well have been replaced with res.getString(i).

System.out.println("*** Data:");
while (res.next()) {
    System.out.println("Row " + res.getRow());
    for (int i = 1; i <= ncols; i++) {
        System.out.println(" Column " + i + " : " + res.getObject(i));
    }
}

Here is the output. Notice that it is quite readable, and the JSON strings could be parsed by your client code if needed, especially since you can use the ResultSetMetaData to determine what to expect in the String. Whether this is "good enough" really depends on your needs.

*** Data:
Row 1
  Column 1 : 1001
  Column 2 : {"state":"NV","zip":"89150"}
  Column 3 : [{"itemid":"A001","orderid":"1000001","quantity":175},
              {"itemid":"A002","orderid":"1000002","quantity":20}]
Row 2
  Column 1 : 1002
  Column 2 : {"state":"CA","zip":"92093"}
  Column 3 : [{"itemid":"B012","orderid":"1000002","quantity":200}]
Row 3
  Column 1 : 1003
  Column 2 : {"state":"AZ","zip":"85014"}
  Column 3 : [{"itemid":"A001","orderid":"1000003","quantity":175},
              {"itemid":"B001","orderid":"1000004","quantity":10},
              {"itemid":"A060","orderid":"1000005","quantity":12}]
Row 4
  Column 1 : 1004
  Column 2 : null
  Column 3 : null

Advantages of using HiveQL in queries

If Hive's compromise for returning semi-structured data doesn't meet your needs, you may want to consider doing more of the work in the query. For example, Spark SQL supports drilling into nested structures as follows:

SELECT custid, billingAddress.zip FROM dataTable

This actually works remarkably well. For example, if billingAddress happens to be null in some document, asking for billingAddress.zip also returns NULL, instead of generating an error. However, Spark SQL doesn't give you a good way to unpack arrays -- and yet there is a coping strategy for those too.

Requests to the Thrift JDBC server are executed in a HiveContext, so in addition to having access to Spark SQL as a query language, you can write queries in HiveQL. More than just being convenient for experienced Hive users, this allows you to work around some of the shortcomings of the Hive JDBC driver for dealing with the "non-rectangular" aspects of MongoDB collections. For example, the "LATERAL VIEW" feature of HiveQL allows you to effectively "denormalize" the data your query returns:

SELECT custid, o.orderid, o.itemid, o.quantity 
FROM dataTable LATERAL VIEW explode(orders) t AS o

Instead of returning one result row for each matching document, it returns one row for each order, with the custid joined on. The result is completely rectangular, and so is easy to deal with in your client code. First, you can see this in the "Type Name" of the metadata.

*** Column metadata:
total of 4 columns
Column 1 : custid
  Label : custid
  Class Name : java.lang.String
  Type : 12
  Type Name : string
Column 2 : orderid
  Label : orderid
  Class Name : java.lang.String
  Type : 12
  Type Name : string
Column 3 : itemid
  Label : itemid
  Class Name : java.lang.String
  Type : 12
  Type Name : string
Column 4 : quantity
  Label : quantity
  Class Name : java.lang.Integer
  Type : 4
  Type Name : int

Second, you can see it when you print the results.

*** Data:
Row 1
  Column 1 : 1001
  Column 2 : 1000001
  Column 3 : A001
  Column 4 : 175
Row 2
  Column 1 : 1001
  Column 2 : 1000002
  Column 3 : A002
  Column 4 : 20
Row 3
  Column 1 : 1002
  Column 2 : 1000002
  Column 3 : B012
  Column 4 : 200
Row 4
  Column 1 : 1003
  Column 2 : 1000003
  Column 3 : A001
  Column 4 : 175
Row 5
  Column 1 : 1003
  Column 2 : 1000004
  Column 3 : B001
  Column 4 : 10
Row 6
  Column 1 : 1003
  Column 2 : 1000005
  Column 3 : A060
  Column 4 : 12

You can get more information about this technique in the relevant section of the Hive Language Manual.

Summary

Using Apache Spark's Thrift server and NSMC's integration with Spark SQL, you can run highly distributed SQL queries against MongoDB collections, perhaps even performing joins between MongoDB collections and data stored elsewhere.