Sunday, April 22, 2018

Revisiting Spark External Data Sources (The V2 APIs)

Over three years ago I started writing about the Spark External Data Source API, first with two general posts: External Data Sources in Spark 1.2.0 and Filtering and Projection in Spark SQL External Data Sources.

Subsequent posts dealt with my explorations via integrating MongoDB:

Back then, I made my experiments available as part of a collection of self-contained Spark examples on GitHub, and a number of projects specifically targeted towards MongoDB:

I learned a lot by doing that, and apparently a number of people found the examples useful, but I never managed a systematic exploration of the original API and all of its capabilities -- something that didn't feel right, and that I always meant to go back to when time allowed.

Since then, a number of features have been added to the original API, but more significantly, the community started work on a new (V2) API described in this design document and in SPARK-15689. The new API has now been released with "experimental" status in Spark 2.3.0, and further improvements are pending, as described in SPARK-22386. Even allowing for the inevitable churn that an experimental API of this complexity will inherently experience, this seems like the perfect time to revisit Spark external data sources, and the new API seems like a good way to do so


I've started exploring the new API in a dedicated GitHub project. I knew I wanted to make the examples "self-contained", since people seemed to find that useful in in my earlier projects. But doing so presented a challenge, as the data sources are supposed to be "external." This meant that I had to make a hard choice between three options:

  1. Use a "trivial" data source like the file system. This can actually lead to considerable insight, but it's not really amenable to exploring all aspects of the API.
  2. Embed one or more external systems (presumably enough to cover all the concepts, which was unlikely to be possible with a single system.)
  3. Provide my own [contrived] data source(s) that allowed covering all the concepts.

I chose the third option, developing a rather simple key-value store called ExampleDB, in which I ignored the finer points like performance and persistence, and simply focused on developing a vaguely plausible API that could support the various examples. I'm expanding and refining ExampleDB as I add each example data source, and hoping I can keep it from becoming a monster. I decided to embed ExampleDB in each of the examples rather than starting a separate process like I did with the Kafka broker and Zookeeper in my Spark Streaming with Kafka project. However, to ensure that the data sources aren't "cheating" in the way they use ExampleDB, I only allow the data sources AND the example Spark jobs to call it via a remote API based on gRPC. Thus, the state of the RPC client needs to be managed carefully in the example data sources, making them realistic.

Another major decision point was which language to use for both the data sources and the Spark jobs. (ExampleDB is written in Java, but that's really of no consequence as far as the tutorial value of the project goes.) Java friendliness seems to have been a major design goal of the new API, and indeed the interfaces that define the API are all Java interfaces. What language will developers actually use to develop data sources? I don't really know, but I think it depends somewhat on their role. I suspect that Spark developers who need to roll their own data source will often do so in Scala, in proportion to the general Scala/Java split in the Spark user community. However, I also suspect that database and file system vendors will prefer Java, since Java talent is so much more plentiful in the industry overall. That's a problem for me because both classes of developers are part of my target audience. So far, all the data sources are in Java, but I've provided all the example Spark jobs in both languages since that hasn't been very costly.

All of these decisions are subject to being revisited as I see how much interest the project generates, and what seems to interest the community the most.

Examples in the Spark Source Code

While the V2 API is currently not documented much (except for JavaDoc comments in the interface sources), the Spark test suite does provide good coverage of the features. My examples are informed and inspired by many of the tests, but I've taken somewhat of a hard line on one design point that wouldn't have been important to the test developers. In the tests, you'll often see a single class implement multiple interfaces just for the sake of expediency. Sometimes they mix data that needs to be serialized for communication between the driver and executors with data that doesn't. That makes it hard to understand their communication behavior.

As an example, consider the test data source JavaSimpleDataSourceV2 on branch-2.3. The inner class JavaSimpleDataReaderFactory implements both DataReaderFactory<Row>, which needs to be serializable and DataReader<Row>, which doesn't.

For this reason I've been rather careful to distinguish between objects that stay put and objects that get communicated. (Keep in mind that this is not a criticism of the tests as tests, it's just a warning for anyone trying to use them as a tutorial, and an explanation of why my data sources may sometimes appear pedantic and bloated in comparison.)

Examples available now

The following example data sources are now available. (Note that the links below refer to a tagged version of the repository.)


    An extremely simple data source that supports sequential reads (i.e. on just one executor) from ExampleDB. It only supports reads from a single, pre-defined table with a pre-defined schema. This data source is probably about as simple as one that reads from a remote database can get. As such, it is aimed at the developer who needs to achieve a simple integration quickly and easily.


    Another simple data source that supports sequential reads (i.e. on just one executor) from ExampleDB. It gets a table name from its configuration and infers a schema from that table.


    This introduces parallel reads (i.e. on multiple executors) from ExampleDB. The resulting Dataframe is partitioned appropriately. If the number of partitions is specified in properties, it is used. Otherwise, the table's default partition count (always 4 in ExampleDB) is used.


    This also supports parallel reads (i.e. on multiple executors) from the ExampleDB. The interesting feature of this example is that it supports informing the Spark SQL optimizer whether the table is partitioned in the right way to avoid shuffles in certain queries. One example is grouping queries, where shuffles can be avoided if the table is clustered in such a way that each group (cluster) is fully contained in a single partition. Since ExampleDB only supports clustered indexes on single columns, in practice a shuffle can be avoided if the table is clustered on one of the grouping columns in the query. (In ExampleDB clustered tables, splits always respect clustering.)

Read/write data sources


    This data source adds the ability to write data, and does so in parallel. The various classes for reading are identical to those of ParallelRowDataSource. All four values of SaveMode are supported. Each task writes to its own temporary table, and on global commit all of these temporary tables are copied into the destination table, and deleted, in a single ExampleDB transaction.

Remember this is all experimental

The V2 API is marked experimental, and indeed a quick look at the Spark source code on GitHub reveals that some (apparently minor) refactoring has already happened since the 2.3.0 release, and we should probably assume there will be more before the 2.4.0 release. I'll try to track releases as quickly as I can until the API stabilizes.

I'm experimenting too

Given the challenges this project presents, I may need to change my approach over time, and I hope to get feedback from you about how well the approach is working.

Sunday, October 23, 2016

Developing Utility Bolts for Apache Storm

Apache Storm is a distributed stream processing framework: one of many such frameworks but among the most popular. Storm applications ("topologies") are composed of "spouts" (sources of data) and "bolts" (data transformations), and these are connected by "streams" of "tuples", which are a sequence of typed key/value pairs of data. The spouts and bolts can be thought of as vertices in a directed acyclic graph, and the streams as edges. The spouts are always graph sources, with only outgoing edges, but the bolts may have either both incoming and outgoing edges, or they can be sinks, with only incoming edges.

Storm provides various pre-defined components, most of them spouts, providing standard data sources for streaming data from database systems, file systems, queueing systems and network listeners such as a web server, and so on. Similarly it provides pre-defined bolts, some serving as data sinks along the same lines, as well as interfaces to the usual logging frameworks.

In this post I'm going to examine what it takes to do a good job of adding reusable transformers (in the form of utility bolts) to Storm, for use by topology developers. Storm already provides a number of these, mostly in the package org.apache.storm.starter.bolt, and a few more in the package org.apache.storm.testing. (Storm follows a convenient naming convention where all bolt class names end with "Bolt".) Alas, most of these are completely undocumented, at least in the JavaDoc, but many are quite simple, and their usage can be worked out from a quick read of the source. Standard transformations can provide simple operations like projecting out unwanted fields, or much more complex ones like filtering, aggregation or data smoothing.

Since sometimes spouts and bolts have common or interacting design issues I'll occasionally touch on the design of spouts, but that's a complex topic in itself that is mostly out of scope for this post.


Since this is intended to be a practical guide for writing reusable bolts, I'm going to assume that you already understand the basic mechanics of writing a very simple bolt and getting it working in a Storm topology. A good source for learning how to do this is the book "Storm Applied: Strategies for real-time event processing" by Sean T. Allen, Matthew Jankowski, and Peter Pathirana. I'm also assuming that you have the most basic familiarity with Storm's Java API.

Open Source Project

You may want to read this in conjunction with the storm-gadgets project on GitHub, which includes a small number of bolts largely developed using the design principles described here, although I'll leave detailed discussion of the actual code to another post.

Design Goals

First I'd like to propose some design goals for creating utility bolts:

Ease of use: The purpose and behavior of the bolt should be clear and it should be easy to set up and include in a topology.

Appropriate Generality: When designing reusable components of any kind there's a tradeoff between ending up with lots of similar components on one hand, and components with complex configuration on the other. When adding components to an existing framework it helps to "blend in" with how existing components have handled this compromise. Another facet of generality is adaptability to as wide a range of topologies as possible, in terms of variations like concurrency support, reliable delivery support, tuple contents, and so on.

Robustness: Good choices need to be made about what kinds of errors are tolerated and which lead to topology execution failure. Here again the pre-existing components can be a guide. Furthermore, in the streaming world it's very expensive to allow bad input data or a localized problem to terminate the application. It's usually best to avoid interrupting stream processing in all but the most severe cases: anything that prevents the successful processing of a large fraction of tuples.

Ease of Diagnosis: It's important to be able to diagnose misconfiguration of these components as well as failures, or other faults, during their execution. Again, existing components can be a guide here, but broadly we want to be able to read the usual logs and see what is happening in each component – easily being able to zoom in on a specific component type and/or component instance. The reader of log messages needs to be able to understand the scope and severity of each reported problem, and ideally what to do about it: fix bad configuration, restart the topology, solve an environmental problem, etc.

Performance and Scalability: In addition to the component itself performing well, it should not detract from the performance and scalability of the topologies that use it any more than necessary.

Implementation Guidelines

To meet the above component design goals in the Apache Storm framework, we need to address certain technical issues. I'll leave performance and scalability to a separate post, and address the functional issues here. As mentioned earlier, this discussion will occasionally refer to the Java API, although that's not the only option for implementing bolts.

Distinguishing between inputs: A component may take inputs from multiple other components, and will often treat those inputs differently -- that is, they have different roles in the operation of the component, and so the topology developer will need to be able to specify which input stream has which role. Furthermore, upstream components may be emitting tuples on multiple streams, and sometimes multiple output streams of a single component may be consumed by our component. In Storm, streams have names local to the component that emits them, and components within a topology live in a flat namespace where they have global names. Storm provides the class org.apache.storm.generated.GlobalStreamId for dealing with this two-level namespace. In short, the component must support dealing unambiguously with the names of streams.

Organizing outputs in a consumable way: Our own component may need to generate multiple output streams, in which case they need to be named. Even if there is only one, there may be reasons not to simply use the default output stream (whose name is, aptly enough, "default".) Sometimes it will make sense to generate the stream names ("out1", "out2", …) but in other cases they will need user configuration to fit them into a topology. The fields in output tuples will also need names, which may be fixed or generated in some cases, and need to be configured in others. This can be a lot for the user to configure, and the decision as to what needs to be configured should be made carefully. Finally, there are cases where it may be tempting to choose the output stream and field names based on the input stream and field names. There are two problems with this. First, while it may seem like a great way to avoid configuration altogether, Storm spouts and bolts are required to declare their streams and fields (and the order of fields) via the declareOuputFields() callback when the topology is initialized. Second, while it is often practical to use the configured names of inputs as names of outputs, you need to watch out for collisions – multiple input components may use the same stream name, and multiple streams may use the same field name. In short, simply passing input names through as output names is not a viable bolt design strategy in Storm.

Interoperating with Guaranteed Delivery: The degree to which a topology achieves guaranteed delivery of tuples depends on its configuration, as well as the behavior of the spouts and bolts. Spouts need to assign IDs to tuples, bolts need to anchor their emitted tuples appropriately with respect to input tuples, and all components need to acknowledge tuples appropriately. Spouts have to implement the ack() and fail() methods, which also impacts the nextTuple() method, as emitted tuples need to be stored, keyed by their tuple ID, until they are either acknowledged (and then deleted) or failed (and then replayed.) Finally, bolts that communicate with external systems such as databases or queueing systems will need to "fail" the tuple when operations on external systems fail, so that it will later be replayed. When developing a utility component, we don't know whether guaranteed delivery will be used in a particular topology -- it usually needs to support either behavior. Fortunately, if we develop the component as if guaranteed delivery will be used, it can also be deployed without it. As we will see below, doing this sometimes it raises complex design issues.

Concurrency: It is straightforward to write components in a way that allows Storm to operate multiple instances in parallel, but problems arise when we use these components in a topology and try to decide on what grouping method to use to connect them. Often a shuffle grouping will work – in particular, if the bolt processes each tuple completely in isolation from others. It gets more complicated if the order of tuples is significant to the bolt, or they need to be grouped in some way – then often a fields grouping is appropriate. This is all in a day's work for Storm topology developers, but it requires understanding the behavior of each spout and bolt. As utility component developers, it's up to us to understand our component's behavior well enough to document the grouping requirements it imposes, and sometimes this can be complex as it may be different for different inputs. Spouts have additional responsibilities with respect to concurrency, as the various spouts reading from an external data source need to divide the data among themselves. When reading from a queue, this is straightforward, but if reading from a DBMS they may have to work out how to explicitly partition a table.

Error handling: The issue of error handling in streaming applications is complex and covering it completely in this post seems impossible. As utility component developers, however, we need to understand and document how our component interacts with system failures in the topology around it, and also what it considers "invalid" configuration and "invalid" input data.

Misconfigurations should usually be reported when a component is initialized (from the constructor) or during the call to prepare(), as they should, if at all possible, be reported before the topology starts to execute and should, in most cases, prevent it from executing. One major kind of misconfiguration that components should always check for during initialization is whether an appropriate set of input streams and output streams have been configured -- there's usually no point starting to execute data if they haven't. This is also a good time to check for groupings that can't be supported, concurrency levels that can't be supported, as well as combinations of grouping and concurrency.

Invalid tuples are a different matter: unclean data is a regular fact of life, and data pipelines should recover and continue executing whenever possible after an invalid tuple is received. This can be either very simple or complex depending on the nature of your component. One thing to remember is that if you effectively drop a tuple for being invalid, you still need to acknowledge it so it doesn't get replayed when guaranteed delivery is being used – this can feel counterintuitive but is very important. There remains the issue of reporting the problem to support diagnosability. It's important to be able to monitor whether the number of tuples (absolute or as a proportion of data processed) each component has rejected is very small or very large. In the latter case, hopefully an administrator should be alerted to check whether there is a major, systematic configuration or data source problem. Sometimes the administrator will have the luxury of stopping the data pipeline, but often this is out of the question. Millions of tuples may be rejected before an upstream problem is solved, and you don't want your alerting mechanism to cause more problems than it solves. For example, logging every rejected tuple can seem like a good idea, and indeed be very useful, until the logs fill up a storage device or the logging slows the topology to a crawl. Logging needs to be used judiciously, and logging the occasional rejected tuple is probably still a good idea. Logging the number of rejected tuples from time to time can also be useful. For some components, particularly those that are "fussy" about their inputs, it may make sense to output something (perhaps a count, or an error message) on a dedicated output stream whenever a tuple is rejected. It may even be tempting to output the entire tuple, but this is not straightforward. Since the field signatures of a component's output streams need to be pre-declared, it's hard to emit an unexpected field. One approach is to serialize the entire rejected tuple into a single field, perhaps called "tuple", perhaps in a serialization format that is both machine and human readable.

Spouts that attempt to support guaranteed delivery also need to handle situations where either tuples are not being acknowledged for a long time (imposing an huge interim storage burden on the spout) or repeatedly being failed (adding a retransmission burden to that storage burden) in both cases suggesting that something is seriously wrong. Such situations can be handled by occasional reaping of old tuples and by imposing limits on the number of retries – both requiring additional information to be stored with the transmitted tuple, as well as judicious decision making by the designer

Logging: Storm now uses SLF4J for logging, and it's straightforward for individual components to use it as well. Any logging done on a per-tuple basis should be at the DEBUG level so it can be disabled in production. Major component lifecycle and configuration events should be logged as INFO as it's cheap to log them and they should always be available.

One aspect of logging to be aware of is that a component can only become aware of its ID in the topology when prepare() is called. If you want to use it for logging elsewhere (and you will) you need to save it at that time. Furthermore, not only is the ID not known in the constructor, but it is also not known in declareOutputFields(), which is called before prepare(). If it seems useful for the association between the component ID and its configuration (and perhaps output fields) to be clear in the logs, you may want to log it all inside prepare() even though it was already available in the constructor and it may have been tempting to log it there.

Interactions with external systems: Spouts often read data from external systems and bolts can read or write data from/to such systems, or both. To do this responsibly, they should not overuse the resources of those systems, including connections. This includes limiting the number of concurrent connections, disconnecting responsibly when cleanup() or deactivate() are called. As mentioned earlier, it needs to be clear what happens when multiple instances of a component read from the same database table – are they replicating the data or partitioning it? An additional complication to keep in mind is that when guaranteed delivery is in play, the input tuple to a component may be replayed -- it's necessary to think through what effect this will have on the external system.

In Practice

You can make up your own mind as to how well the bolts in the project meet the design goals and conform to the implementation guidelines: I'll discuss some of them in detail in future posts. If you have bolts of your own that raise interesting issues, or feedback on the ideas discussed here, please let me know.

Thursday, October 20, 2016

Learning to use Apache Spark and Kafka Together

I recently wrote about the unexpected popularity of the LearningSpark project on GitHub, and speculated that some of the popularity stemmed from the convenience of not having to set up a Spark server -- each example program is more-or-less self-contained. This approach has certain limitations (for example, it's an awful way to explore performance and scalability issues) but it does leave people free to concentrate on model and API issues in isolation. This can be useful not only for Spark beginners, but also, as Spark evolves, it's a good way to understand the new features and the problem solving approaches they support.

Sooner or later, a large fraction of Spark users end up grappling with how to use Spark in conjunction with Apache Kafka. Now, the overheads of setting up both a Spark cluster and a Kafka cluster before you can write the stream processing equivalent of "Hello World" can be quite high, especially if you're still learning BOTH systems. Being in this situation myself, I started to wonder how hard it would be to set up a project where both the Spark system and the Kafka broker (and ZooKeeper instance) were embedded in the example program. Such an approach would have the same limitations as the Spark project described above, and probably suffer from them even more, but conversely, the benefits of such simplification would be even greater. It turns out not to be very hard to achieve, as you can see at another GitHub project, spark-streaming-with-kafka, but it does have certain gotchas, which I'll discuss.

It seems to me that the Kafka project has suffered rather considerable API churn, not just in the details of the APIs but in the fundamental conceptual model as well. Currently in Spark 2.0.0, released in July of 2016, Spark support for the APIs is lagging somewhat, supporting Kafka, released in February of 2015. This seventeen month lag causes some minor difficulties, although the situation seems likely to improve in Spark 2.1.

The main impact of this lag in API support is in the area of the kafka-unit project, which provides convenient APIs for managing an embedded Kafka instance. This project is now at version 0.6, but to get a release that supports Kafka we have to go back to kafka-unit 0.2, which is missing some handy newer features like creating partitioned topics. I've had to work around this in one of the utility classes discussed below.

Utility Classes

So far, I have needed the following utility classes to keep the examples sane. Note that these links point to a version of the code specially tagged to provide permanent links: keep this in mind if you want to see more recent versions of the code.

  • EmbeddedKafkaServer: Encapsulate uses of the kafka-unit project to embed a Kafka instance, working around the fact that, as disucssed above, we have to use a rather old version of that project.
  • SimpleKafkaClient: Some trivial default configuration for the producers and consumers used in the examples.
  • SparkKafkaSink: An extension of the code provided in Marcin Kuthan's rather useful blog post about publishing to a Kafka topic from a Spark job.His code uses the default partitioner, essentially broadcasting the contents of each RDD partition to all of the topic partitions, effectively causing a random repartitioning. The additional overload of the send() method here instead allows the topic partition to be specified, so the contents of all RDD partitions are sent to the same topic partition. I'm not sure this is useful in practice, but it helps to illustrate the relationship between RDD partitioning and topic partitioning.


So far, the following five examples are available. Once again, the links point to a tagged version of the code.

  • SimpleStreaming: The most basic streaming example: starts a Kafka server, creates a topic, creates a stream to process that topic, and publishes some data using the SparkKafkaSink.

    For each of the received RDDs, the code prints the number of partitions and the number of elements in each partition. The code exercises no control over the partitioning of the received RDDs, and there turn out to be two partitions each time, compared with four in the originating RDD and four in the topic. By examining the partitioning here, we set the stage for exercising some control over it in later examples.

    Notice there's quite a lot of waiting. It takes some time for streaming to get going, and data published too early tends to be missed by the stream. (No doubt, this is partly because this example uses the simplest method to create the stream, and thus doesn't get an opportunity to set auto.offset.reset to "earliest".) Also, data that is published takes some time to propagate to the stream. This seems inevitable, and is almost guaranteed to be slower in a self-contained example like this.
  • ExceptionPropagation: This example demonstrates that exceptions encountered in stream processing are rethrown from the call to awaitTermination(). The custom exception SomeException is thrown when an RDD is received.
  • MultipleConsumerGroups: This differs in creating two streams based on two different consumer groups, so both streams get a copy of the same data. It's simply a matter of specifying the two names of the two different consumer groups in the two calls to createStream() for the same topic -- no special configuration is needed. The two calls create two instances of ReceiverInputDStream, and then foreachRDD is called on each of those. This is valuable if you want to create more than one processing pipeline on the same data
  • PartitionedStreaming: By calling createDirectStream() instead of createStream(), you can get the generated RDDs to have a number of partitions (in this case 6) dictated by the partitioning of the topic.
  • ControlledPartitioning: Here the topic has six partitions but instead of writing to it using the configured partitioner, we assign all records to the same partition explicitly. Although the generated RDDs still have the same number of partitions as the topic, only one partition has all the data in it. This demonstrates how to exercise control over partitioning all the way from the original RDD, through the topic to the resulting RDDs.

Feedback please!

Personally I've found this project useful in improving my understanding of Kafka itself, as well as the Kafka integration features of Spark Streaming. It's always hard to tell which of these projects are useful to others and why, so I look forward to hearing from you about your experiences with the code.

Sunday, October 9, 2016

Learning Spark with Java

In a recent post I discussed the history and motivation of my LearningSpark project on GitHub. While that project is mostly based on the Scala APIs to Apache Spark, I explained why I had begun to explore the Java APIs as well. I also predicted that I would soon introduce a separate project, based solely on Maven and Java, to continue the Java exploration: most Java programmers are much more comfortable with Maven than with sbt, and a separate project allows me to choose the Java version appropriately.

The new learning-spark-with-java project on GitHub is the result. It started with a copy of the examples on the original project, but since I've now adopted Java 8, I rewrote the examples to make use of the latter's lambda expressions, perhaps ironically making the code now look more like the original Scala code.

I'll proceed with this project using the guidelines I listed in the LearningSpark project when I branched out into Java. I will almost definitely not:

  1. Rush to catch up with the Scala examples,
  2. Keep the two sets of examples perfectly (or even well) matched,
  3. Branch out into Python and R as well (seriously, I have no interest in doing this.)

I'll probably still focus on the Scala examples more, as new features seem to mature a little faster in the Scala API. I am unlikely to add to the Java examples in the LearningSpark project, and if they get in the way or create confusion, I may eventually delete them. As always, feedback is welcome, and I'm especially curious to see whether the community finds this project as useful as some people obviously found the earlier one.

Sunday, August 28, 2016

Taking a Detour with Apache Spark

Almost two years ago, while preparing for a talk I was giving at the now defunct Seattle Eastside Scala Meetup, I started a public GitHub project collecting and organizing Apache Spark code examples in Scala. I had stumbled on a way to run the examples on all supported platforms without setting up or deploying to a cluster, so the overheads of experimenting with the Spark APIs and programming idioms were remarkably low. It seemed like this approach was not well known at the time, so I shared it via the GItHub project and by posting here. Other than avoiding the overheads of a Spark cluster, the main feature of the project has been a "baby steps" approach to the examples. I've tried to demonstrate each API feature with multiple, slightly varying examples and (with notable, unfortunate exceptions) comments, to build intuitions before leaving the readers to take their chances with the Scaladoc.

Two years and about sixty sample programs later, I'm still not sure of the project's role and future, except that it has been tremendously helpful to my learning about Spark and Scala. The Apache Spark project's documentation and examples have improved, as has test coverage -- the latter always being a good way to learn about a new feature, except when there isn't any. The Databricks blog has also made a difference. And yet, the project continues to be useful to me, and I occasionally hear from others who find it helpful, including one local company that uses it in their training program. I like the "baby steps" approach to learning an API, and apparently I'm not the only one.

But lately I've had to ask myself some hard questions about the project. As I hope to post separately about soon, the evolution of Spark SQL's object model (remember SchemaRDD?) has made the task of keeping the project organized rather challenging lately -- I don't like to move examples around so I don't break links from the blog, StackOverflow and elsewhere. Another problem that's been nagging at me lately is my choice of Scala for the examples. I enjoy using Scala, have enjoyed learning it, and the Apache Spark project continues to keep the Scala APIs as a first class citizen. Indeed, Spark is written in Scala, but as I'll discuss later, that's no guarantee of strong support for Scala APIs. I've never been interested in the Python or R APIs, even though I believe they're of tremendous importance to the industry: I'm not part of the target audience (broadly, the data scientist) and I don't enjoy programming in either language. That leaves Java.

Time to explore the Java APIs

Many of you have seen the various Typesafe/Lightbend surveys showing Scala to be more popular than Java for Spark development -- the latest one has it at 76% Scala, 58% Java 8 and 34% Java 7 or lower. Clearly, there is overlap, so it's not clear whether Java or Scala are more popular overall. I see several reasons to explore Spark from the Java perspective:

  • Java is clearly an important part of the Spark ecosystem, as the surveys show.
  • The Java APIs are not merely an afterthought in Spark: real effort seems to have been invested in making Java programming practical and a reasonable approach.
  • While even a quick examination of the Spark project's Java examples (which date back to Java 7) shows them to be verbose and awkward compared with the Scala examples, the introduction of functional programming features in Java 8 raises the possibility of Java catching up.
  • I see a certain hesitation about Scala in the "big data" ecosystem. Lightbend has taken the "we don't have to choose" approach, and seems to be pretty sincere about it -- and of course they should be if they believe their own survey results. Confluent's decision about Apache Kafka is a bit more interesting: Kafka is also written in Scala, but only supports a Java API, with others provided by the community. While Cake Solutions actively develops the scala-kafka-client project, the Scala APIs are definitely not a first class citizen.
  • I've been a Java programmer, on and off, for 18 years. Before Scala, it was my recreational language of choice, and I still like it. I'm curious about Java 8, which I've only used a little, for another recent project.

Together, these certainly don't motivate me to abandon Scala, but they do motivate me to understand the tradeoffs better than I do now. The bottom line is that I've started adding some Java examples to the project, and started marking my commit messages with "[Scala]" or "[Java]" as appropriate.

Important Questions

I'm definitely making this up as I go, so let me expose some of the decisions I'm trying to make.

Which Examples?

I started with Dataset and DataFrame, since I had recently worked on those in Scala. But I'd at least like to get a cross section of the different areas: core RDDs, SQL, streaming and perhaps GraphX. Then I'll probably focus more on the areas that bring out interesting differences, whichever they turn out to be. There's no point exploring Spark SQL as a query language comprehensively in both Java and Scala, so I won't do it in Java.

Which Version(s) of Spark?

This is easy: much of the reason I invest in the project is to keep up with Spark evolution, and it takes a lot of effort. I'll continue adopting each new Spark release as soon as I can, and use its new features.

Java 8 or Earlier?

Java 8 seems to be getting a lot of adoption, and the new features definitely make it better suited to Spark. But the APIs have a number of features that were intended to work around the deficiencies of earlier versions of Java (such as all of, so it seems interesting to explore them for a while. Yet I'll probably change to Java 8 soon to keep the project from becoming a museum.

One or Two Projects on GitHub?

So far I've used the parallel Scala/Java source tree structure of sbt projects to locate the Java code in the same project as the Scala code, but I'm already feeling like this was a bad idea. I think it hinders my ability to serve the community, since Java programmers are much more likely to be familiar with Maven than sbt, and the one Java class I had written to support the Scala code (hiveql.SumLargeSalesUDAF) is now tangled up with the Java examples. I think you can expect to see a separate project soon. (Splitting the projects also allows me to use different Java versions.)

Parallel Organization?

As I mentioned earlier, the evolution of the object model around Spark SQL has made it hard to keep the project organized, and the Scala examples are getting out of hand. I'm not going to inflict this entropy on Java developers, and will try to organize the Java examples according to my current understanding of how Spark fits together. In due course this may help me sort out the organization of the Scala examples too -- in any case I'm hoping to write a separate post on this topic.

How Much Effort?

I don't know how much I'll balance my effort on Scala and Java examples, or even whether I'll keep working on the Java ones for much longer. It depends on feedback, how much insight I get, where the community ends up going, and how Java and Scala (and Spark) evolve.

Abandoning Scala?

I've already made this decision: definitely not. It's now my recreational language of choice, and I think it has a future. At the very least, I plan to keep up my attempts at covering major Spark features in Scala as they evolve.

Feedback please!

While my "baby steps" approach to Spark examples seems to have had some impact on the community, I get very little direct feedback. Occasional questions have inspired some of the examples, which I hope were helpful to those people, and one local professional who reached out and told me how he has found the project valuable has dramatically increased my motivation. I'd be delighted to hear about your experiences, either about the examples themselves, or about Spark in general.

Sunday, January 31, 2016

Apache Spark in Practice: US Airline On-Time Performance

A few years ago I developed a fascination with a data set published by the Bureau of Transportation Statistics in the US Department of Transportation: "Airline On-Time Performance and Causes of Flight Delays: On_Time Data." This data usually attracts attention because of flight delays, but actually contains lots of broader information about US airports, airlines, routes, traffic patterns and even, to a point, aircraft utilization. As such, it's really a window into the whole topic of commercial passenger air transportation in the United States. Some basic characteristics of the data set are as follows:

Time span covered1987 to 2015
Number of scheduled flights162,212,419
Number of aircraft tail numbers14,858
Number of airlines31
Number of airports388
Number of airport (origin,destination) pairs
with at least one flight

My interest in Apache Spark is no surprise to readers of this blog, but recently the two topics collided when I was learning how to run a Spark cluster through Amazon's Elastic MapReduce service, and read a blog post by Jon Fritz on the Amazon Web Services official blog. The post shows how to run some simple Spark SQL queries against a copy of this data set hosted on Amazon's S3 storage service, conveniently converted to Parquet for easy and efficient access from Spark. I had been shopping for a somewhat real-world project through which to study ways to write efficient computations in core Spark using Scala, and so a project was born.

Is this a good data set for Spark?

Admittedly, the on-time performance data is not huge. But, with a modest cluster, fairly simple queries against the full data set take several minutes, and complex queries, or simple queries written badly, take a lot longer. While there's twenty nine years of data, you can also have quite a lot of fun with a contiguous subset, say just one or two years, and simple queries against that run quickly on an affordable, well configured PC.

At first, the data may seem quite simple. Partly that's an artifact of the denormalization that plagues so many public data sets. But also, the structure of this data is subtle, with significantly graph-like structure at multiple levels. The airports and regular routes between them form a pretty interesting graph, with valuable data on both the vertices and the edges. Multiple flights can be linked by flight number or aircraft tail number. Finally, there are lots of interesting correlations (or absence thereof) with external data to be explored. Weather seems like a good place to start, but the demographic and economic data for nearby cities could be interesting too. I haven't tried using GraphX on this data set yet, but I'm really looking forward to it.

Running the code

I started coding on this over two months ago. I'm sure my explorations will provide material for quite a few posts, but for now I'd just like to introduce the project, which is available on GitHub. See the README for information about how to run the examples. To add another experiment you need to extend the CoreExperiment class, just like the existing core Spark examples do, and tell the 'registry' about it in the Flights class. Then you can either run all the registered experiments, or just the comma separated list you provide on the command line. The README explains how the output is organized. You don't actually need to use Elastic MapReduce to run the examples: you can either download the entire data set (a bit large) or use my ParquetSubsetMain utility to create a subset. Then you can either submit it to the cluster of your choice or use the "--local" flag to run it as a stand alone Scala program. During my own development, I use the latter technique: I run FlightsMain as a stand alone program, using a two-year local Parquet extract of the data. I'm only testing the code on Linux. When I run against the full data set I start an EMR cluster, use sbt's "assembly" command to generate a self-contained JAR, upload it to S3, submit FlightsMain to the cluster, and collect my output from S3.

Why so much framework?

Somewhat to my surprise, the code has ended up rather "framework heavy." This happened in response to goals I initially didn't know I had, but discovered along the way:

  1. A uniform way to run both core Spark and Spark SQL experiments.
  2. An easy way to get results, including performance measurements and diagnostics, back out to S3 without lots of maintenance.
  3. A way to run all the registered experiments or just specific ones, in a specific order, possibly with repetition to help obtain consistent performance results.
  4. Easy switching between local execution, with a development environment and a debugger, on a subset of the data, and execution on a cluster against all the data.

Project goals

I think I'm really trying to study two things with this work: how to do real work with core Spark, and benefit from the efficiency advantages of doing so, without drowning in complex Scala code. Frankly, I'm not even sure how great the advantages of using core Spark are, or whether drowning in complex Scala code can be avoided, although I'll point out that the first question can be answered through measurement, while the second is rather subjective. I like Scala and core Spark, but they both take a lot of investment to learn to use well, and they're certainly not for everybody.

What's Next?

Over the next few posts I hope to illustrate some of the basic techniques of core Spark using simple examples for this project. I look forward to hearing from you about your impressions, and about your own experiences with using core Spark from Scala.

Saturday, May 16, 2015

Efficient Spark SQL Queries to MongoDB

In previous posts I've discussed a native Apache Spark connector for MongoDB (NSMC) and NSMC's integration with Spark SQL. The latter post described a naive implementation of Spark SQL integration, intended to demonstrate its value and the applicability of Spark SQL's recently introduced external data source API. In particular, I had then made no attempt to make the implementation efficient. In this post I will describe how to use some of the features of the external data source API to achieve a much more efficient Spark SQL integration.

General Approach

As discussed in earlier posts, the major challenges in implementing a Spark SQL external data source for MongoDB are:

  1. Efficient schema inference for the entire collection.
  2. Efficient use of MongoDB's query capabilities, based on Spark SQL's projection and filter pushdown mechanism, to obtain the data required for each Spark SQL query.

The NSMC project is hosted on GitHub, and the class nsmc.sql.MongoRelationProvider is a good starting point for reading the Spark SQL integration code.

Schema Inference

The Spark SQL schema for a MongoDB collection is computed when the collection is registered using, say, Spark SQL's CREATE TEMPORARY TABLE command. The goal of schema inference is to generate a Spark SQL schema (Seq[StructField]) to which each document in the collection conforms, so that each document can be converted to an org.apache.spark.sql.Row in order to produce an RDD[Row] as a query result. Some of the subtleties of what it means to infer such a schema were discussed in the original post on Spark SQL integration.

Schema inference itself makes use of Spark's parallel computation features. First, a set of partitions is computed for the collection. Then these are parallelized into an RDD. Next, each partition's collection data is read (in parallel) and a separate schema is computed for each partition. Finally, these schemas are all merged into a single schema that definitively represents the collection. This approach is summarized in Figure 1.

Figure 1: Schema inference

An important characteristic of this algorithm is that the collection is not stored -- the documents are analyzed "on the fly" to compute the schema. Obviously this can be a good thing, if the queries end up depending on small subsets of the collection, or a bad thing if the queries tend to re-scan the entire collection every time.

Collection Scan

A collection is scanned when Spark SQL calls the buildScan() method on the MongoTableScan class, which it does in order to process each query -- this interface was discussed in an earlier post on the external data source API. This method is passed the filters and projections that Spark SQL can benefit from being executed by MongoDB. This information is first used to generate an appropriate MongoDB query as discussed below. That query is executed to obtain an RDD[DBObject] (the representation of a document in the MongoDB APIs), which is then converted to an RDD[Row] using the schema information computed previously. This means that the conversion, like schema inference, also occurs in parallel, although the performance benefit of this is dependent on how skewed the query results are.

Figure 2: Table scan

Generating MongoDB queries from the parameters to the buildScan() method is quite straightforward. MongoDB's find() method happens to be conveniently factored for this, as it takes one parameter for filtering and one for projection. I've given a number of examples of how this conversion turns out below.

Example collection and queries

The best way to understand how NSMC's Spark SQL integration works is to look at some example Spark SQL queries together with the generated MongoDB queries.

The collection and its inferred schema

To understand how query evaluation works, we'll look at the following simple set of MongoDB documents:

  "_id" : ObjectId("554cc53280cecbc6a8579952"), 
   "item" : 1, 
   "quantity" : 5, 
   "price" : 12.5, 
   "category" : "A" 
  "_id" : ObjectId("554cc53280cecbc6a8579953"), 
  "item" : 2, 
  "quantity" : 10, 
  "price" : 12.5, 
  "location" : 
       "state" : "AZ", 
       "zip" : "85001" 
  "category" : "A" 
  "_id" : ObjectId("554cc53280cecbc6a8579954"), 
  "item" : 3, 
  "quantity" : 15, 
  "price" : 12.5, 
  "category" : "B" 
  "_id" : ObjectId("554cc53280cecbc6a8579955"), 
  "item" : 4, 
  "quantity" : 20, 
  "price" : 12.5, 
  "location" : 
       "state" : "NV", 
       "zip" : "89101" 
  "category" : "B" 

NSMC infers the following Spark SQL schema for these documents:

 |-- _id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- item: integer (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- zip: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: integer (nullable = true)

Filter and Projection Pushdown

As I discussed in an earlier post, Spark SQL's external data source API allows simple cases of filtering and projection to be pushed down to the back end database query, to make use of any indexing capabilities and to minimize the amount of data that has to be transfered back to Spark.

Let's start with the simplest of queries to see how this works, assuming we have registered the above collection with Spark SQL as the table mongoTable.

SELECT * FROM mongoTable

NSMC simply requests all the columns in the inferred schema from MongoDB. Since the always-present _id column is not explicitly suppressed, it will be present in the output. (MongoDB's convention for this field is that it is always returned unless explicitly suppressed.) Including it in this case was a design choice that seems to result in the most intuitive behavior for * queries. The important design choice was actually to include it as a column in the schema. Once that was done, Spark SQL would expect it to be returned in every * query.

Here is the generated MongoDB query, with no filtering and [effectively] no projection.

  { }, 
  { "category" : 1 , "item" : 1 , "location" : 1 , 
    "price" : 1 , "quantity" : 1 }

This query happens to be equivalent to both of the following, but NSMC doesn't recognize the fact:

collection.find( { }, { } )

collection.find( )

A query that explicitly projects columns results in only those being requested from MongoDB.

SELECT item, quantity FROM mongoTable

In this case, Spark SQL would not know how to deal with the _id column even if it was returned, so NSMC explicitly suppresses it to save bandwidth. It's probably not a very important optimization.

  { }, 
  { "item" : 1 , "quantity" : 1 , "_id" : 0 }

Now let's move on to simple filtering.

SELECT item, quantity 
FROM mongoTable 
WHERE quantity > 12

Indeed, the filter is pushed through to the MongoDB query.

  { "quantity" : { "$gt" : 12}},
  { "item" : 1 , "quantity" : 1 , "_id" : 0}

A disjunctive query is not currently handled by NSMC, although the capability to push it through was introduced in Spark 1.3.0.

SELECT item, quantity 
FROM mongoTable 
WHERE quantity > 12 OR quantity < 8

So only projection is pushed through.

  { "item" : 1 , "quantity" : 1 , "_id" : 0}

Accessing a nested document or array element doesn't disable projection pushdown.

SELECT item, quantity, 
FROM mongoTable

The projection is "widened" to the top level column, so somewhat too much data is pulled in from MongoDB.

  { "item" : 1 , "quantity" : 1 , "location" : 1 , "_id" : 0}

Filtering on a nested document or array element is more problematic.

SELECT item, quantity, 
FROM mongoTable 
WHERE location.state = 'AZ' AND quantity = 10

There's no widening trick to play here, so the problematic conjunct is not pushed through.

  { "quantity" : 10 }
  { "item" : 1 , "quantity" : 1 , "location" : 1 , "_id" : 0}

We should also check an aggregation query.

SELECT category, sum(quantity) FROM mongoTable GROUP BY category

The external data source API doesn't have a way to push grouping and aggregation through, so MongoDB is just asked to fetch the underlying data.

  { },
  { "category" : 1 , "quantity" : 1 , "_id" : 0 }

Scope for Improvement

A number of changes either in NSMC's implementation or in Spark SQL's implementation could result in still more efficient processing of queries to MongoDB.


Scanning the entire collection to compute a schema has the advantage of not missing documents that would have added valuable schema information. On the other hand, it can be very expensive. Sampling clearly has to be optional, with a parameter allowing the user to specify how much of the collection to sample.

Narrower Schema Inference

Currently, a schema is always inferred for the entire collection, which can be expensive if queries will only use part of the collection. An obvious refinement would be to allow users to specify a projection when registering the collection. Being able to specify a filter would be even more powerful, but less obviously useful.


The inferred schema is cached for use by all queries. This has its dangers, which are mitigated by Spark SQL's ability to refresh a registered collection.

Since the results of a query conform to all the standard characteristics of an RDD, they benefit from Spark's caching capabilities. However, it is not yet clear to me how much they benefit across multiple queries. This needs some investigation and experimentation.

Tighter Integration with Catalyst

Spark SQL's external data source API was designed with simplicity and ease of use in mind, and in my opinion it succeeds admirably in this regard. However, the API does provide for deeper (and harder to use) integration with Spark SQL's Catalyst query compiler. It may in the future be possible to improve NSMC's Spark SQL integration by using this feature.

Limitations of the External Data Source API

Spark SQL's external data source API basically can't handle nested structures or arrays. This means certain queries against a MongoDB collection will always require that too much data be returned from MongoDB to Spark. I don't know if this is at all on the radar of the Spark SQL developers.

Additionally, the external data source API does not support pushing joins or grouping and aggregation to the external database.

Integrating with Spark SQL's cost model

The external data source API allows an implementation to provide information about the size of a collection, but NSMC doesn't use this feature. It may allow the Catalyst query optimizer to make better query processing decisions.

Limitations of Catalyst

The Catalyst query optimizer currently has a limited cost model, and improvements in it may be helpful to NSMC's performance.

Improved Partitioning

NSMC's collection partitioning features are currently quite naive, and this may limit the efficiency gains of parallel schema inference and parallel query execution. This needs some experimentation.


In my original post about NSMC's Spark SQL integration, I described a very naive implementation, intended to show that such integration was possible and could be useful. In this post I've described more recent work to take advantage of Spark's parallelism for schema inference, to prevent schema inference from consuming large amounts of memory, and to take advantage of Spark SQL's filter and projection pushdown features to dramatically improve query performance in many cases. While there's a lot more to be done, these improvements in many cases give NSMC's Spark SQL integration quite realistic performance.