Thursday, October 20, 2016

Learning to use Apache Spark and Kafka Together

I recently wrote about the unexpected popularity of the LearningSpark project on GitHub, and speculated that some of the popularity stemmed from the convenience of not having to set up a Spark server -- each example program is more-or-less self-contained. This approach has certain limitations (for example, it's an awful way to explore performance and scalability issues) but it does leave people free to concentrate on model and API issues in isolation. This can be useful not only for Spark beginners but also, as Spark evolves, as a way to understand new features and the problem-solving approaches they support.

Sooner or later, a large fraction of Spark users end up grappling with how to use Spark in conjunction with Apache Kafka. Now, the overheads of setting up both a Spark cluster and a Kafka cluster before you can write the stream processing equivalent of "Hello World" can be quite high, especially if you're still learning BOTH systems. Being in this situation myself, I started to wonder how hard it would be to set up a project where both the Spark system and the Kafka broker (and ZooKeeper instance) were embedded in the example program. Such an approach would have the same limitations as the Spark project described above, and probably suffer from them even more, but conversely, the benefits of such simplification would be even greater. It turns out not to be very hard to achieve, as you can see at another GitHub project, spark-streaming-with-kafka, but it does have certain gotchas, which I'll discuss.

It seems to me that the Kafka project has suffered rather considerable API churn, not just in the details of the APIs but in the fundamental conceptual model as well. As of Spark 2.0.0, released in July of 2016, Spark's support for these APIs lags somewhat: it targets Kafka 0.8.2.1, released in February of 2015. This seventeen-month lag causes some minor difficulties, although the situation seems likely to improve in Spark 2.1.

The main impact of this lag in API support is in the area of the kafka-unit project, which provides convenient APIs for managing an embedded Kafka instance. This project is now at version 0.6, but to get a release that supports Kafka 0.8.2.1 we have to go back to kafka-unit 0.2, which is missing some handy newer features like creating partitioned topics. I've had to work around this in one of the utility classes discussed below.
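
To give a flavor of the workaround, here is a rough sketch (not the repository's actual code) of creating a partitioned topic directly against the embedded ZooKeeper instance using Kafka 0.8.2.1's AdminUtils, since kafka-unit 0.2 only knows how to create single-partition topics. The helper name and the timeout values are illustrative.

    import java.util.Properties

    import kafka.admin.AdminUtils
    import kafka.utils.ZKStringSerializer
    import org.I0Itec.zkclient.ZkClient

    // Hypothetical helper: create a multi-partition topic against the embedded
    // ZooKeeper, something kafka-unit 0.2 can't do on its own.
    object PartitionedTopicSketch {
      def createPartitionedTopic(zkConnect: String, topic: String, partitions: Int): Unit = {
        // Kafka 0.8.2.1 keeps topic metadata in ZooKeeper, so topic creation
        // goes through a ZkClient rather than a broker connection.
        val zkClient = new ZkClient(zkConnect, 10000, 10000, ZKStringSerializer)
        try {
          AdminUtils.createTopic(zkClient, topic, partitions, 1, new Properties())
        } finally {
          zkClient.close()
        }
      }
    }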

Utility Classes

So far, I have needed the following utility classes to keep the examples sane. Note that these links point to a version of the code specially tagged to provide permanent links: keep this in mind if you want to see more recent versions of the code.

  • EmbeddedKafkaServer: Encapsulates the use of the kafka-unit project to embed a Kafka instance, working around the fact that, as discussed above, we have to use a rather old version of that project.
  • SimpleKafkaClient: Some trivial default configuration for the producers and consumers used in the examples.
  • SparkKafkaSink: An extension of the code provided in Marcin Kuthan's rather useful blog post about publishing to a Kafka topic from a Spark job. His code uses the default partitioner, essentially scattering the contents of each RDD partition across all of the topic partitions, effectively causing a random repartitioning. The additional overload of the send() method here instead allows the topic partition to be specified, so the contents of all RDD partitions are sent to the same topic partition. I'm not sure this is useful in practice, but it helps to illustrate the relationship between RDD partitioning and topic partitioning. A rough sketch of this pattern appears just after this list.
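
To make the SparkKafkaSink idea concrete, here is a rough sketch of the kind of sink Kuthan's post describes, with the extra send() overload that takes an explicit topic partition. The class name and configuration values are illustrative rather than the repository's actual code; the basicConfig() method stands in for the sort of minimal producer setup SimpleKafkaClient provides.

    import java.util.Properties

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    // Wraps a producer-creating function so the wrapper can be shipped to Spark
    // executors; the producer itself is created lazily, once per executor.
    class KafkaSinkSketch(createProducer: () => KafkaProducer[String, String])
        extends Serializable {

      @transient lazy val producer = createProducer()

      // Let the configured partitioner choose the topic partition.
      def send(topic: String, key: String, value: String): Unit =
        producer.send(new ProducerRecord[String, String](topic, key, value))

      // Additional overload: route the record to an explicit topic partition.
      def send(topic: String, partition: Int, key: String, value: String): Unit =
        producer.send(new ProducerRecord[String, String](topic, Int.box(partition), key, value))
    }

    object KafkaSinkSketch {
      // Minimal string-in, string-out producer configuration; the broker
      // address is supplied by the caller.
      def basicConfig(brokers: String): Properties = {
        val props = new Properties()
        props.put("bootstrap.servers", brokers)
        props.put("key.serializer",
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer",
          "org.apache.kafka.common.serialization.StringSerializer")
        props
      }

      def apply(config: Properties): KafkaSinkSketch =
        new KafkaSinkSketch(() => new KafkaProducer[String, String](config))
    }

In a Spark job, an instance of a sink like this would typically be broadcast and then used inside foreach() or foreachPartition() on the RDD being published, so the producer is created at most once per executor rather than once per record.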

Examples

So far, the following five examples are available. Once again, the links point to a tagged version of the code.

  • SimpleStreaming: The most basic streaming example: starts a Kafka server, creates a topic, creates a stream to process that topic, and publishes some data using the SparkKafkaSink. (The stream-creation side of these examples is sketched after this list.)

    For each of the received RDDs, the code prints the number of partitions and the number of elements in each partition. The code exercises no control over the partitioning of the received RDDs, and there turn out to be two partitions each time, compared with four in the originating RDD and four in the topic. By examining the partitioning here, we set the stage for exercising some control over it in later examples.

    Notice there's quite a lot of waiting. It takes some time for streaming to get going, and data published too early tends to be missed by the stream. (No doubt, this is partly because this example uses the simplest method to create the stream, and thus doesn't get an opportunity to set auto.offset.reset to "smallest".) Also, data that is published takes some time to propagate to the stream. This seems inevitable, and is almost guaranteed to be slower in a self-contained example like this.
  • ExceptionPropagation: This example demonstrates that exceptions encountered in stream processing are rethrown from the call to awaitTermination(). The custom exception SomeException is thrown when an RDD is received.
  • MultipleConsumerGroups: This example differs in creating two streams based on two different consumer groups, so both streams get a copy of the same data. It's simply a matter of specifying different consumer group names in the two calls to createStream() for the same topic -- no special configuration is needed. The two calls create two instances of ReceiverInputDStream, and then foreachRDD is called on each of those. This is valuable if you want to create more than one processing pipeline on the same data.
  • PartitionedStreaming: By calling createDirectStream() instead of createStream(), you can get the generated RDDs to have a number of partitions (in this case 6) dictated by the partitioning of the topic.
  • ControlledPartitioning: Here the topic has six partitions, but instead of writing to it using the configured partitioner, we assign all records to the same partition explicitly. Although the generated RDDs still have the same number of partitions as the topic, only one partition has all the data in it. This demonstrates how to exercise control over partitioning all the way from the original RDD, through the topic, to the resulting RDDs. (The publishing side of this example is also sketched after the list.)
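
For orientation, here is a rough sketch of the stream-creation side of these examples, assuming Spark 2.0.0 with the spark-streaming-kafka-0-8 artifact on the classpath; the connection strings, topic name, and group name below are placeholders rather than the project's actual values.

    import kafka.serializer.StringDecoder

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StreamSetupSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical addresses of the embedded ZooKeeper and Kafka broker.
        val zkConnect = "localhost:39001"
        val brokerList = "localhost:39000"
        val topic = "SomeTopic"

        val conf = new SparkConf().setAppName("StreamSetupSketch").setMaster("local[4]")
        val ssc = new StreamingContext(conf, Seconds(1))

        // Receiver-based stream (SimpleStreaming, MultipleConsumerGroups):
        // Spark decides how the received data is partitioned.
        val receiverStream =
          KafkaUtils.createStream(ssc, zkConnect, "SomeGroup", Map(topic -> 1))
        receiverStream.foreachRDD { rdd =>
          println(s"*** receiver stream batch has ${rdd.partitions.length} partitions")
        }

        // Direct stream (PartitionedStreaming, ControlledPartitioning): each
        // generated RDD gets one partition per partition of the topic.
        val directStream =
          KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
            ssc, Map("metadata.broker.list" -> brokerList), Set(topic))
        directStream.foreachRDD { rdd =>
          println(s"*** direct stream batch has ${rdd.partitions.length} partitions")
          rdd.glom().map(_.length).collect().foreach(n => println(s"*** partition with $n elements"))
        }

        ssc.start()
        ssc.awaitTermination() // exceptions raised in stream processing are rethrown here
      }
    }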
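
Here, similarly hedged, is a sketch of the publishing side of ControlledPartitioning: every record is sent to topic partition 0 explicitly, bypassing the configured partitioner. Again, the broker address and the six-partition topic name are placeholders.

    import java.util.Properties

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.spark.{SparkConf, SparkContext}

    object ControlledPublishSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical embedded broker address and six-partition topic name.
        val brokerList = "localhost:39000"
        val topic = "PartitionedTopic"

        val conf = new SparkConf().setAppName("ControlledPublishSketch").setMaster("local[4]")
        val sc = new SparkContext(conf)

        // Four RDD partitions feeding a six-partition topic.
        val data = sc.parallelize(1 to 100, 4)

        data.foreachPartition { records =>
          // One producer per RDD partition, created on the executor.
          val props = new Properties()
          props.put("bootstrap.servers", brokerList)
          props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer")
          props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer")
          val producer = new KafkaProducer[String, String](props)

          // Send every record to topic partition 0, no matter which RDD
          // partition it came from, so all the data lands in one topic partition.
          records.foreach { n =>
            producer.send(new ProducerRecord[String, String](topic, Int.box(0), null, n.toString))
          }
          producer.close()
        }

        sc.stop()
      }
    }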

Feedback please!

Personally I've found this project useful in improving my understanding of Kafka itself, as well as the Kafka integration features of Spark Streaming. It's always hard to tell which of these projects are useful to others and why, so I look forward to hearing from you about your experiences with the code.
