Sunday, October 23, 2016

Developing Utility Bolts for Apache Storm

Apache Storm is a distributed stream processing framework: one of many such frameworks but among the most popular. Storm applications ("topologies") are composed of "spouts" (sources of data) and "bolts" (data transformations), and these are connected by "streams" of "tuples", which are a sequence of typed key/value pairs of data. The spouts and bolts can be thought of as vertices in a directed acyclic graph, and the streams as edges. The spouts are always graph sources, with only outgoing edges, but the bolts may have either both incoming and outgoing edges, or they can be sinks, with only incoming edges.

Storm provides various pre-defined components, most of them spouts, providing standard data sources for streaming data from database systems, file systems, queueing systems and network listeners such as a web server, and so on. Similarly it provides pre-defined bolts, some serving as data sinks along the same lines, as well as interfaces to the usual logging frameworks.

In this post I'm going to examine what it takes to do a good job of adding reusable transformers (in the form of utility bolts) to Storm, for use by topology developers. Storm already provides a number of these, mostly in the package org.apache.storm.starter.bolt, and a few more in the package org.apache.storm.testing. (Storm follows a convenient naming convention where all bolt class names end with "Bolt".) Alas, most of these are completely undocumented, at least in the JavaDoc, but many are quite simple, and their usage can be worked out from a quick read of the source. Standard transformations can provide simple operations like projecting out unwanted fields, or much more complex ones like filtering, aggregation or data smoothing.

Since sometimes spouts and bolts have common or interacting design issues I'll occasionally touch on the design of spouts, but that's a complex topic in itself that is mostly out of scope for this post.


Since this is intended to be a practical guide for writing reusable bolts, I'm going to assume that you already understand the basic mechanics of writing a very simple bolt and getting it working in a Storm topology. A good source for learning how to do this is the book "Storm Applied: Strategies for real-time event processing" by Sean T. Allen, Matthew Jankowski, and Peter Pathirana. I'm also assuming that you have the most basic familiarity with Storm's Java API.

Open Source Project

You may want to read this in conjunction with the storm-gadgets project on GitHub, which includes a small number of bolts largely developed using the design principles described here, although I'll leave detailed discussion of the actual code to another post.

Design Goals

First I'd like to propose some design goals for creating utility bolts:

Ease of use: The purpose and behavior of the bolt should be clear and it should be easy to set up and include in a topology.

Appropriate Generality: When designing reusable components of any kind there's a tradeoff between ending up with lots of similar components on one hand, and components with complex configuration on the other. When adding components to an existing framework it helps to "blend in" with how existing components have handled this compromise. Another facet of generality is adaptability to as wide a range of topologies as possible, in terms of variations like concurrency support, reliable delivery support, tuple contents, and so on.

Robustness: Good choices need to be made about what kinds of errors are tolerated and which lead to topology execution failure. Here again the pre-existing components can be a guide. Furthermore, in the streaming world it's very expensive to allow bad input data or a localized problem to terminate the application. It's usually best to avoid interrupting stream processing in all but the most severe cases: anything that prevents the successful processing of a large fraction of tuples.

Ease of Diagnosis: It's important to be able to diagnose misconfiguration of these components as well as failures, or other faults, during their execution. Again, existing components can be a guide here, but broadly we want to be able to read the usual logs and see what is happening in each component – easily being able to zoom in on a specific component type and/or component instance. The reader of log messages needs to be able to understand the scope and severity of each reported problem, and ideally what to do about it: fix bad configuration, restart the topology, solve an environmental problem, etc.

Performance and Scalability: In addition to the component itself performing well, it should not detract from the performance and scalability of the topologies that use it any more than necessary.

Implementation Guidelines

To meet the above component design goals in the Apache Storm framework, we need to address certain technical issues. I'll leave performance and scalability to a separate post, and address the functional issues here. As mentioned earlier, this discussion will occasionally refer to the Java API, although that's not the only option for implementing bolts.

Distinguishing between inputs: A component may take inputs from multiple other components, and will often treat those inputs differently -- that is, they have different roles in the operation of the component, and so the topology developer will need to be able to specify which input stream has which role. Furthermore, upstream components may be emitting tuples on multiple streams, and sometimes multiple output streams of a single component may be consumed by our component. In Storm, streams have names local to the component that emits them, and components within a topology live in a flat namespace where they have global names. Storm provides the class org.apache.storm.generated.GlobalStreamId for dealing with this two-level namespace. In short, the component must support dealing unambiguously with the names of streams.
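To make the two-level naming concrete, here is a minimal sketch in plain Java with no Storm dependency. The names `StreamKey` and `InputRoles` are hypothetical: `StreamKey` only stands in for the kind of (component, stream) pair that GlobalStreamId represents, and the role strings are whatever a particular bolt chooses to call its inputs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a (component, stream) pair; not a Storm class.
final class StreamKey {
    final String componentId;
    final String streamId;

    StreamKey(String componentId, String streamId) {
        this.componentId = Objects.requireNonNull(componentId);
        this.streamId = Objects.requireNonNull(streamId);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof StreamKey)) return false;
        StreamKey other = (StreamKey) o;
        return componentId.equals(other.componentId) && streamId.equals(other.streamId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(componentId, streamId);
    }

    @Override
    public String toString() {
        return componentId + ":" + streamId;
    }
}

// Hypothetical registry mapping each fully qualified input stream to its role in the bolt.
final class InputRoles {
    private final Map<StreamKey, String> roles = new HashMap<>();

    // Registering the same stream twice is rejected so that roles stay unambiguous.
    InputRoles assign(String componentId, String streamId, String role) {
        StreamKey key = new StreamKey(componentId, streamId);
        if (roles.containsKey(key)) {
            throw new IllegalArgumentException(
                "Stream " + key + " already has role " + roles.get(key));
        }
        roles.put(key, role);
        return this;
    }

    // Returns the role for a stream, or null if the stream was never configured.
    String roleOf(String componentId, String streamId) {
        return roles.get(new StreamKey(componentId, streamId));
    }
}
```

Because the key includes the component name, two upstream components can both emit on a stream called "default" and still be given different roles.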

Organizing outputs in a consumable way: Our own component may need to generate multiple output streams, in which case they need to be named. Even if there is only one, there may be reasons not to simply use the default output stream (whose name is, aptly enough, "default"). Sometimes it will make sense to generate the stream names ("out1", "out2", …) but in other cases they will need user configuration to fit them into a topology. The fields in output tuples will also need names, which may be fixed or generated in some cases, and need to be configured in others. This can be a lot for the user to configure, and the decision as to what needs to be configured should be made carefully. Finally, there are cases where it may be tempting to choose the output stream and field names based on the input stream and field names. There are two problems with this. First, while it may seem like a great way to avoid configuration altogether, Storm spouts and bolts are required to declare their streams and fields (and the order of fields) via the declareOutputFields() callback when the topology is initialized, which is before any input tuple has been seen. Second, while it is often practical to use the configured names of inputs as names of outputs, you need to watch out for collisions – multiple input components may use the same stream name, and multiple streams may use the same field name. In short, simply passing input names through as output names is not a viable bolt design strategy in Storm.
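The collision problem can be checked for when the component is configured. The following is a rough sketch in plain Java; `OutputNames` and its methods are hypothetical helpers, not part of Storm, and the check is just a duplicate scan over the names a bolt proposes to declare.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for validating proposed output stream or field names.
final class OutputNames {
    private OutputNames() {}

    // Returns the names that occur more than once, in the order each was first repeated.
    static List<String> findCollisions(List<String> proposedNames) {
        Set<String> seen = new LinkedHashSet<>();
        Set<String> repeated = new LinkedHashSet<>();
        for (String name : proposedNames) {
            if (!seen.add(name)) {
                repeated.add(name);
            }
        }
        return new ArrayList<>(repeated);
    }

    // Fails fast if any proposed name is used twice.
    static void requireUnique(String kind, List<String> proposedNames) {
        List<String> collisions = findCollisions(proposedNames);
        if (!collisions.isEmpty()) {
            throw new IllegalArgumentException("Duplicate " + kind + " names: " + collisions);
        }
    }
}
```

A bolt that reuses input stream names as output stream names could call `requireUnique("stream", names)` from its constructor, so that two inputs both named "default" are caught before the topology is submitted.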

Interoperating with Guaranteed Delivery: The degree to which a topology achieves guaranteed delivery of tuples depends on its configuration, as well as the behavior of the spouts and bolts. Spouts need to assign IDs to tuples, bolts need to anchor their emitted tuples appropriately with respect to input tuples, and all components need to acknowledge tuples appropriately. Spouts have to implement the ack() and fail() methods, which also impacts the nextTuple() method, as emitted tuples need to be stored, keyed by their tuple ID, until they are either acknowledged (and then deleted) or failed (and then replayed). Finally, bolts that communicate with external systems such as databases or queueing systems will need to "fail" the tuple when operations on external systems fail, so that it will later be replayed. When developing a utility component, we don't know whether guaranteed delivery will be used in a particular topology -- it usually needs to support either behavior. Fortunately, if we develop the component as if guaranteed delivery will be used, it can also be deployed without it. As we will see below, doing this sometimes raises complex design issues.

Concurrency: It is straightforward to write components in a way that allows Storm to operate multiple instances in parallel, but problems arise when we use these components in a topology and try to decide on what grouping method to use to connect them. Often a shuffle grouping will work – in particular, if the bolt processes each tuple completely in isolation from others. It gets more complicated if the order of tuples is significant to the bolt, or they need to be grouped in some way – then often a fields grouping is appropriate. This is all in a day's work for Storm topology developers, but it requires understanding the behavior of each spout and bolt. As utility component developers, it's up to us to understand our component's behavior well enough to document the grouping requirements it imposes, and sometimes this can be complex as it may be different for different inputs. Spouts have additional responsibilities with respect to concurrency, as the various spouts reading from an external data source need to divide the data among themselves. When reading from a queue, this is straightforward, but if reading from a DBMS they may have to work out how to explicitly partition a table.
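The reason a fields grouping helps when tuples must be kept together can be seen in a few lines of plain Java. This is only an illustration of the idea (equal grouping values always map to the same task); the class `GroupingSketch` is hypothetical and the hashing rule is an assumption, not a description of Storm's internals.

```java
import java.util.List;

// Hypothetical illustration of key-based routing; not Storm's implementation.
final class GroupingSketch {
    private GroupingSketch() {}

    // Picks a task index from the values of the grouping fields.
    // Equal value lists always produce the same index for a given task count.
    static int taskFor(List<?> groupingValues, int numTasks) {
        if (numTasks <= 0) {
            throw new IllegalArgumentException("numTasks must be positive");
        }
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }
}
```

A bolt that aggregates per user, for example, would document that its input must be grouped on the user field, since under a rule like this every tuple for one user reaches the same instance.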

Error handling: The issue of error handling in streaming applications is complex and covering it completely in this post seems impossible. As utility component developers, however, we need to understand and document how our component interacts with system failures in the topology around it, and also what it considers "invalid" configuration and "invalid" input data.

Misconfigurations should usually be reported when a component is initialized (from the constructor) or during the call to prepare(), as they should, if at all possible, be reported before the topology starts to execute and should, in most cases, prevent it from executing. One major kind of misconfiguration that components should always check for during initialization is whether an appropriate set of input streams and output streams have been configured -- there's usually no point starting to process data if they haven't. This is also a good time to check for groupings that can't be supported, concurrency levels that can't be supported, as well as unsupported combinations of grouping and concurrency.
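One way to structure such checks is to collect every problem before failing, so that the person configuring the topology sees them all at once. The sketch below is plain Java with hypothetical names (`ConfigCheck`, the notion of "roles"); a real bolt would call something like `validate` from its constructor or from prepare().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical fail-fast configuration check for a bolt with named input roles.
final class ConfigCheck {
    private ConfigCheck() {}

    // Returns a description of every problem found; an empty list means the configuration is usable.
    static List<String> problems(Set<String> requiredRoles,
                                 Set<String> configuredRoles,
                                 int outputStreams) {
        List<String> found = new ArrayList<>();
        for (String role : requiredRoles) {
            if (!configuredRoles.contains(role)) {
                found.add("no input stream configured for role '" + role + "'");
            }
        }
        if (outputStreams < 1) {
            found.add("at least one output stream is required");
        }
        return found;
    }

    // Throws with all problems listed, so nothing starts executing on a bad configuration.
    static void validate(Set<String> requiredRoles,
                         Set<String> configuredRoles,
                         int outputStreams) {
        List<String> found = problems(requiredRoles, configuredRoles, outputStreams);
        if (!found.isEmpty()) {
            throw new IllegalArgumentException("Invalid configuration: " + String.join("; ", found));
        }
    }
}
```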

Invalid tuples are a different matter: unclean data is a regular fact of life, and data pipelines should recover and continue executing whenever possible after an invalid tuple is received. This can be either very simple or complex depending on the nature of your component. One thing to remember is that if you effectively drop a tuple for being invalid, you still need to acknowledge it so it doesn't get replayed when guaranteed delivery is being used – this can feel counterintuitive but is very important. There remains the issue of reporting the problem to support diagnosability. It's important to be able to monitor whether the number of tuples (absolute or as a proportion of data processed) each component has rejected is very small or very large. In the latter case, an administrator should be alerted to check whether there is a major, systematic configuration or data source problem. Sometimes the administrator will have the luxury of stopping the data pipeline, but often this is out of the question. Millions of tuples may be rejected before an upstream problem is solved, and you don't want your alerting mechanism to cause more problems than it solves. For example, logging every rejected tuple can seem like a good idea, and indeed be very useful, until the logs fill up a storage device or the logging slows the topology to a crawl. Logging needs to be used judiciously, and logging the occasional rejected tuple is probably still a good idea. Logging the number of rejected tuples from time to time can also be useful. For some components, particularly those that are "fussy" about their inputs, it may make sense to output something (perhaps a count, or an error message) on a dedicated output stream whenever a tuple is rejected. It may even be tempting to output the entire tuple, but this is not straightforward. Since the field signatures of a component's output streams need to be pre-declared, it's hard to emit an unexpected field. One approach is to serialize the entire rejected tuple into a single field, perhaps called "tuple", perhaps in a serialization format that is both machine and human readable.
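As a sketch of "judicious" logging, the class below counts rejections and tells the caller which ones are worth writing to the log: the first few, and then one in every so many. The class name `RejectionMonitor` and both thresholds are assumptions made for illustration; it also assumes it is used from a single thread.

```java
// Hypothetical helper that counts rejected tuples and samples which ones to log.
final class RejectionMonitor {
    private final long logFirst;  // log every rejection up to this count
    private final long logEvery;  // after that, log one rejection in this many
    private long processed;
    private long rejected;

    RejectionMonitor(long logFirst, long logEvery) {
        if (logFirst < 0 || logEvery <= 0) {
            throw new IllegalArgumentException("logFirst must be >= 0 and logEvery must be > 0");
        }
        this.logFirst = logFirst;
        this.logEvery = logEvery;
    }

    void recordAccepted() {
        processed++;
    }

    // Records a rejection and reports whether this one should be written to the log.
    boolean recordRejected() {
        processed++;
        rejected++;
        return rejected <= logFirst || rejected % logEvery == 0;
    }

    long rejectedCount() {
        return rejected;
    }

    // Fraction of all tuples seen so far that were rejected (0.0 if none were seen).
    double rejectedFraction() {
        return processed == 0 ? 0.0 : (double) rejected / processed;
    }
}
```

The count and fraction are the values worth logging "from time to time" or emitting on a dedicated stream; the tuple itself still has to be acknowledged regardless of whether it was logged.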

Spouts that attempt to support guaranteed delivery also need to handle situations where tuples are either not being acknowledged for a long time (imposing a huge interim storage burden on the spout) or repeatedly being failed (adding a retransmission burden to that storage burden), in both cases suggesting that something is seriously wrong. Such situations can be handled by occasional reaping of old tuples and by imposing limits on the number of retries – both requiring additional information to be stored with the transmitted tuple, as well as judicious decision making by the designer.
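The bookkeeping this implies can be sketched without any Storm dependency. In the plain Java below, `PendingTuples` is a hypothetical class that a spout might consult from nextTuple(), ack() and fail(); the retry limit and maximum age are assumed configuration values, and timestamps are passed in explicitly to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical store of emitted-but-unacknowledged tuples, keyed by tuple ID.
final class PendingTuples<T> {
    // The extra information kept with each tuple: when it was emitted and how often it failed.
    private static final class Slot<T> {
        final T payload;
        final long emittedAtMillis;
        int failures;

        Slot(T payload, long emittedAtMillis) {
            this.payload = payload;
            this.emittedAtMillis = emittedAtMillis;
        }
    }

    private final Map<Long, Slot<T>> pending = new LinkedHashMap<>();
    private final int maxRetries;
    private final long maxAgeMillis;

    PendingTuples(int maxRetries, long maxAgeMillis) {
        this.maxRetries = maxRetries;
        this.maxAgeMillis = maxAgeMillis;
    }

    void emitted(long id, T payload, long nowMillis) {
        pending.put(id, new Slot<>(payload, nowMillis));
    }

    void acked(long id) {
        pending.remove(id);
    }

    // Returns the payload to replay, or null if the ID is unknown or the retry
    // limit is exhausted (in which case the tuple is dropped from the store).
    T failed(long id) {
        Slot<T> slot = pending.get(id);
        if (slot == null) return null;
        slot.failures++;
        if (slot.failures > maxRetries) {
            pending.remove(id);
            return null;
        }
        return slot.payload;
    }

    // Drops tuples older than the maximum age and returns their IDs.
    List<Long> reap(long nowMillis) {
        List<Long> reaped = new ArrayList<>();
        Iterator<Map.Entry<Long, Slot<T>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Slot<T>> e = it.next();
            if (nowMillis - e.getValue().emittedAtMillis > maxAgeMillis) {
                reaped.add(e.getKey());
                it.remove();
            }
        }
        return reaped;
    }

    int size() {
        return pending.size();
    }
}
```

Whether a reaped or exhausted tuple is logged, counted, or written somewhere for later inspection is exactly the kind of decision the designer has to make.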

Logging: Storm now uses SLF4J for logging, and it's straightforward for individual components to use it as well. Any logging done on a per-tuple basis should be at the DEBUG level so it can be disabled in production. Major component lifecycle and configuration events should be logged as INFO as it's cheap to log them and they should always be available.

One aspect of logging to be aware of is that a component can only become aware of its ID in the topology when prepare() is called. If you want to use it for logging elsewhere (and you will) you need to save it at that time. Furthermore, not only is the ID not known in the constructor, but it is also not known in declareOutputFields(), which is called before prepare(). If it seems useful for the association between the component ID and its configuration (and perhaps output fields) to be clear in the logs, you may want to log it all inside prepare() even though it was already available in the constructor and it may have been tempting to log it there.
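The ordering issue is easier to see in code. The following plain Java sketch mimics the lifecycle described above without implementing any Storm interface: `LifecycleLoggingSketch` is hypothetical, its prepare() takes the component ID directly as a simplification, and a list of strings stands in for a real logger.

```java
import java.util.List;

// Hypothetical bolt-like class; it mirrors the lifecycle ordering but is not a Storm bolt.
final class LifecycleLoggingSketch {
    private final String configSummary;
    private final List<String> log;  // stand-in for a real logger
    private String componentId;      // unknown until prepare() runs

    LifecycleLoggingSketch(String configSummary, List<String> log) {
        // The component ID is not available yet, so the configuration is kept, not logged.
        this.configSummary = configSummary;
        this.log = log;
    }

    void prepare(String componentId) {
        // Save the ID for later use and log the configuration alongside it.
        this.componentId = componentId;
        log.add(prefix() + "configured with " + configSummary);
    }

    void reportRejected(String reason) {
        log.add(prefix() + "rejected tuple: " + reason);
    }

    private String prefix() {
        return "[" + componentId + "] ";
    }
}
```

Prefixing every message with the saved ID is what makes it possible to zoom in on one component instance when reading the logs.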

Interactions with external systems: Spouts often read data from external systems and bolts can read or write data from/to such systems, or both. To do this responsibly, they should not overuse the resources of those systems, including connections. This includes limiting the number of concurrent connections and disconnecting responsibly when cleanup() or deactivate() is called. As mentioned earlier, it needs to be clear what happens when multiple instances of a component read from the same database table – are they replicating the data or partitioning it? An additional complication to keep in mind is that when guaranteed delivery is in play, the input tuple to a component may be replayed -- it's necessary to think through what effect this will have on the external system.
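The effect of a replay depends on whether the write is idempotent. The plain Java sketch below contrasts two hypothetical write styles against in-memory stand-ins for an external system: an append, which duplicates data when a tuple is replayed, and a write keyed by tuple ID, which does not. `ReplaySketch` and the idea of using a tuple ID as the key are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-ins for two ways of writing to an external system.
final class ReplaySketch {
    final List<String> appended = new ArrayList<>();    // e.g. an insert-only table or a queue
    final Map<String, String> keyed = new HashMap<>();  // e.g. a table with a unique key

    // Not idempotent: a replayed tuple produces a second copy.
    void append(String value) {
        appended.add(value);
    }

    // Idempotent: a replayed tuple overwrites its own earlier write.
    void upsert(String tupleId, String value) {
        keyed.put(tupleId, value);
    }
}
```

Processing the same tuple twice leaves two entries in `appended` but only one in `keyed`, which is the difference a bolt's documentation needs to spell out.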

In Practice

You can make up your own mind as to how well the bolts in the project meet the design goals and conform to the implementation guidelines: I'll discuss some of them in detail in future posts. If you have bolts of your own that raise interesting issues, or feedback on the ideas discussed here, please let me know.

Thursday, October 20, 2016

Learning to use Apache Spark and Kafka Together

I recently wrote about the unexpected popularity of the LearningSpark project on GitHub, and speculated that some of the popularity stemmed from the convenience of not having to set up a Spark server -- each example program is more-or-less self-contained. This approach has certain limitations (for example, it's an awful way to explore performance and scalability issues) but it does leave people free to concentrate on model and API issues in isolation. This can be useful not only for Spark beginners, but also, as Spark evolves, it's a good way to understand the new features and the problem solving approaches they support.

Sooner or later, a large fraction of Spark users end up grappling with how to use Spark in conjunction with Apache Kafka. Now, the overheads of setting up both a Spark cluster and a Kafka cluster before you can write the stream processing equivalent of "Hello World" can be quite high, especially if you're still learning BOTH systems. Being in this situation myself, I started to wonder how hard it would be to set up a project where both the Spark system and the Kafka broker (and ZooKeeper instance) were embedded in the example program. Such an approach would have the same limitations as the Spark project described above, and probably suffer from them even more, but conversely, the benefits of such simplification would be even greater. It turns out not to be very hard to achieve, as you can see at another GitHub project, spark-streaming-with-kafka, but it does have certain gotchas, which I'll discuss.

It seems to me that the Kafka project has suffered rather considerable API churn, not just in the details of the APIs but in the fundamental conceptual model as well. Currently in Spark 2.0.0, released in July of 2016, Spark support for the APIs is lagging somewhat, supporting Kafka, released in February of 2015. This seventeen month lag causes some minor difficulties, although the situation seems likely to improve in Spark 2.1.

The main impact of this lag in API support is in the area of the kafka-unit project, which provides convenient APIs for managing an embedded Kafka instance. This project is now at version 0.6, but to get a release that supports Kafka we have to go back to kafka-unit 0.2, which is missing some handy newer features like creating partitioned topics. I've had to work around this in one of the utility classes discussed below.

Utility Classes

So far, I have needed the following utility classes to keep the examples sane. Note that these links point to a version of the code specially tagged to provide permanent links: keep this in mind if you want to see more recent versions of the code.

  • EmbeddedKafkaServer: Encapsulates uses of the kafka-unit project to embed a Kafka instance, working around the fact that, as discussed above, we have to use a rather old version of that project.
  • SimpleKafkaClient: Some trivial default configuration for the producers and consumers used in the examples.
  • SparkKafkaSink: An extension of the code provided in Marcin Kuthan's rather useful blog post about publishing to a Kafka topic from a Spark job. His code uses the default partitioner, essentially broadcasting the contents of each RDD partition to all of the topic partitions, effectively causing a random repartitioning. The additional overload of the send() method here instead allows the topic partition to be specified, so the contents of all RDD partitions are sent to the same topic partition. I'm not sure this is useful in practice, but it helps to illustrate the relationship between RDD partitioning and topic partitioning.


So far, the following five examples are available. Once again, the links point to a tagged version of the code.

  • SimpleStreaming: The most basic streaming example: starts a Kafka server, creates a topic, creates a stream to process that topic, and publishes some data using the SparkKafkaSink.

    For each of the received RDDs, the code prints the number of partitions and the number of elements in each partition. The code exercises no control over the partitioning of the received RDDs, and there turn out to be two partitions each time, compared with four in the originating RDD and four in the topic. By examining the partitioning here, we set the stage for exercising some control over it in later examples.

    Notice there's quite a lot of waiting. It takes some time for streaming to get going, and data published too early tends to be missed by the stream. (No doubt, this is partly because this example uses the simplest method to create the stream, and thus doesn't get an opportunity to set auto.offset.reset to "earliest".) Also, data that is published takes some time to propagate to the stream. This seems inevitable, and is almost guaranteed to be slower in a self-contained example like this.
  • ExceptionPropagation: This example demonstrates that exceptions encountered in stream processing are rethrown from the call to awaitTermination(). The custom exception SomeException is thrown when an RDD is received.
  • MultipleConsumerGroups: This differs in creating two streams based on two different consumer groups, so both streams get a copy of the same data. It's simply a matter of specifying the names of the two different consumer groups in the two calls to createStream() for the same topic -- no special configuration is needed. The two calls create two instances of ReceiverInputDStream, and then foreachRDD is called on each of those. This is valuable if you want to create more than one processing pipeline on the same data.
  • PartitionedStreaming: By calling createDirectStream() instead of createStream(), you can get the generated RDDs to have a number of partitions (in this case 6) dictated by the partitioning of the topic.
  • ControlledPartitioning: Here the topic has six partitions but instead of writing to it using the configured partitioner, we assign all records to the same partition explicitly. Although the generated RDDs still have the same number of partitions as the topic, only one partition has all the data in it. This demonstrates how to exercise control over partitioning all the way from the original RDD, through the topic to the resulting RDDs.
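The contrast between letting a partitioner spread records and assigning every record to one partition can be illustrated without Spark or Kafka. In the plain Java sketch below, `PartitionSketch` is hypothetical and the key-hash rule is an illustrative assumption, not Kafka's actual partitioner; the point is only the shape of the resulting per-partition counts.

```java
import java.util.List;

// Hypothetical illustration of two ways to choose a topic partition for each record.
final class PartitionSketch {
    private PartitionSketch() {}

    // Illustrative rule: choose a partition from the key's hash.
    static int byKeyHash(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Per-partition record counts when each record is placed by its key's hash.
    static int[] countsByKeyHash(List<String> keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (String key : keys) {
            counts[byKeyHash(key, numPartitions)]++;
        }
        return counts;
    }

    // Per-partition record counts when every record is sent to one fixed partition.
    static int[] countsWithFixedPartition(int records, int numPartitions, int fixed) {
        if (fixed < 0 || fixed >= numPartitions) {
            throw new IllegalArgumentException("partition out of range: " + fixed);
        }
        int[] counts = new int[numPartitions];
        counts[fixed] = records;
        return counts;
    }
}
```

With six partitions and a fixed target, five of the six counts are zero, which corresponds to what ControlledPartitioning shows in the generated RDDs: the partition count matches the topic, but one partition holds all the data.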

Feedback please!

Personally I've found this project useful in improving my understanding of Kafka itself, as well as the Kafka integration features of Spark Streaming. It's always hard to tell which of these projects are useful to others and why, so I look forward to hearing from you about your experiences with the code.

Sunday, October 9, 2016

Learning Spark with Java

In a recent post I discussed the history and motivation of my LearningSpark project on GitHub. While that project is mostly based on the Scala APIs to Apache Spark, I explained why I had begun to explore the Java APIs as well. I also predicted that I would soon introduce a separate project, based solely on Maven and Java, to continue the Java exploration: most Java programmers are much more comfortable with Maven than with sbt, and a separate project allows me to choose the Java version appropriately.

The new learning-spark-with-java project on GitHub is the result. It started with a copy of the examples on the original project, but since I've now adopted Java 8, I rewrote the examples to make use of the latter's lambda expressions, perhaps ironically making the code now look more like the original Scala code.
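To show the kind of difference lambdas make, here is a small plain Java comparison that does not use Spark at all. `LambdaComparison` and `mapAll` are hypothetical, and java.util.function.Function is used only as a stand-in for a Spark-style function interface; the two constants behave identically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical comparison of the anonymous-class style with the lambda style.
final class LambdaComparison {
    private LambdaComparison() {}

    // Applies a function to each element; a stand-in for a map() style API.
    static <A, B> List<B> mapAll(List<A> input, Function<A, B> f) {
        List<B> out = new ArrayList<>();
        for (A a : input) {
            out.add(f.apply(a));
        }
        return out;
    }

    // Older style: an anonymous inner class, with the logic buried in boilerplate.
    static final Function<String, Integer> LENGTH_VERBOSE = new Function<String, Integer>() {
        @Override
        public Integer apply(String s) {
            return s.length();
        }
    };

    // Java 8 style: a lambda with the same behavior.
    static final Function<String, Integer> LENGTH_LAMBDA = s -> s.length();
}
```

Calling `mapAll(words, LENGTH_LAMBDA)` reads much like the equivalent Scala `words.map(_.length)`, which is the resemblance mentioned above.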

I'll proceed with this project using the guidelines I listed in the LearningSpark project when I branched out into Java. I will almost definitely not:

  1. Rush to catch up with the Scala examples,
  2. Keep the two sets of examples perfectly (or even well) matched,
  3. Branch out into Python and R as well (seriously, I have no interest in doing this.)

I'll probably still focus on the Scala examples more, as new features seem to mature a little faster in the Scala API. I am unlikely to add to the Java examples in the LearningSpark project, and if they get in the way or create confusion, I may eventually delete them. As always, feedback is welcome, and I'm especially curious to see whether the community finds this project as useful as some people obviously found the earlier one.

Sunday, August 28, 2016

Taking a Detour with Apache Spark

Almost two years ago, while preparing for a talk I was giving at the now defunct Seattle Eastside Scala Meetup, I started a public GitHub project collecting and organizing Apache Spark code examples in Scala. I had stumbled on a way to run the examples on all supported platforms without setting up or deploying to a cluster, so the overheads of experimenting with the Spark APIs and programming idioms were remarkably low. It seemed like this approach was not well known at the time, so I shared it via the GitHub project and by posting here. Other than avoiding the overheads of a Spark cluster, the main feature of the project has been a "baby steps" approach to the examples. I've tried to demonstrate each API feature with multiple, slightly varying examples and (with notable, unfortunate exceptions) comments, to build intuitions before leaving the readers to take their chances with the Scaladoc.

Two years and about sixty sample programs later, I'm still not sure of the project's role and future, except that it has been tremendously helpful to my learning about Spark and Scala. The Apache Spark project's documentation and examples have improved, as has test coverage -- the latter always being a good way to learn about a new feature, except when there isn't any. The Databricks blog has also made a difference. And yet, the project continues to be useful to me, and I occasionally hear from others who find it helpful, including one local company that uses it in their training program. I like the "baby steps" approach to learning an API, and apparently I'm not the only one.

But lately I've had to ask myself some hard questions about the project. As I hope to post separately about soon, the evolution of Spark SQL's object model (remember SchemaRDD?) has made the task of keeping the project organized rather challenging lately -- I don't like to move examples around so I don't break links from the blog, StackOverflow and elsewhere. Another problem that's been nagging at me lately is my choice of Scala for the examples. I enjoy using Scala, have enjoyed learning it, and the Apache Spark project continues to keep the Scala APIs as a first class citizen. Indeed, Spark is written in Scala, but as I'll discuss later, that's no guarantee of strong support for Scala APIs. I've never been interested in the Python or R APIs, even though I believe they're of tremendous importance to the industry: I'm not part of the target audience (broadly, the data scientist) and I don't enjoy programming in either language. That leaves Java.

Time to explore the Java APIs

Many of you have seen the various Typesafe/Lightbend surveys showing Scala to be more popular than Java for Spark development -- the latest one has it at 76% Scala, 58% Java 8 and 34% Java 7 or lower. Clearly, there is overlap, so it's not clear whether Java or Scala is more popular overall. I see several reasons to explore Spark from the Java perspective:

  • Java is clearly an important part of the Spark ecosystem, as the surveys show.
  • The Java APIs are not merely an afterthought in Spark: real effort seems to have been invested in making Java programming practical and a reasonable approach.
  • While even a quick examination of the Spark project's Java examples (which date back to Java 7) shows them to be verbose and awkward compared with the Scala examples, the introduction of functional programming features in Java 8 raises the possibility of Java catching up.
  • I see a certain hesitation about Scala in the "big data" ecosystem. Lightbend has taken the "we don't have to choose" approach, and seems to be pretty sincere about it -- and of course they should be if they believe their own survey results. Confluent's decision about Apache Kafka is a bit more interesting: Kafka is also written in Scala, but only supports a Java API, with others provided by the community. While Cake Solutions actively develops the scala-kafka-client project, the Scala APIs are definitely not a first class citizen.
  • I've been a Java programmer, on and off, for 18 years. Before Scala, it was my recreational language of choice, and I still like it. I'm curious about Java 8, which I've only used a little, for another recent project.

Together, these certainly don't motivate me to abandon Scala, but they do motivate me to understand the tradeoffs better than I do now. The bottom line is that I've started adding some Java examples to the project, and started marking my commit messages with "[Scala]" or "[Java]" as appropriate.

Important Questions

I'm definitely making this up as I go, so let me expose some of the decisions I'm trying to make.

Which Examples?

I started with Dataset and DataFrame, since I had recently worked on those in Scala. But I'd at least like to get a cross section of the different areas: core RDDs, SQL, streaming and perhaps GraphX. Then I'll probably focus more on the areas that bring out interesting differences, whichever they turn out to be. There's no point exploring Spark SQL as a query language comprehensively in both Java and Scala, so I won't do it in Java.

Which Version(s) of Spark?

This is easy: much of the reason I invest in the project is to keep up with Spark evolution, and it takes a lot of effort. I'll continue adopting each new Spark release as soon as I can, and use its new features.

Java 8 or Earlier?

Java 8 seems to be getting a lot of adoption, and the new features definitely make it better suited to Spark. But the APIs have a number of features that were intended to work around the deficiencies of earlier versions of Java (such as all of, so it seems interesting to explore them for a while. Yet I'll probably change to Java 8 soon to keep the project from becoming a museum.

One or Two Projects on GitHub?

So far I've used the parallel Scala/Java source tree structure of sbt projects to locate the Java code in the same project as the Scala code, but I'm already feeling like this was a bad idea. I think it hinders my ability to serve the community, since Java programmers are much more likely to be familiar with Maven than sbt, and the one Java class I had written to support the Scala code (hiveql.SumLargeSalesUDAF) is now tangled up with the Java examples. I think you can expect to see a separate project soon. (Splitting the projects also allows me to use different Java versions.)

Parallel Organization?

As I mentioned earlier, the evolution of the object model around Spark SQL has made it hard to keep the project organized, and the Scala examples are getting out of hand. I'm not going to inflict this entropy on Java developers, and will try to organize the Java examples according to my current understanding of how Spark fits together. In due course this may help me sort out the organization of the Scala examples too -- in any case I'm hoping to write a separate post on this topic.

How Much Effort?

I don't know how I'll balance my effort between the Scala and Java examples, or even whether I'll keep working on the Java ones for much longer. It depends on feedback, how much insight I get, where the community ends up going, and how Java and Scala (and Spark) evolve.

Abandoning Scala?

I've already made this decision: definitely not. It's now my recreational language of choice, and I think it has a future. At the very least, I plan to keep up my attempts at covering major Spark features in Scala as they evolve.

Feedback please!

While my "baby steps" approach to Spark examples seems to have had some impact on the community, I get very little direct feedback. Occasional questions have inspired some of the examples, which I hope were helpful to those people, and one local professional who reached out and told me how he has found the project valuable has dramatically increased my motivation. I'd be delighted to hear about your experiences, either about the examples themselves, or about Spark in general.