Sunday, October 23, 2016

Developing Utility Bolts for Apache Storm

Apache Storm is a distributed stream processing framework: one of many such frameworks but among the most popular. Storm applications ("topologies") are composed of "spouts" (sources of data) and "bolts" (data transformations), and these are connected by "streams" of "tuples", each of which is a sequence of typed key/value pairs of data. The spouts and bolts can be thought of as vertices in a directed acyclic graph, and the streams as edges. The spouts are always graph sources, with only outgoing edges, while the bolts may either have both incoming and outgoing edges or be sinks, with only incoming edges.

Storm provides various pre-defined components, most of them spouts, providing standard data sources for streaming data from database systems, file systems, queueing systems and network listeners such as a web server, and so on. Similarly it provides pre-defined bolts, some serving as data sinks along the same lines, as well as interfaces to the usual logging frameworks.

In this post I'm going to examine what it takes to do a good job of adding reusable transformers (in the form of utility bolts) to Storm, for use by topology developers. Storm already provides a number of these, mostly in the package org.apache.storm.starter.bolt, and a few more in the package org.apache.storm.testing. (Storm follows a convenient naming convention where all bolt class names end with "Bolt".) Alas, most of these are completely undocumented, at least in the JavaDoc, but many are quite simple, and their usage can be worked out from a quick read of the source. Standard transformations can provide simple operations like projecting out unwanted fields, or much more complex ones like filtering, aggregation or data smoothing.

Since sometimes spouts and bolts have common or interacting design issues I'll occasionally touch on the design of spouts, but that's a complex topic in itself that is mostly out of scope for this post.


Since this is intended to be a practical guide for writing reusable bolts, I'm going to assume that you already understand the basic mechanics of writing a very simple bolt and getting it working in a Storm topology. A good source for learning how to do this is the book "Storm Applied: Strategies for real-time event processing" by Sean T. Allen, Matthew Jankowski, and Peter Pathirana. I'm also assuming that you have the most basic familiarity with Storm's Java API.

Open Source Project

You may want to read this in conjunction with the storm-gadgets project on GitHub, which includes a small number of bolts largely developed using the design principles described here, although I'll leave detailed discussion of the actual code to another post.

Design Goals

First I'd like to propose some design goals for creating utility bolts:

Ease of use: The purpose and behavior of the bolt should be clear and it should be easy to set up and include in a topology.

Appropriate Generality: When designing reusable components of any kind there's a tradeoff between ending up with lots of similar components on one hand, and components with complex configuration on the other. When adding components to an existing framework it helps to "blend in" with how existing components have handled this compromise. Another facet of generality is adaptability to as wide a range of topologies as possible, in terms of variations like concurrency support, reliable delivery support, tuple contents, and so on.

Robustness: Good choices need to be made about what kinds of errors are tolerated and which lead to topology execution failure. Here again the pre-existing components can be a guide. Furthermore, in the streaming world it's very expensive to allow bad input data or a localized problem to terminate the application. It's usually best to avoid interrupting stream processing in all but the most severe cases: anything that prevents the successful processing of a large fraction of tuples.

Ease of Diagnosis: It's important to be able to diagnose misconfiguration of these components as well as failures, or other faults, during their execution. Again, existing components can be a guide here, but broadly we want to be able to read the usual logs and see what is happening in each component – easily being able to zoom in on a specific component type and/or component instance. The reader of log messages needs to be able to understand the scope and severity of each reported problem, and ideally what to do about it: fix bad configuration, restart the topology, solve an environmental problem, etc.

Performance and Scalability: In addition to the component itself performing well, it should not detract from the performance and scalability of the topologies that use it any more than necessary.

Implementation Guidelines

To meet the above component design goals in the Apache Storm framework, we need to address certain technical issues. I'll leave performance and scalability to a separate post, and address the functional issues here. As mentioned earlier, this discussion will occasionally refer to the Java API, although that's not the only option for implementing bolts.

Distinguishing between inputs: A component may take inputs from multiple other components, and will often treat those inputs differently -- that is, they have different roles in the operation of the component, and so the topology developer will need to be able to specify which input stream has which role. Furthermore, upstream components may be emitting tuples on multiple streams, and sometimes multiple output streams of a single component may be consumed by our component. In Storm, streams have names local to the component that emits them, and components within a topology live in a flat namespace where they have global names. Storm provides the class org.apache.storm.generated.GlobalStreamId for dealing with this two-level namespace. In short, the component must support dealing unambiguously with the names of streams.

Organizing outputs in a consumable way: Our own component may need to generate multiple output streams, in which case they need to be named. Even if there is only one, there may be reasons not to simply use the default output stream (whose name is, aptly enough, "default".) Sometimes it will make sense to generate the stream names ("out1", "out2", …) but in other cases they will need user configuration to fit them into a topology. The fields in output tuples will also need names, which may be fixed or generated in some cases, and need to be configured in others. This can be a lot for the user to configure, and the decision as to what needs to be configured should be made carefully. Finally, there are cases where it may be tempting to choose the output stream and field names based on the input stream and field names. There are two problems with this. First, while it may seem like a great way to avoid configuration altogether, Storm spouts and bolts are required to declare their streams and fields (and the order of fields) via the declareOutputFields() callback when the topology is initialized. Second, while it is often practical to use the configured names of inputs as names of outputs, you need to watch out for collisions – multiple input components may use the same stream name, and multiple streams may use the same field name. In short, simply passing input names through as output names is not a viable bolt design strategy in Storm.
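To make the collision problem concrete, here is a minimal sketch in plain Java (no Storm dependency) of one way to derive output field names from configured input names. The class OutputFieldNamer and its methods are hypothetical names invented for this illustration; they are not part of Storm or of the storm-gadgets project. The idea is to qualify each field name with its source component and stream, and to reject duplicates while the bolt is being configured, so that a fixed list is ready by the time the fields have to be declared.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: builds a fixed, collision-free list of output field names. */
final class OutputFieldNamer {
    // Insertion order is preserved, so it doubles as the declared field order.
    private final Set<String> names = new LinkedHashSet<>();

    /** Adds a name qualified by source component and stream, e.g. "sensorsA.default.temp". */
    String add(String componentId, String streamId, String fieldName) {
        String qualified = componentId + "." + streamId + "." + fieldName;
        if (!names.add(qualified)) {
            throw new IllegalArgumentException("Duplicate output field: " + qualified);
        }
        return qualified;
    }

    /** Returns the field names in the order in which they were added. */
    List<String> declaredFields() {
        return new ArrayList<>(names);
    }
}
```

With this scheme, two upstream components that both emit a field called "temp" on their "default" streams yield two distinct output fields. The dot separator is a simplification: if component or stream names may themselves contain dots, a different separator or an escaping rule would be needed.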

Interoperating with Guaranteed Delivery: The degree to which a topology achieves guaranteed delivery of tuples depends on its configuration, as well as the behavior of the spouts and bolts. Spouts need to assign IDs to tuples, bolts need to anchor their emitted tuples appropriately with respect to input tuples, and all components need to acknowledge tuples appropriately. Spouts have to implement the ack() and fail() methods, which also impacts the nextTuple() method, as emitted tuples need to be stored, keyed by their tuple ID, until they are either acknowledged (and then deleted) or failed (and then replayed.) Finally, bolts that communicate with external systems such as databases or queueing systems will need to "fail" the tuple when operations on external systems fail, so that it will later be replayed. When developing a utility component, we don't know whether guaranteed delivery will be used in a particular topology -- the component usually needs to support either behavior. Fortunately, if we develop the component as if guaranteed delivery will be used, it can also be deployed without it. As we will see below, doing this sometimes raises complex design issues.

Concurrency: It is straightforward to write components in a way that allows Storm to operate multiple instances in parallel, but problems arise when we use these components in a topology and try to decide on what grouping method to use to connect them. Often a shuffle grouping will work – in particular, if the bolt processes each tuple completely in isolation from others. It gets more complicated if the order of tuples is significant to the bolt, or they need to be grouped in some way – then often a fields grouping is appropriate. This is all in a day's work for Storm topology developers, but it requires understanding the behavior of each spout and bolt. As utility component developers, it's up to us to understand our component's behavior well enough to document the grouping requirements it imposes, and sometimes this can be complex as it may be different for different inputs. Spouts have additional responsibilities with respect to concurrency, as the various spouts reading from an external data source need to divide the data among themselves. When reading from a queue, this is straightforward, but if reading from a DBMS they may have to work out how to explicitly partition a table.

Error handling: The issue of error handling in streaming applications is complex and covering it completely in this post seems impossible. As utility component developers, however, we need to understand and document how our component interacts with system failures in the topology around it, and also what it considers "invalid" configuration and "invalid" input data.

Misconfigurations should usually be reported when a component is initialized (in the constructor) or during the call to prepare(), as they should, if at all possible, be reported before the topology starts to execute and should, in most cases, prevent it from executing. One major kind of misconfiguration that components should always check for during initialization is whether an appropriate set of input streams and output streams have been configured -- there's usually no point starting to process data if they haven't. This is also a good time to check for groupings that can't be supported, concurrency levels that can't be supported, as well as unsupported combinations of grouping and concurrency.
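As an illustration of checking configuration early, here is a minimal sketch in plain Java. ConfigValidator and ExampleBoltConfig are hypothetical names made up for this example, and the two checks shown are only placeholders for whatever a real bolt requires. The one design choice worth noting is that all problems are collected before failing, so whoever reads the log sees every misconfiguration at once rather than fixing them one restart at a time.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: collects every configuration problem, then fails once. */
final class ConfigValidator {
    private final List<String> problems = new ArrayList<>();

    /** Records the problem text if the condition does not hold. */
    ConfigValidator require(boolean condition, String problem) {
        if (!condition) {
            problems.add(problem);
        }
        return this;
    }

    /** Throws if any check failed, naming the component and listing all problems. */
    void throwIfInvalid(String componentName) {
        if (!problems.isEmpty()) {
            throw new IllegalStateException(
                componentName + " is misconfigured: " + String.join("; ", problems));
        }
    }
}

/** Example use inside the constructor of a hypothetical bolt's configuration. */
final class ExampleBoltConfig {
    ExampleBoltConfig(List<String> inputStreams, List<String> outputFields) {
        new ConfigValidator()
            .require(!inputStreams.isEmpty(), "no input streams configured")
            .require(!outputFields.isEmpty(), "no output fields configured")
            .throwIfInvalid("ExampleBolt");
    }
}
```

Checks that depend on how the component is wired into the topology, such as groupings and concurrency levels, could be added to the same validator wherever that information becomes available to the component.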

Invalid tuples are a different matter: unclean data is a regular fact of life, and data pipelines should recover and continue executing whenever possible after an invalid tuple is received. This can be either very simple or complex depending on the nature of your component. One thing to remember is that if you effectively drop a tuple for being invalid, you still need to acknowledge it so it doesn't get replayed when guaranteed delivery is being used – this can feel counterintuitive but is very important. There remains the issue of reporting the problem to support diagnosability. It's important to be able to monitor whether the number of tuples (absolute or as a proportion of data processed) each component has rejected is very small or very large. In the latter case, hopefully an administrator should be alerted to check whether there is a major, systematic configuration or data source problem. Sometimes the administrator will have the luxury of stopping the data pipeline, but often this is out of the question. Millions of tuples may be rejected before an upstream problem is solved, and you don't want your alerting mechanism to cause more problems than it solves. For example, logging every rejected tuple can seem like a good idea, and indeed be very useful, until the logs fill up a storage device or the logging slows the topology to a crawl. Logging needs to be used judiciously, and logging the occasional rejected tuple is probably still a good idea. Logging the number of rejected tuples from time to time can also be useful. For some components, particularly those that are "fussy" about their inputs, it may make sense to output something (perhaps a count, or an error message) on a dedicated output stream whenever a tuple is rejected. It may even be tempting to output the entire tuple, but this is not straightforward. Since the field signatures of a component's output streams need to be pre-declared, it's hard to emit an unexpected field. 
One approach is to serialize the entire rejected tuple into a single field, perhaps called "tuple", perhaps in a serialization format that is both machine and human readable.
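The idea of logging only "the occasional rejected tuple" while still tracking counts can be sketched in a few lines of plain Java. RejectionTracker is a hypothetical class invented for this illustration, and the policy it implements (log each of the first few rejections in detail, then only every Nth one) is just one reasonable choice among many. It assumes it is called from a single thread, as it would be from one bolt instance's execute() method.

```java
/** Hypothetical helper: counts rejections and throttles detailed logging. */
final class RejectionTracker {
    private final long logFirst;   // log each of the first N rejections in detail
    private final long logEvery;   // after that, log only every Nth rejection
    private long processed;
    private long rejected;

    RejectionTracker(long logFirst, long logEvery) {
        if (logFirst < 0 || logEvery < 1) {
            throw new IllegalArgumentException("logFirst must be >= 0 and logEvery >= 1");
        }
        this.logFirst = logFirst;
        this.logEvery = logEvery;
    }

    /**
     * Records one processed tuple. Returns true only when the tuple was
     * rejected and this particular rejection should be logged in detail.
     */
    boolean record(boolean wasRejected) {
        processed++;
        if (!wasRejected) {
            return false;
        }
        rejected++;
        return rejected <= logFirst || rejected % logEvery == 0;
    }

    long rejectedCount() {
        return rejected;
    }

    /** Fraction of processed tuples that were rejected, or 0.0 if none were processed. */
    double rejectedFraction() {
        return processed == 0 ? 0.0 : (double) rejected / processed;
    }
}
```

A bolt would call record() once per input tuple and write a log message only when it returns true. The count and fraction can be logged periodically or emitted on a dedicated stream, which keeps the cost of reporting bounded even when millions of tuples are being rejected.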

Spouts that attempt to support guaranteed delivery also need to handle situations where tuples are either not being acknowledged for a long time (imposing a huge interim storage burden on the spout) or repeatedly being failed (adding a retransmission burden to that storage burden), in both cases suggesting that something is seriously wrong. Such situations can be handled by occasional reaping of old tuples and by imposing limits on the number of retries – both requiring additional information to be stored with the transmitted tuple, as well as judicious decision making by the designer.
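The bookkeeping this requires can be sketched in plain Java, independent of the Storm API. PendingTuples is a hypothetical class written for this illustration; a real spout would call something like it from nextTuple(), ack() and fail(), and would choose its own ID type, limits and replay policy. The "additional information" stored with each tuple here is a failure count and the time of the most recent emission.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/** Hypothetical store of emitted-but-unacknowledged tuples, keyed by message ID. */
final class PendingTuples<T> {
    private static final class Pending<P> {
        final P payload;
        int failures;
        long lastEmittedMillis;

        Pending(P payload, long emittedMillis) {
            this.payload = payload;
            this.lastEmittedMillis = emittedMillis;
        }
    }

    private final Map<Long, Pending<T>> pending = new LinkedHashMap<>();
    private final int maxRetries;
    private final long maxAgeMillis;

    PendingTuples(int maxRetries, long maxAgeMillis) {
        this.maxRetries = maxRetries;
        this.maxAgeMillis = maxAgeMillis;
    }

    /** Call when a tuple is first emitted. */
    void track(long id, T payload, long nowMillis) {
        pending.put(id, new Pending<>(Objects.requireNonNull(payload), nowMillis));
    }

    /** Call when the tuple is acknowledged: it no longer needs to be stored. */
    void ack(long id) {
        pending.remove(id);
    }

    /**
     * Call when the tuple is failed. Returns the payload to replay, or empty
     * if the ID is unknown or the retry limit has been exceeded (in which
     * case the tuple is dropped from the store).
     */
    Optional<T> fail(long id, long nowMillis) {
        Pending<T> p = pending.get(id);
        if (p == null) {
            return Optional.empty();
        }
        p.failures++;
        if (p.failures > maxRetries) {
            pending.remove(id);
            return Optional.empty();
        }
        p.lastEmittedMillis = nowMillis;
        return Optional.of(p.payload);
    }

    /** Removes tuples that have been outstanding longer than maxAgeMillis; returns their IDs. */
    List<Long> reap(long nowMillis) {
        List<Long> reaped = new ArrayList<>();
        Iterator<Map.Entry<Long, Pending<T>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Pending<T>> entry = it.next();
            if (nowMillis - entry.getValue().lastEmittedMillis > maxAgeMillis) {
                reaped.add(entry.getKey());
                it.remove();
            }
        }
        return reaped;
    }

    int size() {
        return pending.size();
    }
}
```

Both the tuples dropped after too many retries and the reaped IDs are worth counting and occasionally logging, since either one growing quickly suggests that something is seriously wrong downstream.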

Logging: Storm now uses SLF4J for logging, and it's straightforward for individual components to use it as well. Any logging done on a per-tuple basis should be at the DEBUG level so it can be disabled in production. Major component lifecycle and configuration events should be logged as INFO as it's cheap to log them and they should always be available.

One aspect of logging to be aware of is that a component can only become aware of its ID in the topology when prepare() is called. If you want to use it for logging elsewhere (and you will) you need to save it at that time. Furthermore, not only is the ID not known in the constructor, but it is also not known in declareOutputFields(), which is called before prepare(). If it seems useful for the association between the component ID and its configuration (and perhaps output fields) to be clear in the logs, you may want to log it all inside prepare() even though it was already available in the constructor and it may have been tempting to log it there.

Interactions with external systems: Spouts often read data from external systems and bolts can read or write data from/to such systems, or both. To do this responsibly, they should not overuse the resources of those systems, including connections. This includes limiting the number of concurrent connections and disconnecting responsibly when cleanup() or deactivate() is called. As mentioned earlier, it needs to be clear what happens when multiple instances of a component read from the same database table – are they replicating the data or partitioning it? An additional complication to keep in mind is that when guaranteed delivery is in play, the input tuple to a component may be replayed -- it's necessary to think through what effect this will have on the external system.

In Practice

You can make up your own mind as to how well the bolts in the project meet the design goals and conform to the implementation guidelines: I'll discuss some of them in detail in future posts. If you have bolts of your own that raise interesting issues, or feedback on the ideas discussed here, please let me know.

Thursday, October 20, 2016

Learning to use Apache Spark and Kafka Together

I recently wrote about the unexpected popularity of the LearningSpark project on GitHub, and speculated that some of the popularity stemmed from the convenience of not having to set up a Spark server -- each example program is more-or-less self-contained. This approach has certain limitations (for example, it's an awful way to explore performance and scalability issues) but it does leave people free to concentrate on model and API issues in isolation. This can be useful not only for Spark beginners, but also, as Spark evolves, it's a good way to understand the new features and the problem solving approaches they support.

Sooner or later, a large fraction of Spark users end up grappling with how to use Spark in conjunction with Apache Kafka. Now, the overheads of setting up both a Spark cluster and a Kafka cluster before you can write the stream processing equivalent of "Hello World" can be quite high, especially if you're still learning BOTH systems. Being in this situation myself, I started to wonder how hard it would be to set up a project where both the Spark system and the Kafka broker (and ZooKeeper instance) were embedded in the example program. Such an approach would have the same limitations as the Spark project described above, and probably suffer from them even more, but conversely, the benefits of such simplification would be even greater. It turns out not to be very hard to achieve, as you can see at another GitHub project, spark-streaming-with-kafka, but it does have certain gotchas, which I'll discuss.

It seems to me that the Kafka project has suffered rather considerable API churn, not just in the details of the APIs but in the fundamental conceptual model as well. Currently in Spark 2.0.0, released in July of 2016, Spark support for the APIs is lagging somewhat, supporting Kafka, released in February of 2015. This seventeen month lag causes some minor difficulties, although the situation seems likely to improve in Spark 2.1.

The main impact of this lag in API support is in the area of the kafka-unit project, which provides convenient APIs for managing an embedded Kafka instance. This project is now at version 0.6, but to get a release that supports Kafka we have to go back to kafka-unit 0.2, which is missing some handy newer features like creating partitioned topics. I've had to work around this in one of the utility classes discussed below.

Utility Classes

So far, I have needed the following utility classes to keep the examples sane. Note that these links point to a version of the code specially tagged to provide permanent links: keep this in mind if you want to see more recent versions of the code.

  • EmbeddedKafkaServer: Encapsulates uses of the kafka-unit project to embed a Kafka instance, working around the fact that, as discussed above, we have to use a rather old version of that project.
  • SimpleKafkaClient: Some trivial default configuration for the producers and consumers used in the examples.
  • SparkKafkaSink: An extension of the code provided in Marcin Kuthan's rather useful blog post about publishing to a Kafka topic from a Spark job. His code uses the default partitioner, essentially broadcasting the contents of each RDD partition to all of the topic partitions, effectively causing a random repartitioning. The additional overload of the send() method here instead allows the topic partition to be specified, so the contents of all RDD partitions are sent to the same topic partition. I'm not sure this is useful in practice, but it helps to illustrate the relationship between RDD partitioning and topic partitioning.


So far, the following five examples are available. Once again, the links point to a tagged version of the code.

  • SimpleStreaming: The most basic streaming example: starts a Kafka server, creates a topic, creates a stream to process that topic, and publishes some data using the SparkKafkaSink.

    For each of the received RDDs, the code prints the number of partitions and the number of elements in each partition. The code exercises no control over the partitioning of the received RDDs, and there turn out to be two partitions each time, compared with four in the originating RDD and four in the topic. By examining the partitioning here, we set the stage for exercising some control over it in later examples.

    Notice there's quite a lot of waiting. It takes some time for streaming to get going, and data published too early tends to be missed by the stream. (No doubt, this is partly because this example uses the simplest method to create the stream, and thus doesn't get an opportunity to set auto.offset.reset to "earliest".) Also, data that is published takes some time to propagate to the stream. This seems inevitable, and is almost guaranteed to be slower in a self-contained example like this.
  • ExceptionPropagation: This example demonstrates that exceptions encountered in stream processing are rethrown from the call to awaitTermination(). The custom exception SomeException is thrown when an RDD is received.
  • MultipleConsumerGroups: This differs in creating two streams based on two different consumer groups, so both streams get a copy of the same data. It's simply a matter of specifying the names of the two different consumer groups in the two calls to createStream() for the same topic -- no special configuration is needed. The two calls create two instances of ReceiverInputDStream, and then foreachRDD is called on each of those. This is valuable if you want to create more than one processing pipeline on the same data.
  • PartitionedStreaming: By calling createDirectStream() instead of createStream(), you can get the generated RDDs to have a number of partitions (in this case 6) dictated by the partitioning of the topic.
  • ControlledPartitioning: Here the topic has six partitions but instead of writing to it using the configured partitioner, we assign all records to the same partition explicitly. Although the generated RDDs still have the same number of partitions as the topic, only one partition has all the data in it. This demonstrates how to exercise control over partitioning all the way from the original RDD, through the topic to the resulting RDDs.

Feedback please!

Personally I've found this project useful in improving my understanding of Kafka itself, as well as the Kafka integration features of Spark Streaming. It's always hard to tell which of these projects are useful to others and why, so I look forward to hearing from you about your experiences with the code.

Sunday, October 9, 2016

Learning Spark with Java

In a recent post I discussed the history and motivation of my LearningSpark project on GitHub. While that project is mostly based on the Scala APIs to Apache Spark, I explained why I had begun to explore the Java APIs as well. I also predicted that I would soon introduce a separate project, based solely on Maven and Java, to continue the Java exploration: most Java programmers are much more comfortable with Maven than with sbt, and a separate project allows me to choose the Java version appropriately.

The new learning-spark-with-java project on GitHub is the result. It started with a copy of the examples on the original project, but since I've now adopted Java 8, I rewrote the examples to make use of the latter's lambda expressions, perhaps ironically making the code now look more like the original Scala code.

I'll proceed with this project using the guidelines I listed in the LearningSpark project when I branched out into Java. I will almost definitely not:

  1. Rush to catch up with the Scala examples,
  2. Keep the two sets of examples perfectly (or even well) matched,
  3. Branch out into Python and R as well (seriously, I have no interest in doing this.)

I'll probably still focus on the Scala examples more, as new features seem to mature a little faster in the Scala API. I am unlikely to add to the Java examples in the LearningSpark project, and if they get in the way or create confusion, I may eventually delete them. As always, feedback is welcome, and I'm especially curious to see whether the community finds this project as useful as some people obviously found the earlier one.

Sunday, August 28, 2016

Taking a Detour with Apache Spark

Almost two years ago, while preparing for a talk I was giving at the now defunct Seattle Eastside Scala Meetup, I started a public GitHub project collecting and organizing Apache Spark code examples in Scala. I had stumbled on a way to run the examples on all supported platforms without setting up or deploying to a cluster, so the overheads of experimenting with the Spark APIs and programming idioms were remarkably low. It seemed like this approach was not well known at the time, so I shared it via the GitHub project and by posting here. Other than avoiding the overheads of a Spark cluster, the main feature of the project has been a "baby steps" approach to the examples. I've tried to demonstrate each API feature with multiple, slightly varying examples and (with notable, unfortunate exceptions) comments, to build intuitions before leaving the readers to take their chances with the Scaladoc.

Two years and about sixty sample programs later, I'm still not sure of the project's role and future, except that it has been tremendously helpful to my learning about Spark and Scala. The Apache Spark project's documentation and examples have improved, as has test coverage -- the latter always being a good way to learn about a new feature, except when there isn't any. The Databricks blog has also made a difference. And yet, the project continues to be useful to me, and I occasionally hear from others who find it helpful, including one local company that uses it in their training program. I like the "baby steps" approach to learning an API, and apparently I'm not the only one.

But lately I've had to ask myself some hard questions about the project. As I hope to post separately about soon, the evolution of Spark SQL's object model (remember SchemaRDD?) has made the task of keeping the project organized rather challenging lately -- I don't like to move examples around so I don't break links from the blog, StackOverflow and elsewhere. Another problem that's been nagging at me lately is my choice of Scala for the examples. I enjoy using Scala, have enjoyed learning it, and the Apache Spark project continues to keep the Scala APIs as a first class citizen. Indeed, Spark is written in Scala, but as I'll discuss later, that's no guarantee of strong support for Scala APIs. I've never been interested in the Python or R APIs, even though I believe they're of tremendous importance to the industry: I'm not part of the target audience (broadly, the data scientist) and I don't enjoy programming in either language. That leaves Java.

Time to explore the Java APIs

Many of you have seen the various Typesafe/Lightbend surveys showing Scala to be more popular than Java for Spark development -- the latest one has it at 76% Scala, 58% Java 8 and 34% Java 7 or lower. Clearly, there is overlap, so it's not clear whether Java or Scala is more popular overall. I see several reasons to explore Spark from the Java perspective:

  • Java is clearly an important part of the Spark ecosystem, as the surveys show.
  • The Java APIs are not merely an afterthought in Spark: real effort seems to have been invested in making Java programming practical and a reasonable approach.
  • While even a quick examination of the Spark project's Java examples (which date back to Java 7) shows them to be verbose and awkward compared with the Scala examples, the introduction of functional programming features in Java 8 raises the possibility of Java catching up.
  • I see a certain hesitation about Scala in the "big data" ecosystem. Lightbend has taken the "we don't have to choose" approach, and seems to be pretty sincere about it -- and of course they should be if they believe their own survey results. Confluent's decision about Apache Kafka is a bit more interesting: Kafka is also written in Scala, but only supports a Java API, with others provided by the community. While Cake Solutions actively develops the scala-kafka-client project, the Scala APIs are definitely not a first class citizen.
  • I've been a Java programmer, on and off, for 18 years. Before Scala, it was my recreational language of choice, and I still like it. I'm curious about Java 8, which I've only used a little, for another recent project.

Together, these certainly don't motivate me to abandon Scala, but they do motivate me to understand the tradeoffs better than I do now. The bottom line is that I've started adding some Java examples to the project, and started marking my commit messages with "[Scala]" or "[Java]" as appropriate.

Important Questions

I'm definitely making this up as I go, so let me expose some of the decisions I'm trying to make.

Which Examples?

I started with Dataset and DataFrame, since I had recently worked on those in Scala. But I'd at least like to get a cross section of the different areas: core RDDs, SQL, streaming and perhaps GraphX. Then I'll probably focus more on the areas that bring out interesting differences, whichever they turn out to be. There's no point exploring Spark SQL as a query language comprehensively in both Java and Scala, so I won't do it in Java.

Which Version(s) of Spark?

This is easy: much of the reason I invest in the project is to keep up with Spark evolution, and it takes a lot of effort. I'll continue adopting each new Spark release as soon as I can, and use its new features.

Java 8 or Earlier?

Java 8 seems to be getting a lot of adoption, and the new features definitely make it better suited to Spark. But the APIs have a number of features that were intended to work around the deficiencies of earlier versions of Java (such as all of, so it seems interesting to explore them for a while. Yet I'll probably change to Java 8 soon to keep the project from becoming a museum.

One or Two Projects on GitHub?

So far I've used the parallel Scala/Java source tree structure of sbt projects to locate the Java code in the same project as the Scala code, but I'm already feeling like this was a bad idea. I think it hinders my ability to serve the community, since Java programmers are much more likely to be familiar with Maven than sbt, and the one Java class I had written to support the Scala code (hiveql.SumLargeSalesUDAF) is now tangled up with the Java examples. I think you can expect to see a separate project soon. (Splitting the projects also allows me to use different Java versions.)

Parallel Organization?

As I mentioned earlier, the evolution of the object model around Spark SQL has made it hard to keep the project organized, and the Scala examples are getting out of hand. I'm not going to inflict this entropy on Java developers, and will try to organize the Java examples according to my current understanding of how Spark fits together. In due course this may help me sort out the organization of the Scala examples too -- in any case I'm hoping to write a separate post on this topic.

How Much Effort?

I don't know how much I'll balance my effort on Scala and Java examples, or even whether I'll keep working on the Java ones for much longer. It depends on feedback, how much insight I get, where the community ends up going, and how Java and Scala (and Spark) evolve.

Abandoning Scala?

I've already made this decision: definitely not. It's now my recreational language of choice, and I think it has a future. At the very least, I plan to keep up my attempts at covering major Spark features in Scala as they evolve.

Feedback please!

While my "baby steps" approach to Spark examples seems to have had some impact on the community, I get very little direct feedback. Occasional questions have inspired some of the examples, which I hope were helpful to those people, and one local professional who reached out and told me how he has found the project valuable has dramatically increased my motivation. I'd be delighted to hear about your experiences, either about the examples themselves, or about Spark in general.

Sunday, January 31, 2016

Apache Spark in Practice: US Airline On-Time Performance

A few years ago I developed a fascination with a data set published by the Bureau of Transportation Statistics in the US Department of Transportation: "Airline On-Time Performance and Causes of Flight Delays: On_Time Data." This data usually attracts attention because of flight delays, but actually contains lots of broader information about US airports, airlines, routes, traffic patterns and even, to a point, aircraft utilization. As such, it's really a window into the whole topic of commercial passenger air transportation in the United States. Some basic characteristics of the data set are as follows:

Time span covered: 1987 to 2015
Number of scheduled flights: 162,212,419
Number of aircraft tail numbers: 14,858
Number of airlines: 31
Number of airports: 388
Number of airport (origin, destination) pairs with at least one flight

My interest in Apache Spark is no surprise to readers of this blog, but recently the two topics collided when I was learning how to run a Spark cluster through Amazon's Elastic MapReduce service, and read a blog post by Jon Fritz on the Amazon Web Services official blog. The post shows how to run some simple Spark SQL queries against a copy of this data set hosted on Amazon's S3 storage service, conveniently converted to Parquet for easy and efficient access from Spark. I had been shopping for a somewhat real-world project through which to study ways to write efficient computations in core Spark using Scala, and so a project was born.

Is this a good data set for Spark?

Admittedly, the on-time performance data is not huge. But, with a modest cluster, fairly simple queries against the full data set take several minutes, and complex queries, or simple queries written badly, take a lot longer. While there are twenty-nine years of data, you can also have quite a lot of fun with a contiguous subset, say just one or two years, and simple queries against that run quickly on an affordable, well-configured PC.

At first, the data may seem quite simple. Partly that's an artifact of the denormalization that plagues so many public data sets. But also, the structure of this data is subtle, with significantly graph-like structure at multiple levels. The airports and regular routes between them form a pretty interesting graph, with valuable data on both the vertices and the edges. Multiple flights can be linked by flight number or aircraft tail number. Finally, there are lots of interesting correlations (or absence thereof) with external data to be explored. Weather seems like a good place to start, but the demographic and economic data for nearby cities could be interesting too. I haven't tried using GraphX on this data set yet, but I'm really looking forward to it.

Running the code

I started coding on this over two months ago. I'm sure my explorations will provide material for quite a few posts, but for now I'd just like to introduce the project, which is available on GitHub. See the README for information about how to run the examples. To add another experiment you need to extend the CoreExperiment class, just like the existing core Spark examples do, and tell the 'registry' about it in the Flights class. Then you can either run all the registered experiments, or just the comma-separated list you provide on the command line. The README explains how the output is organized.

You don't actually need to use Elastic MapReduce to run the examples: you can either download the entire data set (a bit large) or use my ParquetSubsetMain utility to create a subset. Then you can either submit FlightsMain to the cluster of your choice or use the "--local" flag to run it as a standalone Scala program. During my own development, I use the latter technique: I run FlightsMain as a standalone program, using a two-year local Parquet extract of the data. I'm only testing the code on Linux. When I run against the full data set I start an EMR cluster, use sbt's "assembly" command to generate a self-contained JAR, upload it to S3, submit FlightsMain to the cluster, and collect my output from S3.

Why so much framework?

Somewhat to my surprise, the code has ended up rather "framework heavy." This happened in response to goals I initially didn't know I had, but discovered along the way:

  1. A uniform way to run both core Spark and Spark SQL experiments.
  2. An easy way to get results, including performance measurements and diagnostics, back out to S3 without lots of maintenance.
  3. A way to run all the registered experiments or just specific ones, in a specific order, possibly with repetition to help obtain consistent performance results.
  4. Easy switching between local execution, with a development environment and a debugger, on a subset of the data, and execution on a cluster against all the data.

Project goals

I think I'm really trying to study two things with this work: how to do real work with core Spark and benefit from the efficiency advantages of doing so, and how to do that without drowning in complex Scala code. Frankly, I'm not even sure how great the advantages of using core Spark are, or whether drowning in complex Scala code can be avoided, although I'll point out that the first question can be answered through measurement, while the second is rather subjective. I like Scala and core Spark, but they both take a lot of investment to learn to use well, and they're certainly not for everybody.

What's Next?

Over the next few posts I hope to illustrate some of the basic techniques of core Spark using simple examples for this project. I look forward to hearing from you about your impressions, and about your own experiences with using core Spark from Scala.

Saturday, May 16, 2015

Efficient Spark SQL Queries to MongoDB

In previous posts I've discussed a native Apache Spark connector for MongoDB (NSMC) and NSMC's integration with Spark SQL. The latter post described a naive implementation of Spark SQL integration, intended to demonstrate its value and the applicability of Spark SQL's recently introduced external data source API. In particular, I had then made no attempt to make the implementation efficient. In this post I will describe how to use some of the features of the external data source API to achieve a much more efficient Spark SQL integration.

General Approach

As discussed in earlier posts, the major challenges in implementing a Spark SQL external data source for MongoDB are:

  1. Efficient schema inference for the entire collection.
  2. Efficient use of MongoDB's query capabilities, based on Spark SQL's projection and filter pushdown mechanism, to obtain the data required for each Spark SQL query.

The NSMC project is hosted on GitHub, and the class nsmc.sql.MongoRelationProvider is a good starting point for reading the Spark SQL integration code.

Schema Inference

The Spark SQL schema for a MongoDB collection is computed when the collection is registered using, say, Spark SQL's CREATE TEMPORARY TABLE command. The goal of schema inference is to generate a Spark SQL schema (Seq[StructField]) to which each document in the collection conforms, so that each document can be converted to an org.apache.spark.sql.Row in order to produce an RDD[Row] as a query result. Some of the subtleties of what it means to infer such a schema were discussed in the original post on Spark SQL integration.

Schema inference itself makes use of Spark's parallel computation features. First, a set of partitions is computed for the collection. Then these are parallelized into an RDD. Next, each partition's collection data is read (in parallel) and a separate schema is computed for each partition. Finally, these schemas are all merged into a single schema that definitively represents the collection. This approach is summarized in Figure 1.
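To make the merge step concrete, here is a minimal sketch in plain Java rather than Spark code. It is a toy model, not NSMC's implementation: a schema is assumed to be a sorted map from field path to type name, the class and method names are hypothetical, and the rule that the first type seen wins on a conflict is purely an assumption for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (assumption): a schema is a sorted map from field path to type name.
final class SchemaMergeSketch {

    // Merge two partial schemas into one that covers the fields of both.
    static Map<String, String> merge(Map<String, String> a, Map<String, String> b) {
        Map<String, String> merged = new TreeMap<>(a);
        for (Map.Entry<String, String> e : b.entrySet()) {
            // Assumption: on a type conflict, the first type seen wins.
            merged.putIfAbsent(e.getKey(), e.getValue());
        }
        return merged;
    }

    // Fold the schemas computed for each partition into one collection schema.
    static Map<String, String> mergeAll(List<Map<String, String>> perPartition) {
        Map<String, String> result = new TreeMap<>();
        for (Map<String, String> partitionSchema : perPartition) {
            result = merge(result, partitionSchema);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> p1 = Map.of("item", "integer", "price", "double");
        Map<String, String> p2 = Map.of("item", "integer", "location.state", "string");
        System.out.println(mergeAll(List.of(p1, p2)));
    }
}
```

Because the merge is associative in this model, the per-partition schemas can be combined in any order, which is what makes the parallel approach workable.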

Figure 1: Schema inference

An important characteristic of this algorithm is that the collection is not stored -- the documents are analyzed "on the fly" to compute the schema. Obviously this can be a good thing, if the queries end up depending on small subsets of the collection, or a bad thing if the queries tend to re-scan the entire collection every time.

Collection Scan

A collection is scanned when Spark SQL calls the buildScan() method on the MongoTableScan class, which it does in order to process each query -- this interface was discussed in an earlier post on the external data source API. This method is passed the filters and projections that Spark SQL can benefit from being executed by MongoDB. This information is first used to generate an appropriate MongoDB query as discussed below. That query is executed to obtain an RDD[DBObject] (the representation of a document in the MongoDB APIs), which is then converted to an RDD[Row] using the schema information computed previously. This means that the conversion, like schema inference, also occurs in parallel, although the performance benefit of this is dependent on how skewed the query results are.

Figure 2: Table scan

Generating MongoDB queries from the parameters to the buildScan() method is quite straightforward. MongoDB's find() method happens to be conveniently factored for this, as it takes one parameter for filtering and one for projection. I've given a number of examples of how this conversion turns out below.

Example collection and queries

The best way to understand how NSMC's Spark SQL integration works is to look at some example Spark SQL queries together with the generated MongoDB queries.

The collection and its inferred schema

To understand how query evaluation works, we'll look at the following simple set of MongoDB documents:

{
  "_id" : ObjectId("554cc53280cecbc6a8579952"),
  "item" : 1,
  "quantity" : 5,
  "price" : 12.5,
  "category" : "A"
}
{
  "_id" : ObjectId("554cc53280cecbc6a8579953"),
  "item" : 2,
  "quantity" : 10,
  "price" : 12.5,
  "location" : {
    "state" : "AZ",
    "zip" : "85001"
  },
  "category" : "A"
}
{
  "_id" : ObjectId("554cc53280cecbc6a8579954"),
  "item" : 3,
  "quantity" : 15,
  "price" : 12.5,
  "category" : "B"
}
{
  "_id" : ObjectId("554cc53280cecbc6a8579955"),
  "item" : 4,
  "quantity" : 20,
  "price" : 12.5,
  "location" : {
    "state" : "NV",
    "zip" : "89101"
  },
  "category" : "B"
}

NSMC infers the following Spark SQL schema for these documents:

 |-- _id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- item: integer (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: integer (nullable = true)

Filter and Projection Pushdown

As I discussed in an earlier post, Spark SQL's external data source API allows simple cases of filtering and projection to be pushed down to the back end database query, to make use of any indexing capabilities and to minimize the amount of data that has to be transferred back to Spark.

Let's start with the simplest of queries to see how this works, assuming we have registered the above collection with Spark SQL as the table mongoTable.

SELECT * FROM mongoTable

NSMC simply requests all the columns in the inferred schema from MongoDB. Since the always-present _id column is not explicitly suppressed, it will be present in the output. (MongoDB's convention for this field is that it is always returned unless explicitly suppressed.) Including it in this case was a design choice that seems to result in the most intuitive behavior for * queries. The important design choice was actually to include it as a column in the schema. Once that was done, Spark SQL would expect it to be returned in every * query.

Here is the generated MongoDB query, with no filtering and [effectively] no projection.

  { }, 
  { "category" : 1 , "item" : 1 , "location" : 1 , 
    "price" : 1 , "quantity" : 1 }

This query happens to be equivalent to both of the following, but NSMC doesn't recognize the fact:

collection.find( { }, { } )

collection.find( )

A query that explicitly projects columns results in only those being requested from MongoDB.

SELECT item, quantity FROM mongoTable

In this case, Spark SQL would not know how to deal with the _id column even if it was returned, so NSMC explicitly suppresses it to save bandwidth. It's probably not a very important optimization.

  { }, 
  { "item" : 1 , "quantity" : 1 , "_id" : 0 }
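The way the projection document relates to the requested columns can be sketched in a few lines of plain Java. This is an illustration of the behavior described in this post, not NSMC's actual code, and the class and method names are hypothetical. It assumes the requested column names arrive as a simple list.

```java
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper: renders a MongoDB projection document as text.
final class ProjectionSketch {

    static String projection(List<String> columns) {
        StringJoiner fields = new StringJoiner(" , ", "{ ", " }");
        for (String column : columns) {
            // _id is returned by default, so it never needs to be requested.
            if (!column.equals("_id")) {
                fields.add("\"" + column + "\" : 1");
            }
        }
        // If the query did not ask for _id, suppress it explicitly.
        if (!columns.contains("_id")) {
            fields.add("\"_id\" : 0");
        }
        return fields.toString();
    }

    public static void main(String[] args) {
        System.out.println(projection(List.of("item", "quantity")));
        System.out.println(projection(
            List.of("_id", "category", "item", "location", "price", "quantity")));
    }
}
```

The first call corresponds to the explicit projection query, and the second to the * query, where _id is part of the schema and is simply left to MongoDB's default behavior.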

Now let's move on to simple filtering.

SELECT item, quantity 
FROM mongoTable 
WHERE quantity > 12

Indeed, the filter is pushed through to the MongoDB query.

  { "quantity" : { "$gt" : 12}},
  { "item" : 1 , "quantity" : 1 , "_id" : 0}

A disjunctive query is not currently handled by NSMC, although the capability to push it through was introduced in Spark 1.3.0.

SELECT item, quantity 
FROM mongoTable 
WHERE quantity > 12 OR quantity < 8

So only projection is pushed through.

  { },
  { "item" : 1 , "quantity" : 1 , "_id" : 0}

Accessing a nested document or array element doesn't disable projection pushdown.

SELECT item, quantity, 
FROM mongoTable

The projection is "widened" to the top level column, so somewhat too much data is pulled in from MongoDB.

  { "item" : 1 , "quantity" : 1 , "location" : 1 , "_id" : 0}
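The "widening" step can be sketched as follows. This is only a model of the idea in plain Java, with hypothetical names, and it assumes nested fields are written as dotted paths; the path location.zip in the example is an illustrative choice rather than something taken from the query above.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: reduces nested field paths to their top level columns.
final class WideningSketch {

    static Set<String> widen(List<String> paths) {
        // LinkedHashSet keeps request order and drops duplicates.
        Set<String> topLevel = new LinkedHashSet<>();
        for (String path : paths) {
            int dot = path.indexOf('.');
            topLevel.add(dot < 0 ? path : path.substring(0, dot));
        }
        return topLevel;
    }

    public static void main(String[] args) {
        // location.zip is an assumed example of a nested path.
        System.out.println(widen(List.of("item", "quantity", "location.zip")));
    }
}
```

Two different nested paths under the same parent collapse to a single top level column, which is why the whole location document comes back even when only one of its fields is needed.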

Filtering on a nested document or array element is more problematic.

SELECT item, quantity, 
FROM mongoTable 
WHERE location.state = 'AZ' AND quantity = 10

There's no widening trick to play here, so the problematic conjunct is not pushed through.

  { "quantity" : 10 },
  { "item" : 1 , "quantity" : 1 , "location" : 1 , "_id" : 0}
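Putting the filtering rules from these examples together, here is a rough sketch of how a list of simple conjuncts might be turned into a MongoDB filter document. It is a toy model in plain Java, not NSMC's code: the Conjunct class, the operator table and the text-based rendering are all assumptions made for illustration, and each column is assumed to appear in at most one conjunct.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of filter pushdown for a conjunction of simple comparisons.
final class FilterPushdownSketch {

    // One comparison; the literal is assumed to be rendered as JSON text already.
    static final class Conjunct {
        final String column;
        final String op;
        final String literal;

        Conjunct(String column, String op, String literal) {
            this.column = column;
            this.op = op;
            this.literal = literal;
        }
    }

    private static final Map<String, String> MONGO_OPS =
        Map.of(">", "$gt", ">=", "$gte", "<", "$lt", "<=", "$lte");

    static String filter(List<Conjunct> conjuncts) {
        List<String> parts = new ArrayList<>();
        for (Conjunct c : conjuncts) {
            if (c.column.contains(".")) {
                continue; // nested field: not pushed down, Spark SQL filters it later
            }
            if (c.op.equals("=")) {
                parts.add("\"" + c.column + "\" : " + c.literal);
            } else if (MONGO_OPS.containsKey(c.op)) {
                parts.add("\"" + c.column + "\" : { \"" + MONGO_OPS.get(c.op)
                    + "\" : " + c.literal + " }");
            }
            // Any other operator is also left for Spark SQL to evaluate.
        }
        return parts.isEmpty() ? "{ }" : "{ " + String.join(" , ", parts) + " }";
    }

    public static void main(String[] args) {
        System.out.println(filter(List.of(
            new Conjunct("location.state", "=", "\"AZ\""),
            new Conjunct("quantity", "=", "10"))));
    }
}
```

Dropping a conjunct is safe only because Spark SQL applies the full WHERE clause to whatever comes back; the pushed-down filter just reduces how much data that is.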

We should also check an aggregation query.

SELECT category, sum(quantity) FROM mongoTable GROUP BY category

The external data source API doesn't have a way to push grouping and aggregation through, so MongoDB is just asked to fetch the underlying data.

  { },
  { "category" : 1 , "quantity" : 1 , "_id" : 0 }

Scope for Improvement

A number of changes either in NSMC's implementation or in Spark SQL's implementation could result in still more efficient processing of queries to MongoDB.


Scanning the entire collection to compute a schema has the advantage of not missing documents that would have added valuable schema information. On the other hand, it can be very expensive. Sampling clearly has to be optional, with a parameter allowing the user to specify how much of the collection to sample.

Narrower Schema Inference

Currently, a schema is always inferred for the entire collection, which can be expensive if queries will only use part of the collection. An obvious refinement would be to allow users to specify a projection when registering the collection. Being able to specify a filter would be even more powerful, but less obviously useful.


The inferred schema is cached for use by all queries. This has its dangers, which are mitigated by Spark SQL's ability to refresh a registered collection.

Since the results of a query conform to all the standard characteristics of an RDD, they benefit from Spark's caching capabilities. However, it is not yet clear to me how much they benefit across multiple queries. This needs some investigation and experimentation.

Tighter Integration with Catalyst

Spark SQL's external data source API was designed with simplicity and ease of use in mind, and in my opinion it succeeds admirably in this regard. However, the API does provide for deeper (and harder to use) integration with Spark SQL's Catalyst query compiler. It may in the future be possible to improve NSMC's Spark SQL integration by using this feature.

Limitations of the External Data Source API

Spark SQL's external data source API basically can't handle nested structures or arrays. This means certain queries against a MongoDB collection will always require that too much data be returned from MongoDB to Spark. I don't know if this is at all on the radar of the Spark SQL developers.

Additionally, the external data source API does not support pushing joins or grouping and aggregation to the external database.

Integrating with Spark SQL's cost model

The external data source API allows an implementation to provide information about the size of a collection, but NSMC doesn't use this feature. It may allow the Catalyst query optimizer to make better query processing decisions.

Limitations of Catalyst

The Catalyst query optimizer currently has a limited cost model, and improvements in it may be helpful to NSMC's performance.

Improved Partitioning

NSMC's collection partitioning features are currently quite naive, and this may limit the efficiency gains of parallel schema inference and parallel query execution. This needs some experimentation.


In my original post about NSMC's Spark SQL integration, I described a very naive implementation, intended to show that such integration was possible and could be useful. In this post I've described more recent work to take advantage of Spark's parallelism for schema inference, to prevent schema inference from consuming large amounts of memory, and to take advantage of Spark SQL's filter and projection pushdown features to dramatically improve query performance in many cases. While there's a lot more to be done, these improvements in many cases give NSMC's Spark SQL integration quite realistic performance.

Thursday, May 14, 2015

JDBC Access to MongoDB via Apache Spark

In previous posts I've discussed a native Apache Spark connector for MongoDB (NSMC) and NSMC's integration with Spark SQL. The latter post described an example project that issued Spark SQL queries via Scala code. However, much of the value of Spark SQL integration comes from the possibility of it being used either by pre-existing tools or applications, or by end users who understand SQL but do not, in general, write code. In this post I'll describe how to set up a Spark Thrift server with NSMC, and query it via JDBC calls.

JDBC access to Spark SQL

To make JDBC connections to Spark SQL, you need to run the Spark Thrift server, for which I'll give instructions below. Writing a Java application that connects to the Thrift server requires the HiveServer2 JDBC connector. The link provides some general information together with a list of JARs needed in the CLASSPATH, but I prefer to use Maven dependencies. The example described here works with the following quite conservative dependencies:


If you prefer to live on the edge, the following dependencies work too:


Running a Spark Thrift server with NSMC

Let's begin by setting up a Spark Thrift server that has access to NSMC.


In order to follow along with the setup and examples you will need to set up the following:

  1. A running instance of MongoDB.
  2. A running instance of Spark 1.3.1 or later that can communicate with the MongoDB instance.
  3. The NSMC Examples project, having run the main() method of the PopulateTestCollection class to set up the example collection in your MongoDB instance. (While the rest of the example classes need to be submitted as a Spark job, PopulateTestCollection connects directly to your MongoDB instance, and can be run stand-alone.)
  4. The NSMC JDBC Client Samples project, which I'll describe in some detail below.
  5. A configuration file that will be used to tell NSMC how to connect to your MongoDB instance -- we'll call it nsmc.conf.

The required contents of your nsmc.conf file are as follows:

                                <your MongoDB host>
spark.nsmc.connection.port      <your MongoDB port>
# omit the next two lines if you're not using MongoDB authentication, 
# otherwise provide an appropriate username and password
spark.nsmc.user                 <your MongoDB user name>
spark.nsmc.password             <your MongoDB password>


Next you need to start a Spark Thrift server to act as a JDBC listener. You need to tell it:
  1. Where to find your Spark master.
  2. What packages it needs to download from the Maven Central Repository -- in this case a supported version of NSMC, v0.5.2 or later, compiled for Scala 2.10. (Earlier versions of NSMC cannot be used in this way.)
  3. Where to find NSMC configuration -- in this case your nsmc.conf file.
<your Spark installation>/sbin/ \
    --master <your spark master URL> \
    --packages com.github.spirom:spark-mongodb-connector_2.10:0.5.2 \
    --properties-file <path to config files>/nsmc.conf

If you do not want Spark to download NSMC directly from the Maven Central Repository, perhaps because the servers do not have access to the Internet, you can download an assembly containing all the relevant packages -- look for spark-mongodb-connector-assembly-0.5.2.jar or a later version. If you take this approach you need to:

  • Save the assembly somewhere that is accessible on all servers in your Spark installation
  • Skip the --packages setting when starting the Thrift server and instead use --driver-class-path together with the path to where you saved the assembly.
  • Add an entry to your nsmc.conf file where the key is spark.executor.extraClassPath and the value is again the path to where you saved the assembly file.

Running the JDBC Examples

If you have cloned the NSMC JDBC Client Samples project, you can build and run it either through IntelliJ IDEA, or through Apache Maven by using the commands:

mvn compile
mvn exec:java -Dexec.mainClass="Demo"

Most of the code is fairly mundane JDBC, and I won't attempt to write a JDBC tutorial. But I will step through some of the most interesting points. You may need to modify the connection string if you're running the Spark Thrift server on a remote host or a non-default port. Furthermore, if you've set up authentication you'll need to provide an appropriate user name and password.

Connection con = 
                                "hive", "");

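For reference, a connection along these lines can be sketched as follows. The host, port and database in the example are assumptions (a Thrift server on localhost, the usual default port 10000, and the "default" database), and the class and method names are hypothetical. The "hive" user and empty password are the values used in the fragment above. The Hive JDBC driver must be on the classpath for the connection attempt to succeed.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Hypothetical helper for connecting to a Spark Thrift server over JDBC.
final class ThriftConnectionSketch {

    // Build a HiveServer2 style JDBC URL.
    static String url(String host, int port, String database) {
        return "jdbc:hive2://" + host + ":" + port + "/" + database;
    }

    // Open a connection; requires the Hive JDBC driver at run time.
    static Connection connect(String host, int port, String user, String password)
            throws SQLException {
        return DriverManager.getConnection(url(host, port, "default"), user, password);
    }

    public static void main(String[] args) {
        // Assumed defaults: local server on port 10000.
        System.out.println(url("localhost", 10000, "default"));
    }
}
```

With a server running, connect("localhost", 10000, "hive", "") would play the role of the con variable used in the rest of the example.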
The most important part of the example is registering a MongoDB collection with Spark SQL. There are a number of interesting aspects.

  • You need to register a temporary table, and thus you will need to re-register it if you restart the Spark Thrift server. Spark (as of 1.3.0) now supports persisting metadata for external resources like this in the Hive Metastore, but NSMC (as of v0.5.2) isn't yet capable of taking advantage of it.
  • Because all connections to a Spark Thrift server run in the same HiveContext, registering a MongoDB collection once like this makes it available to all users until the server is restarted.
  • It is the USING clause that tells Spark SQL to use NSMC for querying this resource. Since you told the Spark Thrift server to download an appropriate version of NSMC from the Maven Central Repository, NSMC will both set up the right metadata when a collection is registered, and then also process any queries to this table.
  • You can register several different MongoDB collections, potentially in different databases, through multiple invocations of CREATE TEMPORARY TABLE. Alas, all connections must use the same MongoDB endpoint (host/port) with the same NSMC configuration. See NSMC Issue #15 for the status of the needed enhancement.

CREATE TEMPORARY TABLE dataTable
USING nsmc.sql.MongoRelationProvider
OPTIONS (db 'test', collection 'scratch')

The remaining code executes some simple DDL and DML queries, and should be clear if you've looked at the MongoDB collection and queries in the NSMC Examples project. You will recognize the following Spark SQL schema as being the schema that NSMC infers from the MongoDB collection.

 |-- _id: string (nullable = true)
 |-- billingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- custid: string (nullable = true)
 |-- discountCode: integer (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemid: string (nullable = true)
 |    |    |-- orderid: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |-- shippingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)

Since this schema has both arrays and nested structures, I want to spend the rest of this post focusing on how JDBC handles returning query results from such an aggressively non-rectangular (or non-relational) schema. As you have seen in my original Spark SQL post, Spark SQL itself deals with this extraordinarily well, but alas the Hive JDBC connector does not.

The following simple query illustrates the issues, since billingAddress contains a nested document and orders contains an array.

SELECT custid, billingAddress, orders FROM dataTable

If you execute the query from Scala and print the schema of the result you will see:

 |-- custid: string (nullable = true)
 |-- billingAddress: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- orders: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemid: string (nullable = true)
 |    |    |-- orderid: string (nullable = true)
 |    |    |-- quantity: integer (nullable = true)

And printing the data frame yields:

custid billingAddress orders               
1001   [NV,89150]     List([A001,100000... 
1002   [CA,92093]     List([B012,100000... 
1003   [AZ,85014]     List([A001,100000... 
1004   null           null                 

In JDBC we would expect the billingAddress to be modeled as java.sql.Struct and the orders to be a java.sql.Array (where the elements are also java.sql.Struct). Instead, we see that JDBC recognizes the original types, but transmits all the structured values as JSON strings. The following code can be used to show how JDBC perceives the column types:

ResultSetMetaData rsm = res.getMetaData();
System.out.println("*** Column metadata:");
int ncols = rsm.getColumnCount();
System.out.println("total of " + ncols + " columns");
for (int i = 1; i <= ncols; i++) {
    System.out.println("Column " + i + " : " + rsm.getColumnName(i));
    System.out.println(" Label : " + rsm.getColumnLabel(i));
    System.out.println(" Class Name : " + rsm.getColumnClassName(i));
    System.out.println(" Type : " + rsm.getColumnType(i));
    System.out.println(" Type Name : " + rsm.getColumnTypeName(i));
}

Here is the output:

*** Column metadata:
total of 3 columns
Column 1 : custid
  Label : custid
  Class Name : java.lang.String
  Type : 12
  Type Name : string
Column 2 : billingAddress
  Label : billingAddress
  Class Name : java.lang.String
  Type : 2002
  Type Name : struct
Column 3 : orders
  Label : orders
  Class Name : java.lang.String
  Type : 2003
  Type Name : array

The following code allows us to see the values. Since all of the columns have been converted to strings, it's worth noting that the call to res.getObject(i) could just as well have been replaced with res.getString(i).

System.out.println("*** Data:");
while (res.next()) {
    System.out.println("Row " + res.getRow());
    for (int i = 1; i <= ncols; i++) {
        System.out.println(" Column " + i + " : " + res.getObject(i));
    }
}

Here is the output. Notice that it is quite readable, and the JSON strings could be parsed by your client code if needed, especially since you can use the ResultSetMetaData to determine what to expect in the String. Whether this is "good enough" really depends on your needs.

*** Data:
Row 1
  Column 1 : 1001
  Column 2 : {"state":"NV","zip":"89150"}
  Column 3 : [{"itemid":"A001","orderid":"1000001","quantity":175},
Row 2
  Column 1 : 1002
  Column 2 : {"state":"CA","zip":"92093"}
  Column 3 : [{"itemid":"B012","orderid":"1000002","quantity":200}]
Row 3
  Column 1 : 1003
  Column 2 : {"state":"AZ","zip":"85014"}
  Column 3 : [{"itemid":"A001","orderid":"1000003","quantity":175},
Row 4
  Column 1 : 1004
  Column 2 : null
  Column 3 : null
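Since the metadata still reports the struct and array types, client code can use it to decide which columns hold JSON text. Here is a minimal sketch of that decision, using the standard java.sql.Types constants (2002 and 2003 in the metadata output above). The class name and the returned labels are hypothetical, and the actual JSON parsing is left to whatever library you prefer.

```java
import java.sql.Types;

// Hypothetical helper: classifies a column by the JDBC type code in its metadata.
final class ColumnKindSketch {

    static String kind(int jdbcType) {
        switch (jdbcType) {
            case Types.STRUCT:
                return "json-object"; // value arrives as the text of a JSON object
            case Types.ARRAY:
                return "json-array";  // value arrives as the text of a JSON array
            default:
                return "scalar";      // value can be read directly
        }
    }

    public static void main(String[] args) {
        System.out.println(kind(Types.STRUCT));
        System.out.println(kind(Types.ARRAY));
        System.out.println(kind(Types.VARCHAR));
    }
}
```

In the loop above, something like kind(rsm.getColumnType(i)) would tell you whether res.getString(i) needs to be handed to a JSON parser, remembering that the value may also be null.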

Advantages of using HiveQL in queries

If Hive's compromise for returning semi-structured data doesn't meet your needs, you may want to consider doing more of the work in the query. For example, Spark SQL supports drilling into nested structures as follows:

SELECT custid, FROM dataTable

This actually works remarkably well. For example, if billingAddress happens to be null in some document, asking for one of its fields also returns NULL, instead of generating an error. However, Spark SQL doesn't give you a good way to unpack arrays -- and yet there is a coping strategy for those too.

Requests to the Thrift JDBC server are executed in a HiveContext, so in addition to having access to Spark SQL as a query language, you can write queries in HiveQL. More than just being convenient for experienced Hive users, this allows you to work around some of the shortcomings of the Hive JDBC driver for dealing with the "non-rectangular" aspects of MongoDB collections. For example, the "LATERAL VIEW" feature of HiveQL allows you to effectively "denormalize" the data your query returns:

SELECT custid, o.orderid, o.itemid, o.quantity 
FROM dataTable LATERAL VIEW explode(orders) t AS o

Instead of returning one result row for each matching document, it returns one row for each order, with the custid joined on. The result is completely rectangular, and so is easy to deal with in your client code. First, you can see this in the "Type Name" of the metadata.

*** Column metadata:
total of 4 columns
Column 1 : custid
  Label : custid
  Class Name : java.lang.String
  Type : 12
  Type Name : string
Column 2 : orderid
  Label : orderid
  Class Name : java.lang.String
  Type : 12
  Type Name : string
Column 3 : itemid
  Label : itemid
  Class Name : java.lang.String
  Type : 12
  Type Name : string
Column 4 : quantity
  Label : quantity
  Class Name : java.lang.Integer
  Type : 4
  Type Name : int

Second, you can see it when you print the results.

*** Data:
Row 1
  Column 1 : 1001
  Column 2 : 1000001
  Column 3 : A001
  Column 4 : 175
Row 2
  Column 1 : 1001
  Column 2 : 1000002
  Column 3 : A002
  Column 4 : 20
Row 3
  Column 1 : 1002
  Column 2 : 1000002
  Column 3 : B012
  Column 4 : 200
Row 4
  Column 1 : 1003
  Column 2 : 1000003
  Column 3 : A001
  Column 4 : 175
Row 5
  Column 1 : 1003
  Column 2 : 1000004
  Column 3 : B001
  Column 4 : 10
Row 6
  Column 1 : 1003
  Column 2 : 1000005
  Column 3 : A060
  Column 4 : 12

You can get more information about this technique in the relevant section of the Hive Language Manual.
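If the effect of LATERAL VIEW with explode() is unfamiliar, the following plain Java model may help. It is not how Hive implements the feature, just a sketch of the result: one output row per array element, with the parent's custid repeated, and no rows at all for a customer whose orders are null. The class names are hypothetical, and the sample data is taken from the query output above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain Java model of: SELECT custid, o.* FROM t LATERAL VIEW explode(orders) t AS o
final class ExplodeSketch {

    static final class Order {
        final String orderid;
        final String itemid;
        final int quantity;

        Order(String orderid, String itemid, int quantity) {
            this.orderid = orderid;
            this.itemid = itemid;
            this.quantity = quantity;
        }
    }

    static final class Customer {
        final String custid;
        final List<Order> orders; // may be null, as for customer 1004

        Customer(String custid, List<Order> orders) {
            this.custid = custid;
            this.orders = orders;
        }
    }

    // One row per order, with the custid joined on.
    static List<String> explode(List<Customer> customers) {
        List<String> rows = new ArrayList<>();
        for (Customer c : customers) {
            if (c.orders == null) {
                continue; // no orders: the customer contributes no rows
            }
            for (Order o : c.orders) {
                rows.add(c.custid + "," + o.orderid + "," + o.itemid + "," + o.quantity);
            }
        }
        return rows;
    }

    // The customers and orders that appear in the output above.
    static List<Customer> sample() {
        return Arrays.asList(
            new Customer("1001", Arrays.asList(
                new Order("1000001", "A001", 175), new Order("1000002", "A002", 20))),
            new Customer("1002", Arrays.asList(new Order("1000002", "B012", 200))),
            new Customer("1003", Arrays.asList(
                new Order("1000003", "A001", 175), new Order("1000004", "B001", 10),
                new Order("1000005", "A060", 12))),
            new Customer("1004", null));
    }

    public static void main(String[] args) {
        for (String row : explode(sample())) {
            System.out.println(row);
        }
    }
}
```

Note that customer 1004 disappears from the result entirely, which matches the six rows shown above.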


Using Apache Spark's Thrift server and NSMC's integration with Spark SQL, you can run highly distributed SQL queries against MongoDB collections, perhaps even performing joins between MongoDB collections and data stored elsewhere.