In previous posts I've discussed a native Apache Spark connector for MongoDB (NSMC) and NSMC's integration with Spark SQL. The latter post described an example project that issued Spark SQL queries via Scala code. However, much of the value of Spark SQL integration lies in the fact that it can be used by pre-existing tools and applications, or by end users who understand SQL but do not, in general, write code. In this post I'll describe how to set up a Spark Thrift server with NSMC and query it via JDBC calls.
JDBC access to Spark SQL
To make JDBC connections to Spark SQL, you need to run the Spark Thrift server, for which I'll give instructions below. Writing a Java application that connects to the Thrift server requires the HiveServer2 JDBC connector. The link provides some general information together with a list of JARs needed in the CLASSPATH, but I prefer to use Maven dependencies. The example described here works with the following quite conservative dependencies:
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>0.13.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.2.0</version>
</dependency>
If you prefer to live on the edge, the following dependencies work too:
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.0</version>
</dependency>
Running a Spark Thrift server with NSMC
Let's begin by setting up a Spark Thrift server that has access to NSMC.
Prerequisites
In order to follow along with the setup and examples you will need to set up the following:
- A running instance of MongoDB.
- A running instance of Spark 1.3.1 or later that can communicate with the MongoDB instance.
- The NSMC Examples project, having run the main() method of the PopulateTestCollection class to set up the example collection in your MongoDB instance. (While the rest of the example classes need to be submitted as a Spark job, PopulateTestCollection connects directly to your MongoDB instance, and can be run stand-alone.)
- The NSMC JDBC Client Samples project, which I'll describe in some detail below.
- A configuration file that will be used to tell NSMC how to connect to your MongoDB instance -- we'll call it nsmc.conf.
spark.nsmc.connection.host <your MongoDB host>
spark.nsmc.connection.port <your MongoDB port>
# omit the next two lines if you're not using MongoDB authentication,
# otherwise provide an appropriate username and password
spark.nsmc.user <your MongoDB user name>
spark.nsmc.password <your MongoDB password>
Startup
Next you need to start a Spark Thrift server to act as a JDBC listener. You need to tell it:
- Where to find your Spark master.
- What packages it needs to download from the Maven Central Repository -- in this case a supported version of NSMC, v0.5.2 or later, compiled for Scala 2.10. (Earlier versions of NSMC cannot be used in this way.)
- Where to find NSMC configuration -- in this case your nsmc.conf file.
<your Spark installation>/sbin/start-thriftserver.sh \
    --master <your spark master URL> \
    --packages com.github.spirom:spark-mongodb-connector_2.10:0.5.2 \
    --properties-file <path to config files>/nsmc.conf
If you do not want Spark to download NSMC directly from the Maven Central Repository, perhaps because the servers do not have access to the Internet, you can download an assembly containing all the relevant packages -- look for spark-mongodb-connector-assembly-0.5.2.jar or a later version. If you take this approach you need to:
- Save the assembly somewhere that is accessible on all servers in your Spark installation
- Skip the --packages setting when starting the Thrift server and instead use --driver-class-path together with the path to where you saved the assembly.
- Add an entry to your nsmc.conf file where the key is spark.executor.extraClassPath and the value is again the path to where you saved the assembly file.
Running the JDBC Examples
If you have cloned the NSMC JDBC Client Samples project, you can build and run it either through IntelliJ Idea, or through Apache Maven by using the commands:
mvn compile
mvn exec:java -Dexec.mainClass="Demo"
Most of the code is fairly mundane JDBC, and I won't attempt to write a JDBC tutorial. But I will step through some of the most interesting points. You may need to modify the connection string if you're running the Spark Thrift server on a remote host or a non-default port. Furthermore, if you've set up authentication you'll need to provide an appropriate user name and password.
Connection con = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "hive", "");
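If you are writing your own client rather than running the sample project, the connection boilerplate is minimal. Here is a rough, standalone sketch (the class name MinimalClient is just for illustration, and the explicit Class.forName call may be unnecessary with newer hive-jdbc versions, which register the driver automatically):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class MinimalClient {
    public static void main(String[] args) throws Exception {
        // Explicitly load the HiveServer2 JDBC driver; newer versions of
        // hive-jdbc register themselves with DriverManager automatically.
        Class.forName("org.apache.hive.jdbc.HiveDriver");

        // Adjust the host, port and credentials to match your Thrift server.
        Connection con = DriverManager.getConnection(
                "jdbc:hive2://localhost:10000/default", "hive", "");

        // All of the registration and query snippets that follow are issued
        // through Statement objects created from this single connection.
        Statement stmt = con.createStatement();

        // ... CREATE TEMPORARY TABLE and SELECT statements go here ...

        stmt.close();
        con.close();
    }
}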
The most important part of the example is registering a MongoDB collection with Spark SQL. There are a number of interesting aspects.
- You need to register a temporary table, and thus you will need to re-register it if you restart the Spark Thrift server. Spark (as of 1.3.0) now supports persisting metadata for external resources like this in the Hive Metastore, but NSMC (as of v0.5.2) isn't yet capable of taking advantage of it.
- Because all connections to a Spark Thrift server run in the same HiveContext, registering a MongoDB collection once like this makes it available to all users until the server is restarted.
- It is the USING clause that tells Spark SQL to use NSMC for querying this resource. Since you told the Spark Thrift server to download an appropriate version of NSMC from the Maven Central Repository, NSMC will both set up the right metadata when a collection is registered, and then also process any queries to this table.
- You can register several different MongoDB collections, potentially in different databases, through multiple invocations of CREATE TEMPORARY TABLE. Alas, all connections must use the same MongoDB endpoint (host/port) with the same NSMC configuration. See NSMC Issue #15 for the status of the needed enhancement.
CREATE TEMPORARY TABLE dataTable
USING nsmc.sql.MongoRelationProvider
OPTIONS (db 'test', collection 'scratch')
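Over JDBC, this registration is issued like any other statement. A minimal sketch, assuming the con connection object created above:

// Register the MongoDB collection as a temporary table. This only needs
// to happen once per Thrift server lifetime, and any client can do it.
Statement ddl = con.createStatement();
ddl.execute(
    "CREATE TEMPORARY TABLE dataTable " +
    "USING nsmc.sql.MongoRelationProvider " +
    "OPTIONS (db 'test', collection 'scratch')");
ddl.close();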
The remaining code executes some simple DDL and DML queries, and should be clear if you've looked at the MongoDB collection and queries in the NSMC Examples project. You will recognize the following Spark SQL schema as being the schema that NSMC infers from the MongoDB collection.
root
 |-- _id: string (nullable = true)
 |-- billingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- custid: string (nullable = true)
 |-- discountCode: integer (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemid: string (nullable = true)
 |    |    |-- orderid: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |-- shippingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
Since this schema has both arrays and nested structures, I want to spend the rest of this post focusing on how JDBC handles returning query results from such an aggressively non-rectangular (or non-relational) schema. As you have seen in my original Spark SQL post, Spark SQL itself deals with this extraordinarily well, but alas the Hive JDBC connector does not.
The following simple query illustrates the issues, since billingAddress contains a nested document and orders contains an array.
SELECT custid, billingAddress, orders FROM dataTable
If you execute the query from Scala and print the schema of the result you will see:
root
 |-- custid: string (nullable = true)
 |-- billingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemid: string (nullable = true)
 |    |    |-- orderid: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
And printing the data frame yields:
custid billingAddress orders
1001   [NV,89150]     List([A001,100000...
1002   [CA,92093]     List([B012,100000...
1003   [AZ,85014]     List([A001,100000...
1004   null           null
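Over JDBC, the same query is run through an ordinary Statement. The following sketch (reusing the con object from earlier) produces the ResultSet, res, that the next two snippets examine:

// Issue the query; 'res' is inspected in the metadata and data loops below.
Statement stmt = con.createStatement();
ResultSet res = stmt.executeQuery(
    "SELECT custid, billingAddress, orders FROM dataTable");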
In JDBC we would expect billingAddress to be modeled as a java.sql.Struct and orders as a java.sql.Array (whose elements are also java.sql.Struct). Instead, the driver reports the original types in the column metadata but transmits all of the structured values as JSON strings. The following code shows how JDBC perceives the column types:
ResultSetMetaData rsm = res.getMetaData();
System.out.println("*** Column metadata:");
int ncols = rsm.getColumnCount();
System.out.println("total of " + ncols + " columns");
for (int i = 1; i <= ncols; i++) {
    System.out.println("Column " + i + " : " + rsm.getColumnName(i));
    System.out.println(" Label : " + rsm.getColumnLabel(i));
    System.out.println(" Class Name : " + rsm.getColumnClassName(i));
    System.out.println(" Type : " + rsm.getColumnType(i));
    System.out.println(" Type Name : " + rsm.getColumnTypeName(i));
}
Here is the output:
*** Column metadata:
total of 3 columns
Column 1 : custid
 Label : custid
 Class Name : java.lang.String
 Type : 12
 Type Name : string
Column 2 : billingAddress
 Label : billingAddress
 Class Name : java.lang.String
 Type : 2002
 Type Name : struct
Column 3 : orders
 Label : orders
 Class Name : java.lang.String
 Type : 2003
 Type Name : array
The following code allows us to see the values. Since all of the columns have been converted to strings, the call to res.getObject(i) could just as well be replaced with res.getString(i).
System.out.println("*** Data:");
while (res.next()) {
System.out.println("Row " + res.getRow());
for (int i = 1; i <= ncols; i++) {
System.out.println(" Column " + i + " : " + res.getObject(i));
}
}
Here is the output. Notice that it is quite readable, and the JSON strings could be parsed by your client code if needed, especially since you can use the ResultSetMetaData to determine what to expect in the String. Whether this is "good enough" really depends on your needs.
*** Data:
Row 1
 Column 1 : 1001
 Column 2 : {"state":"NV","zip":"89150"}
 Column 3 : [{"itemid":"A001","orderid":"1000001","quantity":175}, {"itemid":"A002","orderid":"1000002","quantity":20}]
Row 2
 Column 1 : 1002
 Column 2 : {"state":"CA","zip":"92093"}
 Column 3 : [{"itemid":"B012","orderid":"1000002","quantity":200}]
Row 3
 Column 1 : 1003
 Column 2 : {"state":"AZ","zip":"85014"}
 Column 3 : [{"itemid":"A001","orderid":"1000003","quantity":175}, {"itemid":"B001","orderid":"1000004","quantity":10}, {"itemid":"A060","orderid":"1000005","quantity":12}]
Row 4
 Column 1 : 1004
 Column 2 : null
 Column 3 : null
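If your client needs the structured values rather than printable strings, one option is to parse the JSON itself, using the ResultSetMetaData type names to decide whether to expect an object or an array. Here's a hedged sketch using the Jackson library (an extra dependency, not something the samples project requires); the method name printParsedRows is just for illustration:

// Needs com.fasterxml.jackson.databind.{ObjectMapper, JsonNode}, i.e. the
// jackson-databind artifact on the classpath.
static void printParsedRows(ResultSet res) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    while (res.next()) {
        String billingJson = res.getString("billingAddress");
        if (billingJson != null) {
            // The metadata reported a struct, so expect a JSON object.
            JsonNode billing = mapper.readTree(billingJson);
            System.out.println("zip = " + billing.get("zip").asText());
        }
        String ordersJson = res.getString("orders");
        if (ordersJson != null) {
            // The metadata reported an array, so expect a JSON array of objects.
            for (JsonNode order : mapper.readTree(ordersJson)) {
                System.out.println("  item " + order.get("itemid").asText()
                        + ", quantity " + order.get("quantity").asInt());
            }
        }
    }
}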
Advantages of using HiveQL in queries
If Hive's compromise for returning semi-structured data doesn't meet your needs, you may want to consider doing more of the work in the query. For example, Spark SQL supports drilling into nested structures as follows:
SELECT custid, billingAddress.zip FROM dataTable
This actually works remarkably well. For example, if billingAddress happens to be null in some document, asking for billingAddress.zip simply returns NULL instead of generating an error, and since zip is a plain string field, the result arrives through JDBC as an ordinary string column (see the sketch below). Spark SQL doesn't give you an equally good way to unpack arrays, but there is a coping strategy for those too.
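A minimal sketch of issuing the drilled-down query over JDBC, reusing the con object from earlier; no special client-side handling is needed:

// The nested zip field comes back as a plain string column.
Statement drill = con.createStatement();
ResultSet zips = drill.executeQuery(
    "SELECT custid, billingAddress.zip FROM dataTable");
while (zips.next()) {
    System.out.println(zips.getString(1) + " -> " + zips.getString(2));
}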
Requests to the Thrift JDBC server are executed in a HiveContext, so in addition to having access to Spark SQL as a query language, you can write queries in HiveQL. More than just being convenient for experienced Hive users, this allows you to work around some of the shortcomings of the Hive JDBC driver for dealing with the "non-rectangular" aspects of MongoDB collections. For example, the "LATERAL VIEW" feature of HiveQL allows you to effectively "denormalize" the data your query returns:
SELECT custid, o.orderid, o.itemid, o.quantity
FROM dataTable LATERAL VIEW explode(orders) t AS o
Instead of returning one result row for each matching document, it returns one row for each order, with the custid joined on. The result is completely rectangular, and so is easy to deal with in your client code. First, you can see this in the "Type Name" of the metadata.
*** Column metadata:
total of 4 columns
Column 1 : custid
 Label : custid
 Class Name : java.lang.String
 Type : 12
 Type Name : string
Column 2 : orderid
 Label : orderid
 Class Name : java.lang.String
 Type : 12
 Type Name : string
Column 3 : itemid
 Label : itemid
 Class Name : java.lang.String
 Type : 12
 Type Name : string
Column 4 : quantity
 Label : quantity
 Class Name : java.lang.Integer
 Type : 4
 Type Name : int
Second, you can see it when you print the results.
*** Data:
Row 1
 Column 1 : 1001
 Column 2 : 1000001
 Column 3 : A001
 Column 4 : 175
Row 2
 Column 1 : 1001
 Column 2 : 1000002
 Column 3 : A002
 Column 4 : 20
Row 3
 Column 1 : 1002
 Column 2 : 1000002
 Column 3 : B012
 Column 4 : 200
Row 4
 Column 1 : 1003
 Column 2 : 1000003
 Column 3 : A001
 Column 4 : 175
Row 5
 Column 1 : 1003
 Column 2 : 1000004
 Column 3 : B001
 Column 4 : 10
Row 6
 Column 1 : 1003
 Column 2 : 1000005
 Column 3 : A060
 Column 4 : 12
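Because every column is now a simple scalar, the client can use the ordinary typed JDBC getters. A short sketch, again reusing the earlier connection (the variable names are just for illustration):

// The exploded result is fully rectangular, so plain typed getters suffice.
Statement flatStmt = con.createStatement();
ResultSet flat = flatStmt.executeQuery(
    "SELECT custid, o.orderid, o.itemid, o.quantity " +
    "FROM dataTable LATERAL VIEW explode(orders) t AS o");
while (flat.next()) {
    String custid = flat.getString("custid");
    String orderid = flat.getString("orderid");
    String itemid = flat.getString("itemid");
    int quantity = flat.getInt("quantity");
    System.out.println(custid + " ordered " + quantity
            + " of " + itemid + " (order " + orderid + ")");
}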
You can get more information about this technique in the relevant section of the Hive Language Manual.
Summary
Using Apache Spark's Thrift server and NSMC's integration with Spark SQL, you can run highly distributed SQL queries against MongoDB collections, perhaps even performing joins between MongoDB collections and data stored elsewhere.