Sunday, October 23, 2016

Developing Utility Bolts for Apache Storm

Apache Storm is a distributed stream processing framework: one of many such frameworks, but among the most popular. Storm applications ("topologies") are composed of "spouts" (sources of data) and "bolts" (data transformations), connected by "streams" of "tuples", where each tuple is a sequence of typed key/value pairs of data. The spouts and bolts can be thought of as vertices in a directed acyclic graph, and the streams as edges. The spouts are always graph sources, with only outgoing edges, but the bolts may have both incoming and outgoing edges, or they can be sinks, with only incoming edges.
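
To make the graph structure concrete, here is roughly what wiring a tiny topology together looks like in the Java API. This is only a sketch: SentenceSpout and WordCountBolt are hypothetical component classes standing in for a real spout and bolt.

    import org.apache.storm.generated.StormTopology;
    import org.apache.storm.topology.TopologyBuilder;

    public class WiringSketch {
        public static StormTopology build() {
            // One spout (source vertex) feeding one bolt (sink vertex).
            // SentenceSpout and WordCountBolt are hypothetical classes, not part of Storm.
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("sentences", new SentenceSpout(), 1);  // outgoing edges only
            builder.setBolt("counts", new WordCountBolt(), 2)       // incoming edges only
                   .shuffleGrouping("sentences");                   // the edge: a stream of tuples
            return builder.createTopology();
        }
    }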

Storm provides various pre-defined components, most of them spouts, providing standard data sources for streaming data from database systems, file systems, queueing systems, network listeners such as web servers, and so on. Similarly, it provides pre-defined bolts, some serving as data sinks along the same lines, as well as interfaces to the usual logging frameworks.

In this post I'm going to examine what it takes to do a good job of adding reusable transformers (in the form of utility bolts) to Storm, for use by topology developers. Storm already provides a number of these, mostly in the package org.apache.storm.starter.bolt, and a few more in the package org.apache.storm.testing. (Storm follows a convenient naming convention where all bolt class names end with "Bolt".) Alas, most of these are completely undocumented, at least in the JavaDoc, but many are quite simple, and their usage can be worked out from a quick read of the source. Standard transformations can provide simple operations like projecting out unwanted fields, or much more complex ones like filtering, aggregation or data smoothing.

Since spouts and bolts sometimes have common or interacting design issues, I'll occasionally touch on the design of spouts, but that's a complex topic in itself and mostly out of scope for this post.

Audience

Since this is intended to be a practical guide for writing reusable bolts, I'm going to assume that you already understand the basic mechanics of writing a very simple bolt and getting it working in a Storm topology. A good source for learning how to do this is the book "Storm Applied: Strategies for real-time event processing" by Sean T. Allen, Matthew Jankowski, and Peter Pathirana. I'm also assuming that you have the most basic familiarity with Storm's Java API.

Open Source Project

You may want to read this in conjunction with the storm-gadgets project on GitHub, which includes a small number of bolts largely developed using the design principles described here, although I'll leave detailed discussion of the actual code to another post.

Design Goals

First I'd like to propose some design goals for creating utility bolts:

Ease of use: The purpose and behavior of the bolt should be clear and it should be easy to set up and include in a topology.

Appropriate Generality: When designing reusable components of any kind there's a tradeoff between ending up with lots of similar components on one hand, and components with complex configuration on the other. When adding components to an existing framework it helps to "blend in" with how existing components have handled this compromise. Another facet of generality is adaptability to as wide a range of topologies as possible, in terms of variations like concurrency support, reliable delivery support, tuple contents, and so on.

Robustness: Good choices need to be made about what kinds of errors are tolerated and which lead to topology execution failure. Here again the pre-existing components can be a guide. Furthermore, in the streaming world it's very expensive to allow bad input data or a localized problem to terminate the application. It's usually best to avoid interrupting stream processing in all but the most severe cases: anything that prevents the successful processing of a large fraction of tuples.

Ease of Diagnosis: It's important to be able to diagnose misconfiguration of these components as well as failures, or other faults, during their execution. Again, existing components can be a guide here, but broadly we want to be able to read the usual logs and see what is happening in each component – easily being able to zoom in on a specific component type and/or component instance. The reader of log messages needs to be able to understand the scope and severity of each reported problem, and ideally what to do about it: fix bad configuration, restart the topology, solve an environmental problem, etc.

Performance and Scalability: In addition to the component itself performing well, it should not detract from the performance and scalability of the topologies that use it any more than necessary.

Implementation Guidelines

To meet the above component design goals in the Apache Storm framework, we need to address certain technical issues. I'll leave performance and scalability to a separate post, and address the functional issues here. As mentioned earlier, this discussion will occasionally refer to the Java API, although that's not the only option for implementing bolts.

Distinguishing between inputs: A component may take inputs from multiple other components, and will often treat those inputs differently -- that is, they have different roles in the operation of the component, and so the topology developer will need to be able to specify which input stream has which role. Furthermore, upstream components may be emitting tuples on multiple streams, and sometimes multiple output streams of a single component may be consumed by our component. In Storm, streams have names local to the component that emits them, and components within a topology live in a flat namespace where they have global names. Storm provides the class org.apache.storm.generated.GlobalStreamId for dealing with this two-level namespace. In short, the component must support dealing unambiguously with the names of streams.
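
For example, inside a bolt's execute() method the source component and stream of each tuple are available and can be combined into a GlobalStreamId for comparison against whatever the topology developer configured. A sketch only: the controlStream field and the two handler methods are hypothetical.

    import org.apache.storm.generated.GlobalStreamId;
    import org.apache.storm.tuple.Tuple;

    // Sketch: distinguish input roles by the (component, stream) pair each tuple arrived on.
    // "controlStream" is a GlobalStreamId field set from the bolt's configuration;
    // handleControlTuple() and handleDataTuple() are hypothetical helper methods.
    @Override
    public void execute(Tuple input) {
        GlobalStreamId source =
            new GlobalStreamId(input.getSourceComponent(), input.getSourceStreamId());
        if (source.equals(controlStream)) {
            handleControlTuple(input);
        } else {
            handleDataTuple(input);
        }
    }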

Organizing outputs in a consumable way: Our own component may need to generate multiple output streams, in which case they need to be named. Even if there is only one, there may be reasons not to simply use the default output stream (whose name is, aptly enough, "default".) Sometimes it will make sense to generate the stream names ("out1", "out2", …) but in other cases they will need user configuration to fit them into a topology. The fields in output tuples will also need names, which may be fixed or generated in some cases, and need to be configured in others. This can be a lot for the user to configure, and the decision as to what needs to be configured should be made carefully. Finally, there are cases where it may be tempting to choose the output stream and field names based on the input stream and field names. There are two problems with this. First, while it may seem like a great way to avoid configuration altogether, Storm spouts and bolts are required to declare their streams and fields (and the order of fields) via the declareOutputFields() callback when the topology is initialized, before any tuples have been seen. Second, while it is often practical to use the configured names of inputs as names of outputs, you need to watch out for collisions – multiple input components may use the same stream name, and multiple streams may use the same field name. In short, simply passing input names through as output names is not a viable bolt design strategy in Storm.
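
As an illustration, here is what declaring two output streams might look like. The stream and field names are hypothetical; in a reusable bolt they would typically come from constructor arguments rather than being hard-coded.

    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;

    // Sketch: one main output stream plus a fixed, dedicated error stream.
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("out1", new Fields("key", "value"));  // main output
        declarer.declareStream("error", new Fields("tuple"));        // rejected tuples, serialized
    }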

Interoperating with Guaranteed Delivery: The degree to which a topology achieves guaranteed delivery of tuples depends on its configuration, as well as the behavior of the spouts and bolts. Spouts need to assign IDs to tuples, bolts need to anchor their emitted tuples appropriately with respect to input tuples, and all components need to acknowledge tuples appropriately. Spouts have to implement the ack() and fail() methods, which also impacts the nextTuple() method, as emitted tuples need to be stored, keyed by their tuple ID, until they are either acknowledged (and then deleted) or failed (and then replayed.) Finally, bolts that communicate with external systems such as databases or queueing systems will need to "fail" the tuple when operations on external systems fail, so that it will later be replayed. When developing a utility component, we don't know whether guaranteed delivery will be used in a particular topology -- it usually needs to support either behavior. Fortunately, if we develop the component as if guaranteed delivery will be used, it can also be deployed without it. As we will see below, doing this sometimes raises complex design issues.
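
In code, writing a bolt "as if" guaranteed delivery is in use mostly means anchoring and acknowledging consistently in execute(). A sketch, where transform() and the stream name "out1" are hypothetical:

    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    // Sketch: anchor emitted tuples to the input and ack (or fail) the input exactly once.
    // "collector" is the OutputCollector saved in prepare(); transform() is hypothetical.
    @Override
    public void execute(Tuple input) {
        try {
            Values result = transform(input);        // hypothetical per-tuple transformation
            collector.emit("out1", input, result);   // anchor the output to the input tuple
            collector.ack(input);                    // harmless when the topology ignores acks
        } catch (Exception e) {
            collector.fail(input);                   // ask the spout to replay the tuple
        }
    }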

Concurrency: It is straightforward to write components in a way that allows Storm to operate multiple instances in parallel, but problems arise when we use these components in a topology and try to decide on what grouping method to use to connect them. Often a shuffle grouping will work – in particular, if the bolt processes each tuple completely in isolation from others. It gets more complicated if the order of tuples is significant to the bolt, or they need to be grouped in some way – then often a fields grouping is appropriate. This is all in a day's work for Storm topology developers, but it requires understanding the behavior of each spout and bolt. As utility component developers, it's up to us to understand our component's behavior well enough to document the grouping requirements it imposes, and sometimes this can be complex as it may be different for different inputs. Spouts have additional responsibilities with respect to concurrency, as the various spouts reading from an external data source need to divide the data among themselves. When reading from a queue, this is straightforward, but if reading from a DBMS they may have to work out how to explicitly partition a table.
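
For example, a bolt that aggregates per key can document that it requires a fields grouping on that key. The component classes and names below are hypothetical:

    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;

    // Sketch: all tuples with the same "userId" must reach the same bolt instance,
    // so a fields grouping on that field is required; a shuffle grouping would
    // scatter a single user's tuples across instances.
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("events", new EventSpout(), 2);
    builder.setBolt("per-user-stats", new UserStatsBolt(), 4)
           .fieldsGrouping("events", new Fields("userId"));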

Error handling: The issue of error handling in streaming applications is complex and covering it completely in this post seems impossible. As utility component developers, however, we need to understand and document how our component interacts with system failures in the topology around it, and also what it considers "invalid" configuration and "invalid" input data.

Misconfigurations should usually be reported when a component is initialized (from the constructor) or during the call to prepare(): if at all possible they should surface before the topology starts to execute, and in most cases they should prevent it from executing. One major kind of misconfiguration that components should always check for during initialization is whether an appropriate set of input streams and output streams has been configured -- there's usually no point starting to process data if it hasn't. This is also a good time to check for groupings that can't be supported, concurrency levels that can't be supported, and unsupported combinations of grouping and concurrency.
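
A sketch of this kind of fail-fast check in prepare(); the policy of requiring exactly one input stream is just an example:

    import java.util.Map;
    import org.apache.storm.generated.GlobalStreamId;
    import org.apache.storm.generated.Grouping;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;

    // Sketch: refuse to start if the bolt is wired up in a way it can't support.
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        Map<GlobalStreamId, Grouping> sources = context.getThisSources();
        if (sources.size() != 1) {
            throw new IllegalStateException(
                "Bolt " + context.getThisComponentId()
                + " expects exactly one input stream but has " + sources.size());
        }
    }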

Invalid tuples are a different matter: unclean data is a regular fact of life, and data pipelines should recover and continue executing whenever possible after an invalid tuple is received. This can be either very simple or complex depending on the nature of your component. One thing to remember is that if you effectively drop a tuple for being invalid, you still need to acknowledge it so it doesn't get replayed when guaranteed delivery is being used – this can feel counterintuitive but is very important.

There remains the issue of reporting the problem to support diagnosability. It's important to be able to monitor whether the number of tuples (absolute or as a proportion of data processed) each component has rejected is very small or very large. In the latter case an administrator should ideally be alerted to check whether there is a major, systematic configuration or data source problem. Sometimes the administrator will have the luxury of stopping the data pipeline, but often this is out of the question. Millions of tuples may be rejected before an upstream problem is solved, and you don't want your alerting mechanism to cause more problems than it solves. For example, logging every rejected tuple can seem like a good idea, and indeed be very useful, until the logs fill up a storage device or the logging slows the topology to a crawl. Logging needs to be used judiciously: logging the occasional rejected tuple is probably still a good idea, and logging the number of rejected tuples from time to time can also be useful.

For some components, particularly those that are "fussy" about their inputs, it may make sense to output something (perhaps a count, or an error message) on a dedicated output stream whenever a tuple is rejected. It may even be tempting to output the entire tuple, but this is not straightforward: since the field signatures of a component's output streams need to be pre-declared, it's hard to emit an unexpected field. One approach is to serialize the entire rejected tuple into a single field, perhaps called "tuple", in a serialization format that is both machine and human readable.
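
Putting several of these ideas together, the handling of a rejected tuple might look roughly like this. isValid(), serialize() and the rejectedCount field are hypothetical, LOG is an SLF4J logger (see the Logging section below), and the "error" stream is assumed to have been declared as in the earlier sketch.

    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    // Sketch: report an invalid tuple on a dedicated "error" stream, then ack it
    // so it is never replayed; keep a count for occasional logging.
    @Override
    public void execute(Tuple input) {
        if (!isValid(input)) {                               // isValid(): hypothetical validation
            rejectedCount++;
            collector.emit("error", input, new Values(serialize(input)));  // whole tuple, one field
            collector.ack(input);                            // acked, so it won't be replayed
            if (rejectedCount % 10000 == 0) {
                LOG.warn("{} tuples rejected so far", rejectedCount);
            }
            return;
        }
        // ... normal processing, ending in emit() and ack() ...
    }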

Spouts that attempt to support guaranteed delivery also need to handle situations where tuples are either not being acknowledged for a long time (imposing a huge interim storage burden on the spout) or repeatedly being failed (adding a retransmission burden to that storage burden); in both cases, something is probably seriously wrong. Such situations can be handled by occasionally reaping old tuples and by imposing limits on the number of retries – both requiring additional information to be stored with each transmitted tuple, as well as judicious decision making by the designer.
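
A sketch of the bookkeeping such a spout might keep, with the retry limit and reaping policy purely illustrative; "collector" is the SpoutOutputCollector saved in open():

    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch: per-tuple state kept until ack/fail, so old entries can be reaped
    // and retries can be capped. MAX_RETRIES and the reaping policy are illustrative.
    class Pending {
        final List<Object> values;
        final long emittedAtMillis = System.currentTimeMillis();
        int retries = 0;
        Pending(List<Object> values) { this.values = values; }
    }

    private final Map<Object, Pending> pending = new ConcurrentHashMap<>();
    private static final int MAX_RETRIES = 5;

    @Override
    public void ack(Object msgId) {
        pending.remove(msgId);                     // delivered: forget it
    }

    @Override
    public void fail(Object msgId) {
        Pending p = pending.get(msgId);
        if (p == null) return;
        if (++p.retries > MAX_RETRIES) {
            pending.remove(msgId);                 // give up; log or alert instead of retrying forever
        } else {
            collector.emit(p.values, msgId);       // replay with the same message ID
        }
    }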

Logging: Storm now uses SLF4J for logging, and it's straightforward for individual components to use it as well. Any logging done on a per-tuple basis should be at the DEBUG level so it can be disabled in production. Major component lifecycle and configuration events should be logged as INFO as it's cheap to log them and they should always be available.

One aspect of logging to be aware of is that a component only learns its ID in the topology when prepare() is called. If you want to use it for logging elsewhere (and you will) you need to save it at that time. Furthermore, not only is the ID not known in the constructor, it is also not known in declareOutputFields(), which is called before prepare(). If you want the association between the component ID and its configuration (and perhaps output fields) to be clear in the logs, you may want to log it all inside prepare(), even though the configuration was already available in the constructor and it may have been tempting to log it there.
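
A sketch of that pattern; the class name MyUtilityBolt, the configSummary field and the collector field are hypothetical:

    import java.util.Map;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Sketch: capture the component ID in prepare() and use SLF4J levels as suggested
    // above (INFO for lifecycle/configuration events, DEBUG for anything per-tuple).
    private static final Logger LOG = LoggerFactory.getLogger(MyUtilityBolt.class);
    private String componentId;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.componentId = context.getThisComponentId();   // not available in the constructor
        LOG.info("[{}] prepared with configuration {}", componentId, configSummary);
    }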

Interactions with external systems: Spouts often read data from external systems, and bolts can read data from or write data to such systems, or both. To do this responsibly, they should not overuse the resources of those systems, including connections: this means limiting the number of concurrent connections and disconnecting responsibly when cleanup() or deactivate() is called. As mentioned earlier, it needs to be clear what happens when multiple instances of a component read from the same database table – are they replicating the data or partitioning it? An additional complication to keep in mind is that when guaranteed delivery is in play, the input tuple to a component may be replayed -- it's necessary to think through what effect this will have on the external system.
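
A sketch of the connection lifecycle for a hypothetical JDBC-writing bolt; the jdbcUrl field is assumed to come from configuration, and a real implementation would likely use a pooled DataSource:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.Map;
    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;

    // Sketch: one connection per bolt instance, opened in prepare() and closed in cleanup().
    private transient Connection connection;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        try {
            this.connection = DriverManager.getConnection(jdbcUrl);  // jdbcUrl: from configuration
        } catch (Exception e) {
            throw new RuntimeException("Could not connect to " + jdbcUrl, e);  // fail fast at startup
        }
    }

    @Override
    public void cleanup() {
        try {
            if (connection != null) connection.close();
        } catch (Exception e) {
            // best effort on shutdown; log rather than rethrow
        }
    }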

In Practice

You can make up your own mind as to how well the bolts in the project meet the design goals and conform to the implementation guidelines: I'll discuss some of them in detail in future posts. If you have bolts of your own that raise interesting issues, or feedback on the ideas discussed here, please let me know.
