Sunday, August 28, 2016

Taking a Detour with Apache Spark

Almost two years ago, while preparing for a talk I was giving at the now defunct Seattle Eastside Scala Meetup, I started a public GitHub project collecting and organizing Apache Spark code examples in Scala. I had stumbled on a way to run the examples on all supported platforms without setting up or deploying to a cluster, so the overheads of experimenting with the Spark APIs and programming idioms were remarkably low. It seemed like this approach was not well known at the time, so I shared it via the GItHub project and by posting here. Other than avoiding the overheads of a Spark cluster, the main feature of the project has been a "baby steps" approach to the examples. I've tried to demonstrate each API feature with multiple, slightly varying examples and (with notable, unfortunate exceptions) comments, to build intuitions before leaving the readers to take their chances with the Scaladoc.

Two years and about sixty sample programs later, I'm still not sure of the project's role and future, except that it has been tremendously helpful to my learning about Spark and Scala. The Apache Spark project's documentation and examples have improved, as has test coverage -- the latter always being a good way to learn about a new feature, except when there isn't any. The Databricks blog has also made a difference. And yet, the project continues to be useful to me, and I occasionally hear from others who find it helpful, including one local company that uses it in their training program. I like the "baby steps" approach to learning an API, and apparently I'm not the only one.

But lately I've had to ask myself some hard questions about the project. As I hope to post separately about soon, the evolution of Spark SQL's object model (remember SchemaRDD?) has made the task of keeping the project organized rather challenging lately -- I don't like to move examples around so I don't break links from the blog, StackOverflow and elsewhere. Another problem that's been nagging at me lately is my choice of Scala for the examples. I enjoy using Scala, have enjoyed learning it, and the Apache Spark project continues to keep the Scala APIs as a first class citizen. Indeed, Spark is written in Scala, but as I'll discuss later, that's no guarantee of strong support for Scala APIs. I've never been interested in the Python or R APIs, even though I believe they're of tremendous importance to the industry: I'm not part of the target audience (broadly, the data scientist) and I don't enjoy programming in either language. That leaves Java.

Time to explore the Java APIs

Many of you have seen the various Typesafe/Lightbend surveys showing Scala to be more popular than Java for Spark development -- the latest one has it at 76% Scala, 58% Java 8 and 34% Java 7 or lower. Clearly, there is overlap, so it's not clear whether Java or Scala are more popular overall. I see several reasons to explore Spark from the Java perspective:

  • Java is clearly an important part of the Spark ecosystem, as the surveys show.
  • The Java APIs are not merely an afterthought in Spark: real effort seems to have been invested in making Java programming practical and a reasonable approach.
  • While even a quick examination of the Spark project's Java examples (which date back to Java 7) shows them to be verbose and awkward compared with the Scala examples, the introduction of functional programming features in Java 8 raises the possibility of Java catching up.
  • I see a certain hesitation about Scala in the "big data" ecosystem. Lightbend has taken the "we don't have to choose" approach, and seems to be pretty sincere about it -- and of course they should be if they believe their own survey results. Confluent's decision about Apache Kafka is a bit more interesting: Kafka is also written in Scala, but only supports a Java API, with others provided by the community. While Cake Solutions actively develops the scala-kafka-client project, the Scala APIs are definitely not a first class citizen.
  • I've been a Java programmer, on and off, for 18 years. Before Scala, it was my recreational language of choice, and I still like it. I'm curious about Java 8, which I've only used a little, for another recent project.

Together, these certainly don't motivate me to abandon Scala, but they do motivate me to understand the tradeoffs better than I do now. The bottom line is that I've started adding some Java examples to the project, and started marking my commit messages with "[Scala]" or "[Java]" as appropriate.

Important Questions

I'm definitely making this up as I go, so let me expose some of the decisions I'm trying to make.

Which Examples?

I started with Dataset and DataFrame, since I had recently worked on those in Scala. But I'd at least like to get a cross section of the different areas: core RDDs, SQL, streaming and perhaps GraphX. Then I'll probably focus more on the areas that bring out interesting differences, whichever they turn out to be. There's no point exploring Spark SQL as a query language comprehensively in both Java and Scala, so I won't do it in Java.

Which Version(s) of Spark?

This is easy: much of the reason I invest in the project is to keep up with Spark evolution, and it takes a lot of effort. I'll continue adopting each new Spark release as soon as I can, and use its new features.

Java 8 or Earlier?

Java 8 seems to be getting a lot of adoption, and the new features definitely make it better suited to Spark. But the APIs have a number of features that were intended to work around the deficiencies of earlier versions of Java (such as all of org.apache.spark.api.java.function), so it seems interesting to explore them for a while. Yet I'll probably change to Java 8 soon to keep the project from becoming a museum.

One or Two Projects on GitHub?

So far I've used the parallel Scala/Java source tree structure of sbt projects to locate the Java code in the same project as the Scala code, but I'm already feeling like this was a bad idea. I think it hinders my ability to serve the community, since Java programmers are much more likely to be familiar with Maven than sbt, and the one Java class I had written to support the Scala code (hiveql.SumLargeSalesUDAF) is now tangled up with the Java examples. I think you can expect to see a separate project soon. (Splitting the projects also allows me to use different Java versions.)

Parallel Organization?

As I mentioned earlier, the evolution of the object model around Spark SQL has made it hard to keep the project organized, and the Scala examples are getting out of hand. I'm not going to inflict this entropy on Java developers, and will try to organize the Java examples according to my current understanding of how Spark fits together. In due course this may help me sort out the organization of the Scala examples too -- in any case I'm hoping to write a separate post on this topic.

How Much Effort?

I don't know how much I'll balance my effort on Scala and Java examples, or even whether I'll keep working on the Java ones for much longer. It depends on feedback, how much insight I get, where the community ends up going, and how Java and Scala (and Spark) evolve.

Abandoning Scala?

I've already made this decision: definitely not. It's now my recreational language of choice, and I think it has a future. At the very least, I plan to keep up my attempts at covering major Spark features in Scala as they evolve.

Feedback please!

While my "baby steps" approach to Spark examples seems to have had some impact on the community, I get very little direct feedback. Occasional questions have inspired some of the examples, which I hope were helpful to those people, and one local professional who reached out and told me how he has found the project valuable has dramatically increased my motivation. I'd be delighted to hear about your experiences, either about the examples themselves, or about Spark in general.

Saturday, May 16, 2015

Efficient Spark SQL Queries to MongoDB

In previous posts I've discussed a native Apache Spark connector for MongoDB (NSMC) and NSMC's integration with Spark SQL. The latter post described a naive implementation of Spark SQL integration, intended to demonstrate its value and the applicability of Spark SQL's recently introduced external data source API. In particular, I had then made no attempt to make the implementation efficient. In this post I will describe how to use some of the features of the external data source API to achieve a much more efficient Spark SQL integration.

General Approach

As discussed in earlier posts, the major challenges in implementing a Spark SQL external data source for MongoDB are:

  1. Efficient schema inference for the entire collection.
  2. Efficient use of MongoDB's query capabilities, based on Spark SQL's projection and filter pushdown mechanism, to obtain the data required for each Spark SQL query.

The NSMC project is hosted on GitHub, and the class nsmc.sql.MongoRelationProvider is a good starting point for reading the Spark SQL integration code.

Schema Inference

The Spark SQL schema for a MongoDB collection is computed when the collection is registered using, say, Spark SQL's CREATE TEMPORARY TABLE command. The goal of schema inference is to generate a Spark SQL schema (Seq[StructField]) to which each document in the collection conforms, so that each document can be converted to an org.apache.spark.sql.Row in order to produce an RDD[Row] as a query result. Some of the subtleties of what it means to infer such a schema were discussed in the original post on Spark SQL integration.

Schema inference itself makes use of Spark's parallel computation features. First, a set of partitions is computed for the collection. Then these are parallelized into an RDD. Next, each partition's collection data is read (in parallel) and a separate schema is computed for each partition. Finally, these schemas are all merged into a single schema that definitively represents the collection. This approach is summarized in Figure 1.

Figure 1: Schema inference

An important characteristic of this algorithm is that the collection is not stored -- the documents are analyzed "on the fly" to compute the schema. Obviously this can be a good thing, if the queries end up depending on small subsets of the collection, or a bad thing if the queries tend to re-scan the entire collection every time.

Collection Scan

A collection is scanned when Spark SQL calls the buildScan() method on the MongoTableScan class, which it does in order to process each query -- this interface was discussed in an earlier post on the external data source API. This method is passed the filters and projections that Spark SQL can benefit from being executed by MongoDB. This information is first used to generate an appropriate MongoDB query as discussed below. That query is executed to obtain an RDD[DBObject] (the representation of a document in the MongoDB APIs), which is then converted to an RDD[Row] using the schema information computed previously. This means that the conversion, like schema inference, also occurs in parallel, although the performance benefit of this is dependent on how skewed the query results are.

Figure 2: Table scan

Generating MongoDB queries from the parameters to the buildScan() method is quite straightforward. MongoDB's find() method happens to be conveniently factored for this, as it takes one parameter for filtering and one for projection. I've given a number of examples of how this conversion turns out below.

Example collection and queries

The best way to understand how NSMC's Spark SQL integration works is to look at some example Spark SQL queries together with the generated MongoDB queries.

The collection and its inferred schema

To understand how query evaluation works, we'll look at the following simple set of MongoDB documents:

{ 
  "_id" : ObjectId("554cc53280cecbc6a8579952"), 
   "item" : 1, 
   "quantity" : 5, 
   "price" : 12.5, 
   "category" : "A" 
}
{ 
  "_id" : ObjectId("554cc53280cecbc6a8579953"), 
  "item" : 2, 
  "quantity" : 10, 
  "price" : 12.5, 
  "location" : 
     { 
       "state" : "AZ", 
       "zip" : "85001" 
     }, 
  "category" : "A" 
}
{ 
  "_id" : ObjectId("554cc53280cecbc6a8579954"), 
  "item" : 3, 
  "quantity" : 15, 
  "price" : 12.5, 
  "category" : "B" 
}
{ 
  "_id" : ObjectId("554cc53280cecbc6a8579955"), 
  "item" : 4, 
  "quantity" : 20, 
  "price" : 12.5, 
  "location" : 
     { 
       "state" : "NV", 
       "zip" : "89101" 
     }, 
  "category" : "B" 
}

NSMC infers the following Spark SQL schema for these documents:

root
 |-- _id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- item: integer (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: integer (nullable = true)

Filter and Projection Pushdown

As I discussed in an earlier post, Spark SQL's external data source API allows simple cases of filtering and projection to be pushed down to the back end database query, to make use of any indexing capabilities and to minimize the amount of data that has to be transfered back to Spark.

Let's start with the simplest of queries to see how this works, assuming we have registered the above collection with Spark SQL as the table mongoTable.

SELECT * FROM mongoTable

NSMC simply requests all the columns in the inferred schema from MongoDB. Since the always-present _id column is not explicitly suppressed, it will be present in the output. (MongoDB's convention for this field is that it is always returned unless explicitly suppressed.) Including it in this case was a design choice that seems to result in the most intuitive behavior for * queries. The important design choice was actually to include it as a column in the schema. Once that was done, Spark SQL would expect it to be returned in every * query.

Here is the generated MongoDB query, with no filtering and [effectively] no projection.

collection.find( 
  { }, 
  { "category" : 1 , "item" : 1 , "location" : 1 , 
    "price" : 1 , "quantity" : 1 }
)

This query happens to be equivalent to both of the following, but NSMC doesn't recognize the fact:

collection.find( { }, { } )

collection.find( )

A query that explicitly projects columns results in only those being requested from MongoDB.

SELECT item, quantity FROM mongoTable

In this case, Spark SQL would not know how to deal with the _id column even if it was returned, so NSMC explicitly suppresses it to save bandwidth. It's probably not a very important optimization.

collection.find( 
  { }, 
  { "item" : 1 , "quantity" : 1 , "_id" : 0 }
)

Now let's move on to simple filtering.

SELECT item, quantity 
FROM mongoTable 
WHERE quantity > 12

Indeed, the filter is pushed through to the MongoDB query.

collection.find( 
  { "quantity" : { "$gt" : 12}},
  { "item" : 1 , "quantity" : 1 , "_id" : 0}
)

A disjunctive query is not currently handled by NSMC, although the capability to push it through was introduced in Spark 1.3.0.

SELECT item, quantity 
FROM mongoTable 
WHERE quantity > 12 OR quantity < 8

So only projection is pushed through.

collection.find( 
  {},
  { "item" : 1 , "quantity" : 1 , "_id" : 0}
)

Accessing a nested document or array element doesn't disable projection pushdown.

SELECT item, quantity, location.zip 
FROM mongoTable

The projection is "widened" to the top level column, so somewhat too much data is pulled in from MongoDB.

collection.find( 
  {},
  { "item" : 1 , "quantity" : 1 , "location" : 1 , "_id" : 0}
)

Filtering on a nested document or array element is more problematic.

SELECT item, quantity, location.zip 
FROM mongoTable 
WHERE location.state = 'AZ' AND quantity = 10

There's no widening trick to play here, so the problematic conjunct is not pushed through.

collection.find( 
  { "quantity" : 10 }
  { "item" : 1 , "quantity" : 1 , "location" : 1 , "_id" : 0}
)

We should also check an aggregation query.

SELECT category, sum(quantity) FROM mongoTable GROUP BY category

The external data source API doesn't have a way to push grouping and aggregation through, so MongoDB is just asked to fetch the underlying data.

collection.find( 
  { },
  { "category" : 1 , "quantity" : 1 , "_id" : 0 }
)

Scope for Improvement

A number of changes either in NSMC's implementation or in Spark SQL's implementation could result in still more efficient processing of queries to MongoDB.

Sampling

Scanning the entire collection to compute a schema has the advantage of not missing documents that would have added valuable schema information. On the other hand, it can be very expensive. Sampling clearly has to be optional, with a parameter allowing the user to specify how much of the collection to sample.

Narrower Schema Inference

Currently, a schema is always inferred for the entire collection, which can be expensive if queries will only use part of the collection. An obvious refinement would be to allow users to specify a projection when registering the collection. Being able to specify a filter would be even more powerful, but less obviously useful.

Caching

The inferred schema is cached for use by all queries. This has its dangers, which are mitigated by Spark SQL's ability to refresh a registered collection.

Since the results of a query conform to all the standard characteristics of an RDD, they benefit from Spark's caching capabilities. However, it is not yet clear to me how much they benefit across multiple queries. This needs some investigation and experimentation.

Tighter Integration with Catalyst

Spark SQL's external data source API was designed with simplicity and ease of use in mind, and in my opinion it succeeds admirably in this regard. However, the API does provide for deeper (and harder to use) integration with Spark SQL's Catalyst query compiler. It may in the future be possible to improve NSMC's Spark SQL integration by using this feature.

Limitations of the External Data Source API

Spark SQL's external data source API basically can't handle nested structures or arrays. This means certain queries against a MongoDB collection will always require that too much data be returned from MongoDB to Spark. I don't know if this is at all on the radar of the Spark SQL developers.

Additionally, the external data source API does not support pushing joins or grouping and aggregation to the external database.

Integrating with Spark SQL's cost model

The external data source API allows an implementation to provide information about the size of a collection, but NSMC doesn't use this feature. It may allow the Catalyst query optimizer to make better query processing decisions.

Limitations of Catalyst

The Catalyst query optimizer currently has a limited cost model, and improvements in it may be helpful to NSMC's performance.

Improved Partitioning

NSMC's collection partitioning features are currently quite naive, and this may limit the efficiency gains of parallel schema inference and parallel query execution. This needs some experimentation.

Summary

In my original post about NSMC's Spark SQL integration, I described a very naive implementation, intended to show that such integration was possible and could be useful. In this post I've described more recent work to take advantage of Spark's parallelism for schema inference, to prevent schema inference from consuming large amounts of memory, and to take advantage of Spark SQL's filter and projection pushdown features to dramatically improve query performance in many cases. While there's a lot more to be done, these improvements in many cases give NSMC's Spark SQL integration quite realistic performance.

Thursday, May 14, 2015

JDBC Access to MongoDB via Apache Spark

In previous posts I've discussed a native Apache Spark connector for MongoDB (NSMC) and NSMC's integration with Spark SQL. The latter post described an example project that issued Spark SQL queries via Scala code. However, much of the value of Spark SQL integration comes from the possibility of it being used either by pre-existing tools or applications, or by end users who understand SQL but do not, in general, write code. In this post I'll describe how to set up a Spark Thrift server with NSMC, and query it via JDBC calls.

JDBC access to Spark SQL

To make JDBC connections to Spark SQL, you need to run the Spark Thrift server, for which I'll give instructions below. Writing a Java application that connects to the Thrift server requires the HiveServer2 JDBC connector. The link provides some general information together with a list of JARs needed in the CLASSPATH, but I prefer to use Maven dependencies. The example described here works with the following quite conservative dependencies:

<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-jdbc</artifactId>
  <version>0.13.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.2.0</version>
</dependency>

If you prefer to live on the edge, the following dependencies work too:

<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-jdbc</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.7.0</version>
</dependency>

Running a Spark Thrift server with NSMC

Let's begin by setting up a Spark Thrift server that has access to NSMC.

Prerequisites

In order to follow along with the setup and examples you will need to set up the following:

  1. A running instance of MongoDB.
  2. A running instance of Spark 1.3.1 or later that can communicate with the MongoDB instance.
  3. The NSMC Examples project, having run the main() method of the PopulateTestCollection class to set up the example collection in your MongoDB instance. (While the rest of the example classes need to be submitted as a Spark job, PopulateTestCollection connects directly to your MongoDB instance, and can be run stand-alone.)
  4. The NSMC JDBC Client Samples project, which I'll describe in some detail below.
  5. A configuration file that will be used to tell NSMC how to connect to your MongoDB instance -- we'll call it nsmc.conf.
The required contents of your nsmc.conf file are as follows:
spark.nsmc.connection.host      <your MongoDB host>
spark.nsmc.connection.port      <your MongoDB port>
# omit the next two lines if you're not using MongoDB authentication, 
# otherwise provide an appropriate username and password
spark.nsmc.user                 <your MongoDB user name>
spark.nsmc.password             <your MongoDB password>

Startup

Next you need to start a Spark Thrift server to act as a JDBC listener. You need to tell it:
  1. Where to find your Spark master.
  2. What packages it needs to download from the Maven Central Repository -- in this case a supported version of NSMC, v0.5.2 or later, compiled for Scala 2.10. (Earlier versions of NSMC cannot be used in this way.)
  3. Where to find NSMC configuration -- in this case your nsmc.conf file.
<your Spark installation>/sbin/start-thriftserver.sh \
    --master <your spark master URL> \
    --packages com.github.spirom:spark-mongodb-connector_2.10:0.5.2 \
    --properties-file <path to config files>/nsmc.conf

If you do not want Spark to download NSMC directly from the Maven Central Repository, perhaps because the servers do not have access to the Internet, you can download an assembly containing all the relevant packages -- look for spark-mongodb-connector-assembly-0.5.2.jar or a later version. If you take this approach you need to:

  • Save the assembly somewhere that is accessible on all servers in your Spark installation
  • Skip the --packages setting when starting the Thrift server and instead use --driver-class-path together with the path to where you saved the assembly.
  • Add an entry to your nsmc.conf file where the key is spark.executor.extraClassPath and the value is again the path to where you saved the assembly file.

Running the JDBC Examples

If you have cloned the NSMC JDBC Client Samples project, you can build and run it either through IntelliJ Idea, or through Apache Maven by using the commands:

mvn compile
mvn exec:java -Dexec.mainClass="Demo"

Most of the code is fairly mundane JDBC, and I won't attempt to write a JDBC tutorial. But I will step through some of the most interesting points. You may need to modify the connection string if you're running the Spark Thrift server on a remote host or a non-default port. Furthermore, if you've set up authentication you'll need to provide an appropriate user name and password.

Connection con = 
    DriverManager.getConnection("jdbc:hive2://localhost:10000/default", 
                                "hive", "");

The most important part of the example is registering a MongoDB collection with Spark SQL. There are a number of interesting aspects.

  • You need to register a temporary table, and thus you will need to re-register it if you restart the Spark Thrift server. Spark (as of 1.3.0) now supports persisting metadata for external resources like this in the Hive Metastore, but NSMC (as of v0.5.2) isn't yet capable of taking advantage of it.
  • Because all connections to a Spark Thrift server run in the same HiveContext, registering a MongoDB collection once like this makes it available to all users until the server is restarted.
  • It is the USING clause that tells Spark SQL to use NSMC for querying this resource. Since you told the Spark Thrift server to download an appropriate version of NSMC from the Maven Central Repository, NSMC will both set up the right metadata when a collection is registered, and then also process any queries to this table.
  • You can register several different MongoDB collections, potentially in different databases, through multiple invocations of CREATE TEMPORARY TABLE. Alas, all connections must use the same MongoDB endpoint (host/port) with the same NSMC configuration. See NSMC Issue #15 for the status of the needed enhancement.
CREATE TEMPORARY TABLE dataTable
USING nsmc.sql.MongoRelationProvider
OPTIONS (db 'test', collection 'scratch')

The remaining code executes some simple DDL and DML queries, and should be clear if you've looked at the MongoDB collection and queries in the NSMC Examples project. You will recognize the following Spark SQL schema as being the schema that NSMC infers from the MongoDB collection.

root
 |-- _id: string (nullable = true)
 |-- billingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- custid: string (nullable = true)
 |-- discountCode: integer (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemid: string (nullable = true)
 |    |    |-- orderid: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |-- shippingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)

Since this schema has both arrays and nested structures, I want to spend the rest of this post focusing on how JDBC handles returning query results from such an aggressively non-rectangular (or non-relational) schema. As you have seen in my original Spark SQL post, Spark SQL itself deals with this extraordinarily well, but alas the Hive JDBC connector does not.

The following simple query illustrates the issues, since billingAddress contains a nested document and orders contains an array.

SELECT custid, billingAddress, orders FROM dataTable

If you execute the query from Scala and print the schema of the result you will see:

root
 |-- custid: string (nullable = true)
 |-- billingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemid: string (nullable = true)
 |    |    |-- orderid: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)

And printing the data frame yields:

custid billingAddress orders               
1001   [NV,89150]     List([A001,100000... 
1002   [CA,92093]     List([B012,100000... 
1003   [AZ,85014]     List([A001,100000... 
1004   null           null                 

In JDBC we would expect the billingAddress to be modeled as java.sql.Struct and the orders to be a java.sql.Array (where the elements are also java.sql.Struct). Instead, we see that JDBC recognizes the original types, but transmits all the structured values as JSON strings. The following code can be used to show how JDBC perceives the column types:

ResultSetMetaData rsm = res.getMetaData();
System.out.println("*** Column metadata:");
int ncols = rsm.getColumnCount();
System.out.println("total of " + ncols + " columns");
for (int i = 1; i <= ncols; i++) {
    System.out.println("Column " + i + " : " + rsm.getColumnName(i));
    System.out.println(" Label : " + rsm.getColumnLabel(i));
    System.out.println(" Class Name : " + rsm.getColumnClassName(i));
    System.out.println(" Type : " + rsm.getColumnType(i));
    System.out.println(" Type Name : " + rsm.getColumnTypeName(i));
}

Here is the output:

*** Column metadata:
total of 3 columns
Column 1 : custid
  Label : custid
  Class Name : java.lang.String
  Type : 12
  Type Name : string
Column 2 : billingAddress
  Label : billingAddress
  Class Name : java.lang.String
  Type : 2002
  Type Name : struct
Column 3 : orders
  Label : orders
  Class Name : java.lang.String
  Type : 2003
  Type Name : array

The following code allows us to see the values. Since all of the columns have been converted to strings, it's worth noting that the highlighted code could just as well have been replaced with res.getString(i).

System.out.println("*** Data:");
while (res.next()) {
    System.out.println("Row " + res.getRow());
    for (int i = 1; i <= ncols; i++) {
        System.out.println(" Column " + i + " : " + res.getObject(i));
    }
}

Here is the output. Notice that it is quite readable, and the JSON strings could be parsed by your client code if needed, especially since you can use the ResultSetMetaData to determine what to expect in the String. Whether this is "good enough" really depends on your needs.

*** Data:
Row 1
  Column 1 : 1001
  Column 2 : {"state":"NV","zip":"89150"}
  Column 3 : [{"itemid":"A001","orderid":"1000001","quantity":175},
              {"itemid":"A002","orderid":"1000002","quantity":20}]
Row 2
  Column 1 : 1002
  Column 2 : {"state":"CA","zip":"92093"}
  Column 3 : [{"itemid":"B012","orderid":"1000002","quantity":200}]
Row 3
  Column 1 : 1003
  Column 2 : {"state":"AZ","zip":"85014"}
  Column 3 : [{"itemid":"A001","orderid":"1000003","quantity":175},
              {"itemid":"B001","orderid":"1000004","quantity":10},
              {"itemid":"A060","orderid":"1000005","quantity":12}]
Row 4
  Column 1 : 1004
  Column 2 : null
  Column 3 : null

Advantages of using HiveQL in queries

If Hive's compromise for returning semi-structured data doesn't meet your needs, you may want to consider doing more of the work in the query. For example, Spark SQL supports drilling into nested structures as follows:

SELECT custid, billingAddress.zip FROM dataTable

This actually works remarkably well. For example, if billingAddress happens to be null in some document, asking for billingAddress.zip also returns NULL, instead of generating an error. However, Spark SQL doesn't give you a good way to unpack arrays -- and yet there is a coping strategy for those too.

Requests to the Thrift JDBC server are executed in a HiveContext, so in addition to having access to Spark SQL as a query language, you can write queries in HiveQL. More than just being convenient for experienced Hive users, this allows you to work around some of the shortcomings of the Hive JDBC driver for dealing with the "non-rectangular" aspects of MongoDB collections. For example, the "LATERAL VIEW" feature of HiveQL allows you to effectively "denormalize" the data your query returns:

SELECT custid, o.orderid, o.itemid, o.quantity 
FROM dataTable LATERAL VIEW explode(orders) t AS o

Instead of returning one result row for each matching document, it returns one row for each order, with the custid joined on. The result is completely rectangular, and so is easy to deal with in your client code. First, you can see this in the "Type Name" of the metadata.

*** Column metadata:
total of 4 columns
Column 1 : custid
  Label : custid
  Class Name : java.lang.String
  Type : 12
  Type Name : string
Column 2 : orderid
  Label : orderid
  Class Name : java.lang.String
  Type : 12
  Type Name : string
Column 3 : itemid
  Label : itemid
  Class Name : java.lang.String
  Type : 12
  Type Name : string
Column 4 : quantity
  Label : quantity
  Class Name : java.lang.Integer
  Type : 4
  Type Name : int

Second, you can see it when you print the results.

*** Data:
Row 1
  Column 1 : 1001
  Column 2 : 1000001
  Column 3 : A001
  Column 4 : 175
Row 2
  Column 1 : 1001
  Column 2 : 1000002
  Column 3 : A002
  Column 4 : 20
Row 3
  Column 1 : 1002
  Column 2 : 1000002
  Column 3 : B012
  Column 4 : 200
Row 4
  Column 1 : 1003
  Column 2 : 1000003
  Column 3 : A001
  Column 4 : 175
Row 5
  Column 1 : 1003
  Column 2 : 1000004
  Column 3 : B001
  Column 4 : 10
Row 6
  Column 1 : 1003
  Column 2 : 1000005
  Column 3 : A060
  Column 4 : 12

You can get more information about this technique in the relevant section of the Hive Language Manual.

Summary

Using Apache Spark's Thrift server and NSMC's integration with Spark SQL, you can run highly distributed SQL queries against MongoDB collections, perhaps even performing joins between MongoDB collections and data stored elsewhere.

Wednesday, February 18, 2015

Spark SQL Integration for MongoDB

In a previous post I described a native Spark connector for MongoDB (NSMC) and showed how to use it. That post described how to create RDDs where each of the elements were MongoDB's DBObject records. This time, I'll describe how the connector integrates with Spark SQL via Spark 1.2.0's external data source API. Since Spark SQL incorporates support for both nested record structures and arrays, it is naturally a good match for the rather free wheeling schema of MongoDB collections. As before you can find the code on GitHub, use the library in your Scala code via sbt, and look at usage examples in a separate GitHub project. Note that Spark SQL integration was introduced, together with support for Spark 1.2.0, in release 0.4.0 of NSMC.

Related posts

A number of followup posts take this work further:

  1. In Efficient Spark SQL Queries to MongoDB I describe how to make the Spark SQL integration more efficient, and
  2. in JDBC Access to MongoDB via Apache Spark I describe how to use NSMC's Spark SQL integration via JDBC.

Querying JSON from Spark SQL

Many of the complexities of querying MongoDB from Spark SQL are similar to the complexities of querying JSON from Spark SQL, a capabitity that Spark SQL provides out of the box, so let's begin by taking some time to understand that mechanism. The main source of difficulty is that, on the one hand, MongoDB collections do not have a schema (or they have a de facto schema based on their current contents) while on the other hand Spark SQL depends on infering a well defined schema for the data being queried. More specifically:

  1. Different documents in a JSON collection frequently have differing structure, at the top level or in nested documents.
  2. Document fields can have array types, but elements of the same array may have different structures.
  3. The simplest differences in document structure are when one or more fields are present in some documents or array members but absent in others -- these are reconciled by treating the field as nullable.
  4. Atomic type conflicts can be easy to reconcile. For example, when a field is an Int32 in one document and an Int64 in another, it's simple enough to "relax" the type to Int64. It's a little more problematic when one instance is a string and the other is an integer. But the most problematic conflicts are structural: a scalar vs. array, scalar vs. document, or document vs. array.

Keeping this in mind, let's look at how Spark SQL's JSON support deals with infering schema, in a range of simple and not-so-simple examples. You can see the sample code for these at the LearningSpark project on GitHub. The first one is very straightforward, with scalars, nested documents, an array of scalars and an array of documents. It might be tempting to conclude that with a single document there are no opportunities for conflicts, but that's not true: multiple elements of a single array can conflict with each other -- although in this case they don't.

{"a": 1, 
 "b": 2, 
 "c": ["x", "y"], 
 "d": [{"e": 1, "f": 2}, {"e": 11, "f": 12}], 
 "g": {"h": 7}}   

There shouldn't be any surprises in the schema, except perhaps for the fact that Spark SQL makes the fields all nullable, perhaps needlessly in this simple case.

root
 |-- a: integer (nullable = true)
 |-- b: integer (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- d: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- e: integer (nullable = true)
 |    |    |-- f: integer (nullable = true)
 |-- g: struct (nullable = true)
 |    |-- h: integer (nullable = true)

The next example introduces minor conflicts by adding a second document and leaving out certain fields -- "b" and "g" only appear in the first document, and "d.u" only appears in the second document.

{"a": 1, 
 "b": 2, 
 "c": ["x", "y"], 
 "d": [{"e": 1, "f": 2}, {"e": 11, "f": 12}], 
 "g": {"h": 7}}
{"a": 1, 
 "c": ["x", "y"], 
 "d": [{"e": 1, "f": 2, "u":"ewe"}, {"e": 11, "f": 12}]}

The schema is simply the "union" of the two schemas. That's OK since each field is nullable.

root
 |-- a: integer (nullable = true)
 |-- b: integer (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- d: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- e: integer (nullable = true)
 |    |    |-- f: integer (nullable = true)
 |    |    |-- u: string (nullable = true)
 |-- g: struct (nullable = true)
 |    |-- h: integer (nullable = true)

The last example introduces more interesting conflicts. Field "a" is an integer in one document and a long in the other, "b" is an integer in one and an array in the other, "c" is an array in one and a scalar in the other, "d" is a document in one and an array of documents in the other, and g is a document in one and an array of integers in the other.

{"a": 1, 
 "b": 2, 
 "c": ["x", "y"], 
 "d": {"e": 1, "f": 2}, 
 "g": {"h": 7}}
{"a": 5000000000, 
 "b": "two", 
 "c": 12, 
 "d": [{"e": 1, "f": 2, "u":"ewe"}, {"e": 11, "f": 12}], 
 "g": [1, 2]}

Looking at the inferred schema, the treatment of "a" is quite simple, since an integer can be stored in a long. Every other conflict is dealt with by treating the field as a string.

root
 |-- a: long (nullable = true)
 |-- b: string (nullable = true)
 |-- c: string (nullable = true)
 |-- d: string (nullable = true)
 |-- g: string (nullable = true)

This "reversion" to strings has some interesting consequences. The fields can be used in a query, say in a SELECT clause, so you can obtain and manipulate the values. The values returned, however, are NOT strings. For example, if you SELECT g you will obtain a Row containing a single integer from the first document and a list of integers from the other. And, most importantly, you can't query sub-fields -- so you can't SELECT g.h or SELECT c[1], either of which would have been fine against the previous schema. I'm willing to accept this as a good set of compromises, and having NSMC's schema inference behave analogously seems entirely appropriate.

Integrating NSMC with Spark SQL

In two earlier posts I described how to integrate an external database engine with Spark SQL using the external data source API introduced in Spark 1.2.0. The first post described a simple and fairly naive integration approach where the external engine returned all the data in a table or collection, and Spark SQL took care of all filtering and projection. The second post showed how to push filtering and projection down to the external database engine. However, the second post also showed how the Spark SQL query compiler makes limited use of projection pushdown for queries against non-rectangular data. Furthermore, NSMC currently has no ability to push filtering out to MongoDB. As a result, I've opted for the simpler approach thus far. At some point it will make sense to revisit this decision in order to make the connector more efficient.

Schema Inference Issues

Spark SQL integration raised a number of design issues. Some of them may deserve their own posts in the future, but for now here's a summary of what the issues are and how I've resolved them (or, in many cases, found a "stop gap" solution).

  1. Sampling issues: reading an entire data collection to infer a schema is expensive, although sometimes necessary. There are a number of interesting decisions to make in this regard, and in a future release some or all of them may become configurable.
    • How many documents are sampled: currently it's all of them.
    • When to sample: currently they are sampled immediately when the temporary table is registered with Spark SQL.
    • How often to sample: currently they are only sampled once on registration, and then several queries may execute against the result.
  2. Type mapping and coverage: MongoDB supports lots of built in types. For now, NSMC's Spark SQL integration supports only integers, longs, strings, booleans, doubles, binary, time stamps and dates, as well as arrays and nested documents.
  3. Structure field ordering and merging: while the fields of JSON documents are nominally unordered, NSMC sorts them by name, as does Spark SQL's built in JSON support.
  4. Handling atomic or structural type conflicts: these are not currently resolved by NSMC.

Configuration and Use

Perhaps the best way to learn how to use NSMC's Spark SQL integration is to look at the spark-mongodb-examples project on GitHub. The README.md there describes how to set up a sample MongoDB collection and the SQLQuery.scala file shows how to configure an appropriate SQLContext, register a MongoDB collection as a temporary table with Spark SQL, and then execute Spark SQL queries against it. We'll review the example code below.

In your build.sbt you'll need the following dependencies.

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.2.0"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.2.0"

libraryDependencies += 
    "com.github.spirom" %% "spark-mongodb-connector" % "0.4.0"

Only a few imports are needed.

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}   
  

The SparkContext is configured in the same way as when using the core connector, and then a SQLContext is wrapped around it.

val coreConf =
  new SparkConf()
    .setAppName("MongoReader").setMaster("local[4]")
    .set("nsmc.connection.host", DBConfig.host)
    .set("nsmc.connection.port", DBConfig.port)
val conf = (DBConfig.userName, DBConfig.password) match {
  case (Some(u), Some(pw)) => 
    coreConf.set("nsmc.user", u).set("nsmc.password", pw)
  case _ => coreConf
}
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

The MongoDB collection is registered as a temporary table -- you'll need to change the table name as well as the MongoDB database and collection names in your applications, but note that you must tell Spark SQL that you're registering the table via NSMC by referring to nsmc.sql.MongoRelationProvider.

sqlContext.sql(
  s"""
  |CREATE TEMPORARY TABLE dataTable
  |USING nsmc.sql.MongoRelationProvider
  |OPTIONS (db '${DBConfig.testDatabase}', 
  |         collection '${DBConfig.testCollection}')
""".stripMargin)

Now you can run your first query. It's useful to make the first one a SELECT * query so that you can see the entire Spark SQL schema that has been inferred for the sample collection.

val data =
  sqlContext.sql("SELECT * FROM dataTable")

We'll look at the schemas for the results of the sample queries in the next section. You can print the schema by first obtaining it via SchemaRDD's schema method, and then calling printTreeString().

data.schema.printTreeString()

Finally, you can access the actual data returned from the query.

data.foreach(println)

Sample Data

Before we look at the four queries provided in the sample code, let's see what data we're querying from the MongoDB perspective using JSON notation. This is the data inserted by running the main() method of the PopulateTestCollection object in the sample project.

{ 
    "_id" : ObjectId("54e1586080ce44e72e07fe4c"), 
    "custid" : "1001", 
    "billingAddress" : {
        "state" : "NV"
    }, 
    "orders" : [
        {
            "orderid" : "1000001", 
            "itemid" : "A001", 
            "quantity" : NumberInt(175), 
            "returned" : true
        }, 
        {
            "orderid" : "1000002", 
            "itemid" : "A002", 
            "quantity" : NumberInt(20)
        }
    ]
}
{ 
    "_id" : ObjectId("54e1586080ce44e72e07fe4d"), 
    "custid" : "1002", 
    "billingAddress" : {
        "state" : "CA", 
        "zip" : "92093"
    }, 
    "shippingAddress" : {
        "state" : "CA", 
        "zip" : "92109"
    }, 
    "orders" : [
        {
            "orderid" : "1000002", 
            "itemid" : "B012", 
            "quantity" : NumberInt(200)
        }
    ]
}
{ 
    "_id" : ObjectId("54e1586080ce44e72e07fe4e"), 
    "custid" : "1003", 
    "billingAddress" : {
        "state" : "AZ"
    }, 
    "discountCode" : NumberInt(1), 
    "orders" : [
        {
            "orderid" : "1000003", 
            "itemid" : "A001", 
            "quantity" : NumberInt(175)
        }, 
        {
            "orderid" : "1000004", 
            "itemid" : "B001", 
            "quantity" : NumberInt(10)
        }, 
        {
            "orderid" : "1000005", 
            "itemid" : "A060", 
            "quantity" : NumberInt(12)
        }
    ]
}

Notice how "non-rectangular" this data is. Entire nested documents are missing in some documents, scalar fields are missing in others, and the arrays are of different lengths.

Example queries

Now let's look at the queries in the sample project. To see how NSMC has interpreted the sample collection, it's useful to start with a query that returns all of it.

 SELECT * FROM dataTable

As you might hope, the inferred schema is general enough to account for every document in the collection. This is achieved by making every field nullable -- regardless of whether it is missing from any document. At the level of each document or nested document, the fields are in alphabetical order. The types chosen are not surprising, except perhaps for the use of a string to represent the unique ID of each document.

root
 |-- _id: string (nullable = true)
 |-- billingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- custid: string (nullable = true)
 |-- discountCode: integer (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemid: string (nullable = true)
 |    |    |-- orderid: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |-- shippingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)

A couple of things are worth noticing at this stage.

  1. The data returned shows the missing fields as Scala nulls, which are considered by Spark SQL as NULLs, and soon we'll see how to exploit the latter fact in queries.
  2. Each Spark SQL result row is represented using an instance of org.apache.spark.sql.Row, and each nested document by a nested instance of Row. When these are printed they are surrounded by square brackets.
  3. The fields of documents and sub-documents are in the same order as in the inferred schema.
  4. It's hard to tell from this output, but atomic data has the appropriate atomic type: Int, String and so on.
[54e1621480ce1641f05b1caf,
    [NV,89150],
    1001,
    null,
    List([A001,1000001,175], [A002,1000002,20]),
    null]
[54e1621480ce1641f05b1cb0,
    [CA,92093],
    1002,null,
    List([B012,1000002,200]),
    [CA,92109]]
[54e1621480ce1641f05b1cb1,
    [AZ,85014],
    1003,
    1,
    List([A001,1000003,175], [B001,1000004,10], [A060,1000005,12]),
    [AZ,85020]]
[54e1621480ce1641f05b1cb2,
    null,
    1004,
    null,
    null,
    [CA,92109]]

Since the discountCode is often missing, we can see that Spark SQL considers the missing instances to be NULL.

 SELECT custid FROM dataTable 
 WHERE discountCode IS NOT NULL

This query just returns a single column.

root
 |-- custid: string (nullable = true)
  

Only one document has an instance of discountCode.

[1003]

Since the data contains entire sub-documents that are missing, we may be curious about what happens when we use a field inside a sub-document in a query.

 SELECT custid, shippingAddress.zip 
 FROM dataTable

We expect two columns. Notice how the schema shows zip to have been pulled up to the top level.

root
 |-- custid: string (nullable = true)
 |-- zip: string (nullable = true)
  

It turns out not to be a problem: the fields within the sub-document are considered to be NULL when either they are missing or when the entire sub-document is missing. This is also true when we filter by such fields or group by them. Again, zip is indeed at the top level, not nested, and that's what you would expect from the schema.

[1001,null]
[1002,92109]
[1003,85020]
[1004,92109]

Naturally, we'd also like grouping to play well with nested fields.

 SELECT COUNT(custid), shippingAddress.zip 
 FROM dataTable 
 GROUP BY shippingAddress.zip

The schema shouldn't be surprising at this point.

root
 |-- c0: long (nullable = false)
 |-- zip: string (nullable = true)

Neither should the results.

[2,92109]
[1,null]
[1,85020]

Implementation

In subsequent posts I'll describe the implementation of NSMC's Spark SQL integration in some detail. Meanwhile, you can see it on GitHub in the nsmc.sql and nsmc.conversion packages.

Performance and Scalability

In order to infer the schema for a collection, NSMC reads through the entire collection, as I've already described. It materializes the entire collection in a MongoRDD, which was a good way to get something working quickly, but is wasteful of memory, and often also of time. Release 0.4.0 is a "proof of concept" with respect to Spark SQL integration -- improved performance will have to wait for subsequent releases.

Your Experiences

If you use Spark and MongoDB, please try using NSMC -- but remember it's not ready for production use. If you have trouble, feel free to use the comments section here or file an issue on GitHub.

Sunday, January 25, 2015

NSMC: A Native MongoDB Connector for Apache Spark

Both core Spark and Spark SQL provide ways to neatly plug in external database engines as a source of data. In this post I'm going to describe an experimental MongoDB connector for core Spark, called NSMC (for "Native Spark MongoDB Connector"). It's a "native" connector in the sense that it connects Spark directly to MongoDB, without involving anything else like Hadoop. For me, this was largely an exercise in learning how to integrate an external data engine with core Spark, and more generally, an exercise in learning about Spark, but over time it became about a number of other things, including MongoDB, distributed databases and semi-structured data. I hope to delve into the various areas of learning in future posts, but here I'll simply introduce the project and describe how you can experiment with it.

MongoDB makes an interesting case study as an external Spark data source for a number of reasons:

  1. It has become the most popular DBMS in a number of "non-traditional" categories, including non-relational and NoSQL, being one of the few remaining systems where the "NoSQL" label still means something.
  2. The data model, based on collections of JSON-like documents, is both deeply ad-hoc (i.e., collections have no a priori schema whatsoever) and deeply non-rectangular, making it an interesting and (as it turns out) challenging test case for integration with virtually any system.
  3. It is not directly served by Spark, although it can be used from Spark via an official Hadoop connector, and Spark SQL also provides indirect support via its support for reading and writing JSON text files.

NSMC is hosted on GitHub under an Apache 2.0 license, but I'm not going to discuss the implementation in this post at all.

Related posts

A number of followup posts take this work further:
  1. In Spark SQL Integration for MongoDB I discuss how NSMC can now be used from Spark SQL.
  2. In Efficient Spark SQL Queries to MongoDB I describe how to make the Spark SQL integration more efficient, and
  3. in JDBC Access to MongoDB via Apache Spark I describe how to use NSMC's Spark SQL integration via JDBC.

Trying it out

To try the connector out in your system you need a Spark 1.1.0 instance and a MongoDB instance (clustered or not.) You can find a complete example to play with on GitHub. Your Spark code will need to be written in Scala, as part of an SBT project, and you need to include the following in your build.sbt file.

scalaVersion := "2.10.4" // any 2.10 is OK 

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0"

libraryDependencies += "com.github.spirom" %% "spark-mongodb-connector" % "0.3.0"

Note that the connector itself depends on Spark 1.1.0 and Casbah. The connector API is in the nsmc namespace. You'll also need access to Casbah's import of DBObject class.

import nsmc._

import com.mongodb.casbah.Imports._

Then you need to configure a SparkContext in the usual way, and add some extra configuration to enable the connector to communicate with MongoDB. In this code snippet, and others below, the lines you need to add or change are highlighted.

val conf = new SparkConf()
    .setAppName("My MongoApp").setMaster("local[4]") // or whatever
    .set("nsmc.connection.host", "myMongoHost")
    .set("nsmc.connection.port", "myMongoPort")
    .set("nsmc.user", "yourUsernameHere")
    .set("nsmc.password", "yourPasswordHere")
val sc = newSparkContext(conf)

Finally, you can call the mongoCollection() method on the context with the names of your favourite MongoDB database and collection.

val data = sc.mongoCollection[DBObject]("myDB", "myCollection")

Then the value of data will be an RDD[DBObject], and you deal with it the way you would deal with any RDD, and with the elements like any Casbah DBObject. This means that you're unlikely to get very far unless you already know how to read from MongoDB using Casbah -- in fact this may be a good time to learn the Casbah API by writing a simple Scala example that doesn't involve Spark at all.

As you might expect if you are familiar with Spark RDDs, the above code doesn't actually load any data from MongoDB -- it's only when you use the RDD in a computation that its partitions are populated lazily.

Partitioning

By default, the RDD created has only one partition, which can create a performance bottleneck in the case of a large collection. If the collection you need to load is indexed in MongoDB, NSMC can ask MongoDB to tell it how to create partitions in a reasonable way. The following configuration will allow you to use that feature. Notice that you have to enable it and set the maximum size (in megabytes) of a Spark partition. You also have to say which indexed fields you want to partition on when you create the RDD.

val conf = new SparkConf()
    .setAppName("My MongoApp").setMaster("local[4]")
    .set("nsmc.connection.host", "myMongoHost")
    .set("nsmc.connection.port", "myMongoPort")
    .set("nsmc.user", "yourUsernameHere")
    .set("nsmc.password", "yourPasswordHere")
    .set("nsmc.split.indexed.collections", "true")
    .set("nsmc.split.chunk.size", "4")
val sc = new SparkContext(conf)
val data = sc.mongoCollection[DBObject]("myDB", "myCollection", Seq("key"))

If you have sharded collections, you can simply turn each shard into a Spark partition. You can even tell NSMC to bypass mongos and connect directly to the shards -- although this setting is best avoided unless you understand MongoDB sharding really well. The following snippet enables both of these features.

val conf = new SparkConf()
    .setAppName("My MongoApp").setMaster("local[4]")
    .set("nsmc.connection.host", "myMongoHost")
    .set("nsmc.connection.port", "myMongoPort")
    .set("nsmc.user", "yourUsernameHere")
    .set("nsmc.password", "yourPasswordHere")
    .set("nsmc.partition.on.shard.chunks", "true")
    .set("nsmc.direct.to.shards", "true")
val sc = new SparkContext(conf)
val data = sc.mongoCollection[DBObject]("myDB", "myCollection")

If you enable partitioning for both unsharded and sharded collections (and this may make sense if you will read from multiple collections), the shards take precedence for sharded collections. That is, a sharded collection will then always be partitioned according to its shards.

Things to note:

  1. All of the properties nsmc.split.indexed.collections, nsmc.partition.on.shard.chunks and nsmc.direct.to.shards default to false, which means that if you don't set them you'll get an unpartitioned RDD.
  2. These settings are global to a SparkContext, which may cause problems in some applications and perhaps provides an interesting design challenge for future versions of this connector.
  3. The setting of nsmc.direct.to.shards is only used if you set nsmc.partition.on.shard.chunks to true

Configuration overview

Configuration for the connector is picked up from your SparkContext. Here is an overview of all the relevant settings.

SettingMeaningUnitsDefault
nsmc.connection.hostMongoDB host or IP addresslocalhost
nsmc.connection.portMongoDB port27017
nsmc.userMongoDB user nameno authentication
nsmc.passwordMongoDB passwordno authentication
nsmc.split.indexed.collectionsShould indexed collections be partitioned using MongoDB's [internal] splitVector command?booleanfalse
nsmc.split.chunk.sizeMaximum chunk size, in megabytes, passed to MongoDB's splitVector command, if used.MB4
nsmc.partition.on.shard.chunksShould collections that are already sharded in MongoDB retain this as their partitioning in Spark? If not, the entire collection will be read as a single Spark partition. booleanfalse
nsmc.direct.to.shardsIf sharding of collections is being observed, should the mongos server be bypassed? (Don't do this unless you understand MongoDB really well, or you may obtain incorrect results -- if MongoDB is rebalancing the shards when your query executes.).booleanfalse

Limitations of the connector

NSMC is strictly experimental, and not suitable for production use. Use it if you have a strong stomach and enjoy experimenting. You can get an overview of its current limitations at any time by checking the Issues page on GitHub but because it's so important to realize just how experimental the connector is currently, I'll list the most important limitations.

  • While, in spirit, NSMC is similar to MongoDB's Hadoop connector, it is much less sophisticated and less tested.
  • There are no Java or Python APIs: it's Scala only
  • There's no integration with Spark SQL
  • Writing data to MongoDB is not supported
  • You can't get MongoDB to filter the collection before loading it into Spark -- the entire collection is loaded, no matter how large.
  • There's no way to take advantage of MongoDB replication
  • Advanced mongoDB authentication is not supported
  • Neither Spark 1.2.0 nor Scala 2.11 are supported
  • Some of the MongoDB commands used in the implementation, also used in the MongoDB connector for Hadoop, are not really public interfaces.

My limitations

I hope people will try the connector out and share their experiences, but I won't actually be able to give anything approaching professional support: this is a self-education project and I'm doing it in my spare time. If the project turns out to be broadly useful, I'm also happy to help turn it into a collaborative one, or to get it incorporated into a larger pre-existing project. I haven't discussed it with the developers of MongoDB, and don't know whether they're at all interested in building a native Spark connector.

It's also important to understand that I don't have a large MongoDB cluster and I'm not going to build one. I've taken some care to get decent logging in place right from the start, and will be happy to look at log files.

What's Next?

As there are a lot of interesting and potentially difficult issues to address before the connector can be considered complete, in future posts (and code commits) I'll address design issues and implementation choices.

Sunday, December 28, 2014

Filtering and Projection in Spark SQL External Data Sources

In the previous post about the Spark external data source API, we looked at a simple way to register an external data source as a temporary table in Spark SQL. While very easy to use, that mechanism didn't allow Spark SQL to specify which data columns it was interested in, or to provide filters on the rows: the external data source had to produce all the data it had, and Spark SQL would filter the result records as needed. That's often very inefficient, especially when large amounts of data are involved. In this post we'll look at parts of the external data source API that solve this problem, allowing Spark SQL to push projection and filtering down to the external data source.

As with other posts in this series, I'm assuming familiarity with Scala, and the code for the examples can be found at https://github.com/spirom/LearningSpark -- in this case sql/RelationProviderFilterPushdown.scala.

As we discussed last time, the external data source API is located in the org.apache.spark.sql package, and consists of six abstract classes, six case classes and one trait. Projection pushdown is supported by the abstract class PrunedScan, but we'll cover its capabilities by discussing the more general abstract class PrunedFilteredScan, which also supports filter pushdown.

Much of this post is about how the external data source API gives you flexibility in deciding which of three layers of software filters the data that's being queried, so we need to agree on names for the layers. I'm going to name them as follows:

  1. External Data Engine: Some piece of (probably third party) software whose data you you want to be able to query from Spark SQL. While you may be willing to make changes to this software to make this easier, I'm going to assume that it has a fixed set of capabilities that will influence some of the choices you make in implementing an adapter for it.
  2. External Data Source Adapter: This is the code we're learning how to implement in this post -- an implementation of Spark SQL's external data source API that accepts calls from Spark SQL and turns them into calls to an external data engine, and then filters and transforms the results before returning them to Spark SQL across the API. When implementing the adapter, you can choose between three strategies: a simple one based on the TableScan abstract class, as discussed in the previous post, an intermediate one based on the PrunedScan abstract class, or the most powerful (and complex) one, PrunedFilteredScan, which is the topic of this post.
  3. Spark SQL: This is Spark's SQL query compiler, optimizer and top level execution engine, layered on top of core Spark. Individual queries can combine internal and external data, and even external data from multiple sources, and Spark SQL must cleanly break out sub-queries to the individual external data source adapters. When querying external data it has a number of choices in exactly what it requests from the external data source adapter. It detects which of the three implementation strategies is being used for any given adapter, and conforms to it, but the adapter needs to support all the possibilities for its chosen strategy.

In case you're coding along, you'll need the following imports, but remember that I gave a link to the complete code at the beginning of this post.

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.sources._

How to deal with filter "pushdown"

Filter "pushdown" is the general approach used by most database query systems for doing filtering as close as possible to the source of the data, based on the assumption that dealing with less data will almost always be faster. That isn't always true, especially if the filters are complex to evaluate, but that's not our problem here. Spark SQL's external data source API provides SQL the option of pushing a set of size simple filters down through the API, leaving the external data source adapter three options for utilizing those filters:

  1. Keep pushing: If the external data engine has a powerful query language such as SQL, MDX or XQuery, the most efficient implementation of all may result from the filters being used to construct the query that's sent to that system.
  2. Evaluate: Don't push the filter through into the external data engine but instead interpret it and use it to filter the records returned, rather than just blindly returning all records to Spark SQL.
  3. Do nothing: Spark SQL doesn't know which filters you've utilized, and doesn't really care -- it reapplies all of them to the records your adapter returns to make absolutely sure that no spurious records get through.

Two pragmatic points are worth making here. First, any particular implementation of an external data source can start out being naive, perhaps ignoring most filters and pushing some critical ones through to the external database, and over time optimizations can be introduced when needed. This makes the external data source API particularly easy to adopt while also being quite powerful. Second, the decision of to what to do with a set of filters passed by Spark SQL can be made by the implementation on a per-query, per-filter basis: for certain queries all filters may be ignored (perhaps because they are not critical to performance or they are hard to deal with), while for other queries either some or all filters can be used.

Finally, we should list the six kinds of filters that the external data source API supports:

Case ClassFilter
EqualToattr = const
GreaterThanattr > const
LessThanattr < const
GreaterThanOrEqualattr >= const
LessThanOrEqualattr <= const
Inattr = const_1 or ... or attr = const_n

A back-end data engine with an impoverished query "language"

The power and flexibility of the external data source API is best illustrated with an adapter for an external data engine that provides some significant query capability, but in most requests is rather naive. We'll simulate this with the extremely simple RangeDB class below, which represents a database system with the following characteristics:

  1. Tables have a fixed number of records numRecords.
  2. The records have an integer primary key from 1 to numRecords, and two additional integer fields squared and cubed.
  3. The only kind of query supported (via the getRecords() method) is an inclusive range query on the primary key, where the bounds are optional. This returns an iterator on the native records of the database.
case class RangeDBRecord(key: Int, squared: Int, cubed: Int)

class RangeIterator(begin: Int, end: Int) extends Iterator[RangeDBRecord] {
  var pos: Int = begin

  def hasNext: Boolean = pos <= end

  def next(): RangeDBRecord = {
    val rec = RangeDBRecord(pos, pos*pos, pos*pos*pos)
    pos = pos + 1
    rec
  }
}

class RangeDB(numRecords: Int) {

  def getRecords(min: Option[Int], max: Option[Int]): RangeIterator = {
    new RangeIterator(min.getOrElse(1), max.getOrElse(numRecords))
  }
}

The point of this unusual design is to strongly suggest that certain filters really should be pushed down into back end queries, while others absolutely cannot. It provides an informative twist because only four of the six kinds of filters can be passed through if they apply to the primary key column, but none can be passed through if they apply to other columns.

Wrangling filters

We organize all our filter handling into a single class called FilterInterpreter. This class needs to understand several things: the six kinds of queries that Spark SQL can pass through the API and what they mean, the characteristics of the records we are dealing with, represented for convenience as a Map, and finally the exact circumstances under which a filter can be pushed through to the external data engine.

When a FilterInterpreter is created it needs to group the filters by the attribute they are filtering, and the result is represented as a Map from attribute names to arrays of filters. Then the filters that we want to use for the primary key range query are pulled out, and a Map that represents all the remaining filters is produced.

class FilterInterpreter(allFilters: Array[Filter]) {

  private val allAttrToFilters: Map[String, Array[Filter]] = allFilters
    .map(f => (getFilterAttribute(f), f))
    .groupBy(attrFilter => attrFilter._1)
    .mapValues(a => a.map(p => p._2))

  val (min, max, otherKeyFilters) = splitKeyFilter

  private val attrToFilters = 
    allAttrToFilters - "val" + ("val" -> otherKeyFilters)

This relies on a utility method for figuring out which attribute a filter applies to:

  private def getFilterAttribute(f: Filter): String = {
    f match {
      case EqualTo(attr, v) => attr
      case GreaterThan(attr, v) => attr
      case LessThan(attr, v) => attr
      case GreaterThanOrEqual(attr, v) => attr
      case LessThanOrEqual(attr, v) => attr
      case In(attr, vs) => attr
    }
  }

We also need a utility method for splitting out the primary key range filter -- notice it knows the name of the primary key attribute, but it could easily be generalized. It also knows that only four of the six kinds of supported filters are relevant, and leaves the others alone.

  private def splitKeyFilter: (Option[Int], Option[Int], Array[Filter]) = {
    val keyFilters = allAttrToFilters.getOrElse("val", new Array[Filter](0))
    var min: Option[Int] = None
    var max: Option[Int] = None
    val others = new ArrayBuffer[Filter](0)
    keyFilters.foreach({
      case GreaterThan(attr, v) => min = Some(v.asInstanceOf[Int] + 1)
      case LessThan(attr, v) => max = Some(v.asInstanceOf[Int] - 1)
      case GreaterThanOrEqual(attr, v) => min = Some(v.asInstanceOf[Int])
      case LessThanOrEqual(attr, v) => max = Some(v.asInstanceOf[Int])
      case _ => others.++=: _
    })
    (min, max, others.toArray)
  }

Finally, we need a way to apply the filters to a row, which relies on a helper method for applying the filters relevant to a single attribute to the value of that attribute.

  def apply(r: Map[String, Int]): Boolean = {
    r.forall({
      case (attr, v) => {
        val filters = attrToFilters.getOrElse(attr, new Array[Filter](0))
        satisfiesAll(v, filters)
      }
    })
  }

  private def satisfiesAll(value: Int, filters: Array[Filter]): Boolean = {
    filters.forall({
      case EqualTo(attr, v) => value == v.asInstanceOf[Int]
      case GreaterThan(attr, v) => value > v.asInstanceOf[Int]
      case LessThan(attr, v) => value < v.asInstanceOf[Int]
      case GreaterThanOrEqual(attr, v) => value >= v.asInstanceOf[Int]
      case LessThanOrEqual(attr, v) => value <= v.asInstanceOf[Int]
      case In(attr, vs) => vs.exists(v => value == v.asInstanceOf[Int])
    })
  }
}

Dealing with requiredColumns

Now that Spark SQL is able to specify which columns it wants returned, the situation is a little more tricky than it may seem at first. When buildScan was returning the entire Row, Spark SQL could use the information it obtained from calling schema to understand each Row. When it requests only a subset of columns, it's not only essential that we return only the data from the requested columns in each row, but also that we return them in the order in which they were requested, which may well be (and frequently is) different from the order in which those columns were returned from schema. You'll see a solution to this in the methods makeMap() and projectAndWrapRow() below.

Putting it all together

The core of our new implementation is the class MyPFTableScan which this time extends PrunedFilteredScan and wraps our primitive database RangeDB above:

case class MyPFTableScan(count: Int, partitions: Int)
                      (@transient val sqlContext: SQLContext)
  extends PrunedFilteredScan {

  val db = new RangeDB(count)

We specify the schema by overriding schema as before.

  val schema: StructType = StructType(Seq(
    StructField("val", IntegerType, nullable = false),
    StructField("squared", IntegerType, nullable = false),
    StructField("cubed", IntegerType, nullable = false)
  ))

Now we need two convenience methods for constructing result rows. The first takes a result and converts it to a more flexible representation based on a Map, and the second projects out unneeded columns and puts the required ones in the right order as specified by the requiredColumns parameter.

  private def makeMap(rec: RangeDBRecord): Map[String, Int] = {
    val m = new HashMap[String, Int]()
    m += ("val" -> rec.key)
    m += ("squared" -> rec.squared)
    m += ("cubed" -> rec.cubed)
    m.toMap
  }

  private def projectAndWrapRow(m: Map[String, Int],
                                requiredColumns: Array[String]): Row = {
    val l = requiredColumns.map(c => m(c))
    val r = Row.fromSeq(l)
    r
  }

Finally, we put it all together in our more complex override of the buildScan method where we instantiate our FilterInterpreter from the filters passed in by Spark SQL, use the primary key bounds pulled out of it to query our primitive data engine, and iterate through the records returned, dealing with each record by:

  1. Converting it to a Map
  2. Applying the remaining filters
  3. Projecting out unwanted columns and wrapping the remainder in the right order.

Then we turn all the columns into an RDD.

  def buildScan(requiredColumns: Array[String], 
                filters: Array[Filter]): RDD[Row] = {
    val filterInterpreter = new FilterInterpreter(filters)
    val rowIterator = 
      db.getRecords(filterInterpreter.min, filterInterpreter.max)
    val rows = rowIterator
      .map(rec => makeMap(rec))
      .filter(r => filterInterpreter.apply(r))
      .map(r => projectAndWrapRow(r, requiredColumns))
    sqlContext.sparkContext.parallelize(rows.toSeq, partitions)
  }

}

As before, we need to extend the RelationProvider trait to produce the class we'll actually register with Spark SQL.

class CustomPFRP extends RelationProvider {

  def createRelation(sqlContext: SQLContext, 
                     parameters: Map[String, String]) = {
    MyPFTableScan(parameters("rows").toInt,
      parameters("partitions").toInt)(sqlContext)
  }

}

Registration

The process for registering our external data source is also the same as it was in the previous post.

 sqlContext.sql(
      s"""
        |CREATE TEMPORARY TABLE dataTable
        |USING sql.CustomPFRP
        |OPTIONS (partitions '9', rows '50')
      """.stripMargin)

Querying

Here are a few queries that exercise the new external data source. I recommend stepping through the code we've discussed using a debugger to see how Spark SQL actually calls it. If the parameters passed to buildScan() are surprising you should always remind yourself of the following:

  1. Spark SQL is not relying on the external data source adapter, or the external data engine to do the filtering, it is simply giving them the opportunity to do so. But that means it must request from the data source not only the data that must be returned by the query, but also all the data needed to evaluate the filters.
  2. Spark SQL is not obliged to pass in all the filters it could pass in.
  3. The external data source API allows Spark SQL to send a conjunction of simple filters. If the filter is not conjunctive, Spark SQL will have to evaluate all or most of it by itself.

The first example will request only two columns and pass in a single filter.

val data =
      sqlContext.sql(
        s"""
          |SELECT val, cubed
          |FROM dataTable
          |WHERE val <= 40 
          |ORDER BY val
        """.stripMargin)
    data.foreach(println)

This one will request three columns (so that Spark SQL can re-apply the filters) and pass in two filters.

val data =
      sqlContext.sql(
        s"""
          |SELECT val, cubed
          |FROM dataTable
          |WHERE val <= 40 AND squared >= 900
          |ORDER BY val
        """.stripMargin)
    data.foreach(println)

The third example involves a disjunctive filter, so Spark SQL will not pass any filters to the external data source adapter.

val data =
      sqlContext.sql(
        s"""
          |SELECT val, cubed
          |FROM dataTable
          |WHERE val <= 10 OR squared >= 900
          |ORDER BY val
        """.stripMargin)
    data.foreach(println)

Non-rectangular data

Since Spark SQL can handle non-rectangular data quite elegantly, it's natural to wonder about the extent of support for it in external data sources. For example, what if we wanted to collect the "squared" and "cubed" columns together into a structure called "data". It's pretty clear how we could define the schema in our PruneFilteredScan override:

val schema: StructType = StructType(Seq(
    StructField("val", IntegerType, nullable = false),
    StructField("data", StructType(Seq(
      StructField("squared", IntegerType, nullable = false),
      StructField("cubed", IntegerType, nullable = false)
    )))
    ))

Then the query could look like this:

val data =
      sqlContext.sql(
        s"""
          |SELECT val, data.cubed
          |FROM dataTable
          |WHERE val <= 40 AND data.squared >= 900
          |ORDER BY val
        """.stripMargin)

Then to make it all work, at the very least you'd need to construct the nested row correctly. Ignoring projection, you'd need to start with something like:

val row = Row(m("val"), Row(m("squared"), m("cubed")))

But obviously we'd have to adjust the way we process the query: how we handle projection and filtering. How will Spark SQL express what it needs us to return the "squared" sub-field of "data", but filter on the "cubed" sub-field? Actually, it doesn't. In the call to buildScan() it simply asks for the "data" column whenever any of the sub-fields appears in the SELECT, and just pulls the column apart for itself. Perhaps more disturbing is the fact that Spark SQL doesn't pass along filters for any of the sub-fields either, preferring to apply those filters itself, so our external data engine doesn't get the chance to take advantage of them at all.

All this just means that support for non-rectangular external data sources is present in Spark SQL as of Spark 1.2.0, and it works, but it doesn't really support sophisticated non-rectangular sources with any efficiency for the time being.

Related posts

For a more modern exploration of developing Spark external data sources (as of Spark 2.3.0) see Revisiting Spark External Data Sources (The V2 APIs).

Sunday, December 21, 2014

External Data Sources in Spark 1.2.0

One of the most exciting aspects of the recent Spark 1.2.0 release is the Spark SQL API for external data sources. This is an API for mounting external data sources as temporary tables, which can then be queried through SQL. In this post we'll look at how you can define your own extremely simple external data source and query it. (Edit: In a followup post I've delved into what it takes to push filtering and projection into your external data source.)

As with other posts in this series, I'm assuming familiarity with Scala, and the code for the examples can be found at https://github.com/spirom/LearningSpark -- in this case sql/CustomRelationProvider.scala.

The external data source API is located in the org.apache.spark.sql package, and consists of six abstract classes, six case classes and one trait. However, to get started with a simple integration project all you need to understand is the trait RelationProvider and the abstract class TableScan.

In order to define the classes needed you'll need the following imports:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.sources.{TableScan, RelationProvider}

To keep this example simple, I won't involve any actual use of an external system. Instead I'll show how to create a table of synthetic data. Most of the work involves extending the abstract class TableScan, to provide (a) the table schema and (b) the row data. In this example, the table contains just three columns. The first is a unique integer and the second and third columns are that integer squared and cubed, respectively.

To understand this implementation you need to understand two aspects of Spark SQL that, mercifully, have recently been documented: StructType (and other type constructors) and Row in the org.apache.spark.sql package. In the Spark SQL Programming Guide, see the entries on "Programmatically Specifying the Schema" and "Spark SQL DataType Reference."

case class MyTableScan(count: Int, partitions: Int)
                      (@transient val sqlContext: SQLContext) 
  extends TableScan 
{

  val schema: StructType = StructType(Seq(
    StructField("val", IntegerType, nullable = false),
    StructField("squared", IntegerType, nullable = false),
    StructField("cubed", IntegerType, nullable = false)
  ))

  private def makeRow(i: Int): Row = Row(i, i*i, i*i*i)

  def buildScan: RDD[Row] = {
    val values = (1 to count).map(i => makeRow(i))
    sqlContext.sparkContext.parallelize(values, partitions)
  }
  
}

Now you need to extend the RelatoinProvider trait to provide the DDL interface for your data source. A data source is configured via a set of options, each of which is a key/value pair. By implementing the createRelation method we specify how to use these configuration parameters and the SQLContext to initialize an individual table. In the example, the only parameters are the number of rows in the table and the number of partitions for the resulting RDD.

class CustomRP extends RelationProvider {

  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]) = {
    MyTableScan(parameters("rows").toInt,
      parameters("partitions").toInt)(sqlContext)
  }
  
}

Now you need to register a temporary table based on your external source, using a new Spark SQL DDL command CREATE TEMPORARY TABLE. You need to provide the table name, the class that defines it (the one implementing RelationProvider) and the options list. Be very careful with your DDL syntax here, and if you get into trouble remember to check Spark's DEBUG logs. If Spark SQL can't parse your DDL, it will hide the detailed and informative error message in a log entry, try re-parsing your DDL as DML, and show you only the latter error message.

.
sqlContext.sql(
      s"""
        |CREATE TEMPORARY TABLE dataTable
        |USING sql.CustomRP
        |OPTIONS (partitions '9', rows '50')
      """.stripMargin)

Finally, you can query your temporary table like any other, referring to the attributes you specified as the "schema" when extending TableScan.

val data = sqlContext.sql("SELECT * FROM dataTable ORDER BY val")

That's all you need for a very simple external data source -- all that's left is write the code to interface to the "external" system of your choice. Keep in mind that the approach I've shown here will always fetch all the columns and all the rows -- if your SQL query only requires, say, a few rows and a few columns, they'll be materialized and discarded by the rest of the query plan. Most of the rest of the external data source API is for dealing with this problem, but that's for a different post.