Thursday, May 14, 2015

JDBC Access to MongoDB via Apache Spark

In previous posts I've discussed a native Apache Spark connector for MongoDB (NSMC) and NSMC's integration with Spark SQL. The latter post described an example project that issued Spark SQL queries via Scala code. However, much of the value of Spark SQL integration comes from the possibility of it being used either by pre-existing tools or applications, or by end users who understand SQL but do not, in general, write code. In this post I'll describe how to set up a Spark Thrift server with NSMC, and query it via JDBC calls.

JDBC access to Spark SQL

To make JDBC connections to Spark SQL, you need to run the Spark Thrift server, for which I'll give instructions below. Writing a Java application that connects to the Thrift server requires the HiveServer2 JDBC connector. The link provides some general information together with a list of JARs needed in the CLASSPATH, but I prefer to use Maven dependencies. The example described here works with the following quite conservative dependencies:

<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-jdbc</artifactId>
  <version>0.13.1</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.2.0</version>
</dependency>

If you prefer to live on the edge, the following dependencies work too:

<dependency>
  <groupId>org.apache.hive</groupId>
  <artifactId>hive-jdbc</artifactId>
  <version>1.1.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>2.7.0</version>
</dependency>

Running a Spark Thrift server with NSMC

Let's begin by setting up a Spark Thrift server that has access to NSMC.

Prerequisites

In order to follow along with the setup and examples you will need to set up the following:

  1. A running instance of MongoDB.
  2. A running instance of Spark 1.3.1 or later that can communicate with the MongoDB instance.
  3. The NSMC Examples project, having run the main() method of the PopulateTestCollection class to set up the example collection in your MongoDB instance. (While the rest of the example classes need to be submitted as a Spark job, PopulateTestCollection connects directly to your MongoDB instance, and can be run stand-alone.)
  4. The NSMC JDBC Client Samples project, which I'll describe in some detail below.
  5. A configuration file that will be used to tell NSMC how to connect to your MongoDB instance -- we'll call it nsmc.conf.
The required contents of your nsmc.conf file are as follows:
spark.nsmc.connection.host      <your MongoDB host>
spark.nsmc.connection.port      <your MongoDB port>
# omit the next two lines if you're not using MongoDB authentication, 
# otherwise provide an appropriate username and password
spark.nsmc.user                 <your MongoDB user name>
spark.nsmc.password             <your MongoDB password>

Startup

Next you need to start a Spark Thrift server to act as a JDBC listener. You need to tell it:
  1. Where to find your Spark master.
  2. What packages it needs to download from the Maven Central Repository -- in this case a supported version of NSMC, v0.5.2 or later, compiled for Scala 2.10. (Earlier versions of NSMC cannot be used in this way.)
  3. Where to find NSMC configuration -- in this case your nsmc.conf file.
<your Spark installation>/sbin/start-thriftserver.sh \
    --master <your spark master URL> \
    --packages com.github.spirom:spark-mongodb-connector_2.10:0.5.2 \
    --properties-file <path to config files>/nsmc.conf

If you do not want Spark to download NSMC directly from the Maven Central Repository, perhaps because the servers do not have access to the Internet, you can download an assembly containing all the relevant packages -- look for spark-mongodb-connector-assembly-0.5.2.jar or a later version. If you take this approach you need to:

  • Save the assembly somewhere that is accessible on all servers in your Spark installation
  • Skip the --packages setting when starting the Thrift server and instead use --driver-class-path together with the path to where you saved the assembly.
  • Add an entry to your nsmc.conf file where the key is spark.executor.extraClassPath and the value is again the path to where you saved the assembly file.

Running the JDBC Examples

If you have cloned the NSMC JDBC Client Samples project, you can build and run it either through IntelliJ Idea, or through Apache Maven by using the commands:

mvn compile
mvn exec:java -Dexec.mainClass="Demo"

Most of the code is fairly mundane JDBC, and I won't attempt to write a JDBC tutorial. But I will step through some of the most interesting points. You may need to modify the connection string if you're running the Spark Thrift server on a remote host or a non-default port. Furthermore, if you've set up authentication you'll need to provide an appropriate user name and password.

Connection con = 
    DriverManager.getConnection("jdbc:hive2://localhost:10000/default", 
                                "hive", "");

The most important part of the example is registering a MongoDB collection with Spark SQL. There are a number of interesting aspects.

  • You need to register a temporary table, and thus you will need to re-register it if you restart the Spark Thrift server. Spark (as of 1.3.0) now supports persisting metadata for external resources like this in the Hive Metastore, but NSMC (as of v0.5.2) isn't yet capable of taking advantage of it.
  • Because all connections to a Spark Thrift server run in the same HiveContext, registering a MongoDB collection once like this makes it available to all users until the server is restarted.
  • It is the USING clause that tells Spark SQL to use NSMC for querying this resource. Since you told the Spark Thrift server to download an appropriate version of NSMC from the Maven Central Repository, NSMC will both set up the right metadata when a collection is registered, and then also process any queries to this table.
  • You can register several different MongoDB collections, potentially in different databases, through multiple invocations of CREATE TEMPORARY TABLE. Alas, all connections must use the same MongoDB endpoint (host/port) with the same NSMC configuration. See NSMC Issue #15 for the status of the needed enhancement.
CREATE TEMPORARY TABLE dataTable
USING nsmc.sql.MongoRelationProvider
OPTIONS (db 'test', collection 'scratch')

The remaining code executes some simple DDL and DML queries, and should be clear if you've looked at the MongoDB collection and queries in the NSMC Examples project. You will recognize the following Spark SQL schema as being the schema that NSMC infers from the MongoDB collection.

root
 |-- _id: string (nullable = true)
 |-- billingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- custid: string (nullable = true)
 |-- discountCode: integer (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemid: string (nullable = true)
 |    |    |-- orderid: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |-- shippingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)

Since this schema has both arrays and nested structures, I want to spend the rest of this post focusing on how JDBC handles returning query results from such an aggressively non-rectangular (or non-relational) schema. As you have seen in my original Spark SQL post, Spark SQL itself deals with this extraordinarily well, but alas the Hive JDBC connector does not.

The following simple query illustrates the issues, since billingAddress contains a nested document and orders contains an array.

SELECT custid, billingAddress, orders FROM dataTable

If you execute the query from Scala and print the schema of the result you will see:

root
 |-- custid: string (nullable = true)
 |-- billingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemid: string (nullable = true)
 |    |    |-- orderid: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)

And printing the data frame yields:

custid billingAddress orders               
1001   [NV,89150]     List([A001,100000... 
1002   [CA,92093]     List([B012,100000... 
1003   [AZ,85014]     List([A001,100000... 
1004   null           null                 

In JDBC we would expect the billingAddress to be modeled as java.sql.Struct and the orders to be a java.sql.Array (where the elements are also java.sql.Struct). Instead, we see that JDBC recognizes the original types, but transmits all the structured values as JSON strings. The following code can be used to show how JDBC perceives the column types:

ResultSetMetaData rsm = res.getMetaData();
System.out.println("*** Column metadata:");
int ncols = rsm.getColumnCount();
System.out.println("total of " + ncols + " columns");
for (int i = 1; i <= ncols; i++) {
    System.out.println("Column " + i + " : " + rsm.getColumnName(i));
    System.out.println(" Label : " + rsm.getColumnLabel(i));
    System.out.println(" Class Name : " + rsm.getColumnClassName(i));
    System.out.println(" Type : " + rsm.getColumnType(i));
    System.out.println(" Type Name : " + rsm.getColumnTypeName(i));
}

Here is the output:

*** Column metadata:
total of 3 columns
Column 1 : custid
  Label : custid
  Class Name : java.lang.String
  Type : 12
  Type Name : string
Column 2 : billingAddress
  Label : billingAddress
  Class Name : java.lang.String
  Type : 2002
  Type Name : struct
Column 3 : orders
  Label : orders
  Class Name : java.lang.String
  Type : 2003
  Type Name : array

The following code allows us to see the values. Since all of the columns have been converted to strings, it's worth noting that the highlighted code could just as well have been replaced with res.getString(i).

System.out.println("*** Data:");
while (res.next()) {
    System.out.println("Row " + res.getRow());
    for (int i = 1; i <= ncols; i++) {
        System.out.println(" Column " + i + " : " + res.getObject(i));
    }
}

Here is the output. Notice that it is quite readable, and the JSON strings could be parsed by your client code if needed, especially since you can use the ResultSetMetaData to determine what to expect in the String. Whether this is "good enough" really depends on your needs.

*** Data:
Row 1
  Column 1 : 1001
  Column 2 : {"state":"NV","zip":"89150"}
  Column 3 : [{"itemid":"A001","orderid":"1000001","quantity":175},
              {"itemid":"A002","orderid":"1000002","quantity":20}]
Row 2
  Column 1 : 1002
  Column 2 : {"state":"CA","zip":"92093"}
  Column 3 : [{"itemid":"B012","orderid":"1000002","quantity":200}]
Row 3
  Column 1 : 1003
  Column 2 : {"state":"AZ","zip":"85014"}
  Column 3 : [{"itemid":"A001","orderid":"1000003","quantity":175},
              {"itemid":"B001","orderid":"1000004","quantity":10},
              {"itemid":"A060","orderid":"1000005","quantity":12}]
Row 4
  Column 1 : 1004
  Column 2 : null
  Column 3 : null

Advantages of using HiveQL in queries

If Hive's compromise for returning semi-structured data doesn't meet your needs, you may want to consider doing more of the work in the query. For example, Spark SQL supports drilling into nested structures as follows:

SELECT custid, billingAddress.zip FROM dataTable

This actually works remarkably well. For example, if billingAddress happens to be null in some document, asking for billingAddress.zip also returns NULL, instead of generating an error. However, Spark SQL doesn't give you a good way to unpack arrays -- and yet there is a coping strategy for those too.

Requests to the Thrift JDBC server are executed in a HiveContext, so in addition to having access to Spark SQL as a query language, you can write queries in HiveQL. More than just being convenient for experienced Hive users, this allows you to work around some of the shortcomings of the Hive JDBC driver for dealing with the "non-rectangular" aspects of MongoDB collections. For example, the "LATERAL VIEW" feature of HiveQL allows you to effectively "denormalize" the data your query returns:

SELECT custid, o.orderid, o.itemid, o.quantity 
FROM dataTable LATERAL VIEW explode(orders) t AS o

Instead of returning one result row for each matching document, it returns one row for each order, with the custid joined on. The result is completely rectangular, and so is easy to deal with in your client code. First, you can see this in the "Type Name" of the metadata.

*** Column metadata:
total of 4 columns
Column 1 : custid
  Label : custid
  Class Name : java.lang.String
  Type : 12
  Type Name : string
Column 2 : orderid
  Label : orderid
  Class Name : java.lang.String
  Type : 12
  Type Name : string
Column 3 : itemid
  Label : itemid
  Class Name : java.lang.String
  Type : 12
  Type Name : string
Column 4 : quantity
  Label : quantity
  Class Name : java.lang.Integer
  Type : 4
  Type Name : int

Second, you can see it when you print the results.

*** Data:
Row 1
  Column 1 : 1001
  Column 2 : 1000001
  Column 3 : A001
  Column 4 : 175
Row 2
  Column 1 : 1001
  Column 2 : 1000002
  Column 3 : A002
  Column 4 : 20
Row 3
  Column 1 : 1002
  Column 2 : 1000002
  Column 3 : B012
  Column 4 : 200
Row 4
  Column 1 : 1003
  Column 2 : 1000003
  Column 3 : A001
  Column 4 : 175
Row 5
  Column 1 : 1003
  Column 2 : 1000004
  Column 3 : B001
  Column 4 : 10
Row 6
  Column 1 : 1003
  Column 2 : 1000005
  Column 3 : A060
  Column 4 : 12

You can get more information about this technique in the relevant section of the Hive Language Manual.

Summary

Using Apache Spark's Thrift server and NSMC's integration with Spark SQL, you can run highly distributed SQL queries against MongoDB collections, perhaps even performing joins between MongoDB collections and data stored elsewhere.

No comments:

Post a Comment