Both core Spark and Spark SQL provide ways to neatly plug in external database engines as a source of data. In this post I'm going to describe an experimental MongoDB connector for core Spark, called NSMC (for "Native Spark MongoDB Connector"). It's a "native" connector in the sense that it connects Spark directly to MongoDB, without involving anything else like Hadoop. For me, this was largely an exercise in learning how to integrate an external data engine with core Spark, and more generally, an exercise in learning about Spark, but over time it became about a number of other things, including MongoDB, distributed databases and semi-structured data. I hope to delve into the various areas of learning in future posts, but here I'll simply introduce the project and describe how you can experiment with it.
MongoDB makes an interesting case study as an external Spark data source for a number of reasons:
- It has become the most popular DBMS in a number of "non-traditional" categories, including non-relational and NoSQL, being one of the few remaining systems where the "NoSQL" label still means something.
- The data model, based on collections of JSON-like documents, is both deeply ad-hoc (i.e., collections have no a priori schema whatsoever) and deeply non-rectangular, making it an interesting and (as it turns out) challenging test case for integration with virtually any system.
- It is not directly served by Spark, although it can be used from Spark via an official Hadoop connector, and Spark SQL also provides indirect support via its support for reading and writing JSON text files.
NSMC is hosted on GitHub under an Apache 2.0 license, but I'm not going to discuss the implementation in this post at all.
Related posts
A number of followup posts take this work further:- In Spark SQL Integration for MongoDB I discuss how NSMC can now be used from Spark SQL.
- In Efficient Spark SQL Queries to MongoDB I describe how to make the Spark SQL integration more efficient, and
- in JDBC Access to MongoDB via Apache Spark I describe how to use NSMC's Spark SQL integration via JDBC.
Trying it out
To try the connector out in your system you need a Spark 1.1.0 instance and a MongoDB instance (clustered or not.) You can find a complete example to play with on GitHub. Your Spark code will need to be written in Scala, as part of an SBT project, and you need to include the following in your build.sbt file.
scalaVersion := "2.10.4" // any 2.10 is OK libraryDependencies += "org.apache.spark" %% "spark-core" % "1.1.0" libraryDependencies += "com.github.spirom" %% "spark-mongodb-connector" % "0.3.0"
Note that the connector itself depends on Spark 1.1.0 and Casbah. The connector API is in the nsmc namespace. You'll also need access to Casbah's import of DBObject class.
import nsmc._ import com.mongodb.casbah.Imports._
Then you need to configure a SparkContext in the usual way, and add some extra configuration to enable the connector to communicate with MongoDB. In this code snippet, and others below, the lines you need to add or change are highlighted.
val conf = new SparkConf()
.setAppName("My MongoApp").setMaster("local[4]") // or whatever
.set("nsmc.connection.host", "myMongoHost")
.set("nsmc.connection.port", "myMongoPort")
.set("nsmc.user", "yourUsernameHere")
.set("nsmc.password", "yourPasswordHere")
val sc = newSparkContext(conf)
Finally, you can call the mongoCollection() method on the context with the names of your favourite MongoDB database and collection.
val data = sc.mongoCollection[DBObject]("myDB", "myCollection")
Then the value of data will be an RDD[DBObject], and you deal with it the way you would deal with any RDD, and with the elements like any Casbah DBObject. This means that you're unlikely to get very far unless you already know how to read from MongoDB using Casbah -- in fact this may be a good time to learn the Casbah API by writing a simple Scala example that doesn't involve Spark at all.
As you might expect if you are familiar with Spark RDDs, the above code doesn't actually load any data from MongoDB -- it's only when you use the RDD in a computation that its partitions are populated lazily.
Partitioning
By default, the RDD created has only one partition, which can create a performance bottleneck in the case of a large collection. If the collection you need to load is indexed in MongoDB, NSMC can ask MongoDB to tell it how to create partitions in a reasonable way. The following configuration will allow you to use that feature. Notice that you have to enable it and set the maximum size (in megabytes) of a Spark partition. You also have to say which indexed fields you want to partition on when you create the RDD.
val conf = new SparkConf() .setAppName("My MongoApp").setMaster("local[4]") .set("nsmc.connection.host", "myMongoHost") .set("nsmc.connection.port", "myMongoPort") .set("nsmc.user", "yourUsernameHere") .set("nsmc.password", "yourPasswordHere") .set("nsmc.split.indexed.collections", "true") .set("nsmc.split.chunk.size", "4") val sc = new SparkContext(conf) val data = sc.mongoCollection[DBObject]("myDB", "myCollection", Seq("key"))
If you have sharded collections, you can simply turn each shard into a Spark partition. You can even tell NSMC to bypass mongos and connect directly to the shards -- although this setting is best avoided unless you understand MongoDB sharding really well. The following snippet enables both of these features.
val conf = new SparkConf()
.setAppName("My MongoApp").setMaster("local[4]")
.set("nsmc.connection.host", "myMongoHost")
.set("nsmc.connection.port", "myMongoPort")
.set("nsmc.user", "yourUsernameHere")
.set("nsmc.password", "yourPasswordHere")
.set("nsmc.partition.on.shard.chunks", "true")
.set("nsmc.direct.to.shards", "true")
val sc = new SparkContext(conf)
val data = sc.mongoCollection[DBObject]("myDB", "myCollection")
If you enable partitioning for both unsharded and sharded collections (and this may make sense if you will read from multiple collections), the shards take precedence for sharded collections. That is, a sharded collection will then always be partitioned according to its shards.
Things to note:
- All of the properties nsmc.split.indexed.collections, nsmc.partition.on.shard.chunks and nsmc.direct.to.shards default to false, which means that if you don't set them you'll get an unpartitioned RDD.
- These settings are global to a SparkContext, which may cause problems in some applications and perhaps provides an interesting design challenge for future versions of this connector.
- The setting of nsmc.direct.to.shards is only used if you set nsmc.partition.on.shard.chunks to true
Configuration overview
Configuration for the connector is picked up from your SparkContext. Here is an overview of all the relevant settings.
Setting | Meaning | Units | Default |
---|---|---|---|
nsmc.connection.host | MongoDB host or IP address | localhost | |
nsmc.connection.port | MongoDB port | 27017 | |
nsmc.user | MongoDB user name | no authentication | |
nsmc.password | MongoDB password | no authentication | |
nsmc.split.indexed.collections | Should indexed collections be partitioned using MongoDB's [internal] splitVector command? | boolean | false |
nsmc.split.chunk.size | Maximum chunk size, in megabytes, passed to MongoDB's splitVector command, if used. | MB | 4 |
nsmc.partition.on.shard.chunks | Should collections that are already sharded in MongoDB retain this as their partitioning in Spark? If not, the entire collection will be read as a single Spark partition. | boolean | false |
nsmc.direct.to.shards | If sharding of collections is being observed, should the mongos server be bypassed? (Don't do this unless you understand MongoDB really well, or you may obtain incorrect results -- if MongoDB is rebalancing the shards when your query executes.). | boolean | false |
Limitations of the connector
NSMC is strictly experimental, and not suitable for production use. Use it if you have a strong stomach and enjoy experimenting. You can get an overview of its current limitations at any time by checking the Issues page on GitHub but because it's so important to realize just how experimental the connector is currently, I'll list the most important limitations.
- While, in spirit, NSMC is similar to MongoDB's Hadoop connector, it is much less sophisticated and less tested.
- There are no Java or Python APIs: it's Scala only
- There's no integration with Spark SQL
- Writing data to MongoDB is not supported
- You can't get MongoDB to filter the collection before loading it into Spark -- the entire collection is loaded, no matter how large.
- There's no way to take advantage of MongoDB replication
- Advanced mongoDB authentication is not supported
- Neither Spark 1.2.0 nor Scala 2.11 are supported
- Some of the MongoDB commands used in the implementation, also used in the MongoDB connector for Hadoop, are not really public interfaces.
My limitations
I hope people will try the connector out and share their experiences, but I won't actually be able to give anything approaching professional support: this is a self-education project and I'm doing it in my spare time. If the project turns out to be broadly useful, I'm also happy to help turn it into a collaborative one, or to get it incorporated into a larger pre-existing project. I haven't discussed it with the developers of MongoDB, and don't know whether they're at all interested in building a native Spark connector.
It's also important to understand that I don't have a large MongoDB cluster and I'm not going to build one. I've taken some care to get decent logging in place right from the start, and will be happy to look at log files.
What's Next?
As there are a lot of interesting and potentially difficult issues to address before the connector can be considered complete, in future posts (and code commits) I'll address design issues and implementation choices.
No comments:
Post a Comment