Sunday, April 22, 2018

Revisiting Spark External Data Sources (The V2 APIs)

Over three years ago I started writing about the Spark External Data Source API, first with two general posts: External Data Sources in Spark 1.2.0 and Filtering and Projection in Spark SQL External Data Sources.

Subsequent posts dealt with my explorations of integrating MongoDB:

Back then, I made my experiments available as part of a collection of self-contained Spark examples on GitHub, and a number of projects specifically targeted towards MongoDB:

I learned a lot by doing that, and apparently a number of people found the examples useful, but I never managed a systematic exploration of the original API and all of its capabilities -- something that didn't feel right, and that I always meant to go back to when time allowed.

Since then, a number of features have been added to the original API, but more significantly, the community started work on a new (V2) API described in this design document and in SPARK-15689. The new API has now been released with "experimental" status in Spark 2.3.0, and further improvements are pending, as described in SPARK-22386. Even allowing for the churn that an experimental API of this complexity will inevitably experience, this seems like the perfect time to revisit Spark external data sources, and the new API seems like a good way to do so.


I've started exploring the new API in a dedicated GitHub project. I knew I wanted to make the examples "self-contained", since people seemed to find that useful in my earlier projects. But doing so presented a challenge, as the data sources are supposed to be "external." This meant that I had to make a hard choice among three options:

  1. Use a "trivial" data source like the file system. This can actually lead to considerable insight, but it's not really amenable to exploring all aspects of the API.
  2. Embed one or more external systems (presumably enough to cover all the concepts, which was unlikely to be possible with a single system.)
  3. Provide my own [contrived] data source(s) that allowed covering all the concepts.

I chose the third option, developing a rather simple key-value store called ExampleDB, in which I ignored the finer points like performance and persistence, and simply focused on developing a vaguely plausible API that could support the various examples. I'm expanding and refining ExampleDB as I add each example data source, and hoping I can keep it from becoming a monster.

I decided to embed ExampleDB in each of the examples rather than starting a separate process, as I did with the Kafka broker and ZooKeeper in my Spark Streaming with Kafka project. However, to ensure that the data sources aren't "cheating" in the way they use ExampleDB, I only allow both the data sources and the example Spark jobs to call it via a remote API based on gRPC. Thus, the state of the RPC client needs to be managed carefully in the example data sources, making them realistic.
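Since a gRPC client can't be serialized and shipped between the driver and executors, the natural pattern is to serialize only plain configuration and construct the client lazily wherever it is actually used. Here is a minimal, Spark-free sketch of that idea; the class and method names are mine, not ExampleDB's real API:

```java
import java.io.Serializable;

// Sketch of careful client-state management: the stub below stands in for a
// gRPC client, which is not serializable and must be created where it is used.
public class LazyClientHolder implements Serializable {
    private final String target;              // plain config: safe to serialize
    private transient ExampleDbClient client; // live state: never serialized

    public LazyClientHolder(String target) { this.target = target; }

    // Re-created on demand, e.g. after deserialization on an executor.
    ExampleDbClient client() {
        if (client == null) client = new ExampleDbClient(target);
        return client;
    }

    // Hypothetical stand-in for a generated gRPC stub.
    static class ExampleDbClient {
        final String target;
        ExampleDbClient(String target) { this.target = target; }
        String ping() { return "ok:" + target; }
    }
}
```

The `transient` field is the key detail: after deserialization it comes back `null`, so each executor builds its own live connection instead of receiving a broken one.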

Another major decision point was which language to use for both the data sources and the Spark jobs. (ExampleDB is written in Java, but that's really of no consequence as far as the tutorial value of the project goes.) Java friendliness seems to have been a major design goal of the new API, and indeed the interfaces that define the API are all Java interfaces. What language will developers actually use to develop data sources? I don't really know, but I think it depends somewhat on their role. I suspect that Spark developers who need to roll their own data source will often do so in Scala, in proportion to the general Scala/Java split in the Spark user community. However, I also suspect that database and file system vendors will prefer Java, since Java talent is so much more plentiful in the industry overall. That's a problem for me because both classes of developers are part of my target audience. So far, all the data sources are in Java, but I've provided all the example Spark jobs in both languages since that hasn't been very costly.

All of these decisions are subject to being revisited as I see how much interest the project generates, and what seems to interest the community the most.

Examples in the Spark Source Code

While the V2 API is currently not documented much (except for JavaDoc comments in the interface sources), the Spark test suite does provide good coverage of the features. My examples are informed and inspired by many of the tests, but I've taken somewhat of a hard line on one design point that wouldn't have been important to the test developers. In the tests, you'll often see a single class implement multiple interfaces just for the sake of expediency. Sometimes they mix data that needs to be serialized for communication between the driver and executors with data that doesn't. That makes it hard to understand their communication behavior.

As an example, consider the test data source JavaSimpleDataSourceV2 on branch-2.3. The inner class JavaSimpleDataReaderFactory implements both DataReaderFactory&lt;Row&gt;, which needs to be serializable, and DataReader&lt;Row&gt;, which doesn't.

For this reason, I've been rather careful to distinguish between objects that stay put and objects that get communicated. (Keep in mind that this is not a criticism of the tests as tests; it's just a warning for anyone trying to use them as a tutorial, and an explanation of why my data sources may sometimes appear pedantic and bloated in comparison.)
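To make the distinction concrete, here is a tiny Spark-free sketch of the split: a Serializable factory that carries only configuration from the driver to the executors, and a reader created from it that holds live, non-serializable state. The class names are hypothetical and only mirror the shape of the V2 reader interfaces:

```java
import java.io.*;

public class FactoryReaderSplit {
    // Travels from driver to executors, so it must be Serializable.
    // It carries only plain configuration, never live connections.
    static class ReaderFactory implements Serializable {
        final String host;
        final int partition;
        ReaderFactory(String host, int partition) { this.host = host; this.partition = partition; }
        Reader createReader() { return new Reader(host, partition); }
    }

    // Created on the executor; holds non-serializable state (sockets, cursors).
    static class Reader implements Closeable {
        private final String connection;  // stand-in for a live RPC client
        private int row = 0;
        Reader(String host, int partition) { this.connection = host + "#" + partition; }
        boolean next() { return row < 3; }
        String get() { return connection + ":row" + row++; }
        public void close() {}
    }

    public static void main(String[] args) throws Exception {
        ReaderFactory factory = new ReaderFactory("exampledb", 0);

        // Simulate the driver shipping the factory to an executor.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new ObjectOutputStream(bytes).writeObject(factory);
        ReaderFactory shipped = (ReaderFactory) new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray())).readObject();

        try (Reader reader = shipped.createReader()) {
            while (reader.next()) System.out.println(reader.get());
        }
    }
}
```

Collapsing the two classes into one, as the tests sometimes do, forces the reader's connection state into the serialized object, which is exactly the confusion warned about above.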

Examples available now

The following example data sources are now available. (Note that the links below refer to a tagged version of the repository.)


    An extremely simple data source that supports sequential reads (i.e. on just one executor) from ExampleDB. It only supports reads from a single, pre-defined table with a pre-defined schema. This data source is probably about as simple as one that reads from a remote database can get. As such, it is aimed at the developer who needs to achieve a simple integration quickly and easily.


    Another simple data source that supports sequential reads (i.e. on just one executor) from ExampleDB. It gets a table name from its configuration and infers a schema from that table.
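Schema inference of this kind boils down to asking the database for the table's column metadata and mapping each database type to a Spark SQL type. The following is a hedged, Spark-free sketch; the Column class and the type names on both sides are illustrative, not ExampleDB's actual API:

```java
import java.util.*;

// Conceptual sketch of schema inference: fetch a table's column metadata and
// translate each database type into a Spark SQL type name.
public class SchemaInference {
    static class Column {
        final String name;
        final String dbType;
        Column(String name, String dbType) { this.name = name; this.dbType = dbType; }
    }

    // Illustrative mapping from database types to Spark SQL types.
    static final Map<String, String> TYPE_MAP = Map.of(
            "int64", "LongType",
            "double", "DoubleType",
            "string", "StringType");

    // Build a (column name -> Spark type) schema, preserving column order.
    static LinkedHashMap<String, String> inferSchema(List<Column> columns) {
        LinkedHashMap<String, String> schema = new LinkedHashMap<>();
        for (Column c : columns) {
            String sparkType = TYPE_MAP.get(c.dbType);
            if (sparkType == null)
                throw new IllegalArgumentException("unsupported type: " + c.dbType);
            schema.put(c.name, sparkType);
        }
        return schema;
    }
}
```

Failing fast on an unmapped type is a deliberate choice here: silently guessing a type would surface later as confusing query errors.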


    This introduces parallel reads (i.e. on multiple executors) from ExampleDB. The resulting DataFrame is partitioned appropriately. If the number of partitions is specified in properties, it is used. Otherwise, the table's default partition count (always 4 in ExampleDB) is used.
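The read-planning step behind this can be sketched without Spark: on the driver, pick a partition count (from properties, or the table's default of 4) and carve the table's rows into contiguous ranges, one per reader factory. The names below are hypothetical:

```java
import java.util.*;

// Sketch of read planning for parallel reads: each Range would back one
// serializable reader factory, executed on some executor.
public class PartitionPlanner {
    static class Range {
        final long start, end; // half-open: [start, end)
        Range(long start, long end) { this.start = start; this.end = end; }
    }

    static List<Range> plan(long rowCount, Integer requestedPartitions) {
        int n = requestedPartitions != null ? requestedPartitions : 4; // table default
        List<Range> ranges = new ArrayList<>();
        long base = rowCount / n, extra = rowCount % n, start = 0;
        for (int i = 0; i < n; i++) {
            long size = base + (i < extra ? 1 : 0); // spread the remainder
            ranges.add(new Range(start, start + size));
            start += size;
        }
        return ranges;
    }
}
```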


    This also supports parallel reads (i.e. on multiple executors) from ExampleDB. The interesting feature of this example is that it supports informing the Spark SQL optimizer whether the table is partitioned in the right way to avoid shuffles in certain queries. One example is grouping queries, where shuffles can be avoided if the table is clustered in such a way that each group (cluster) is fully contained in a single partition. Since ExampleDB only supports clustered indexes on single columns, in practice a shuffle can be avoided if the table is clustered on one of the grouping columns in the query. (In ExampleDB clustered tables, splits always respect clustering.)
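The check itself is simple once stated this way. Here is a conceptual sketch of the satisfies-the-distribution test, with illustrative names rather than the actual V2 partitioning interfaces:

```java
import java.util.*;

// Conceptual sketch of the partitioning check that lets Spark skip a shuffle:
// the optimizer asks whether the source's physical layout already clusters
// rows by the query's grouping columns. With single-column clustered indexes
// (as in ExampleDB), that holds exactly when the clustering column is one of
// the grouping columns, because splits respect clustering.
public class ClusteringCheck {
    // tableClusterColumn: the single column the table is clustered on, or null.
    // groupingColumns: the columns a query groups by.
    static boolean satisfiesClustering(String tableClusterColumn,
                                       Set<String> groupingColumns) {
        return tableClusterColumn != null
                && groupingColumns.contains(tableClusterColumn);
    }
}
```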

Read/write data sources


    This data source adds the ability to write data, and does so in parallel. The various classes for reading are identical to those of ParallelRowDataSource. All four values of SaveMode are supported. Each task writes to its own temporary table, and on global commit all of these temporary tables are copied into the destination table, and deleted, in a single ExampleDB transaction.
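The global-commit protocol above can be sketched with an in-memory map standing in for ExampleDB. Only the append path is shown, SaveMode handling is omitted, and all names are illustrative:

```java
import java.util.*;

// Sketch of the two-phase write: each task appends rows to its own temporary
// table, and a single global commit moves every temporary table into the
// destination and drops them, all "in one transaction" (simulated here).
public class TwoPhaseWrite {
    final Map<String, List<String>> tables = new HashMap<>();

    // Task-side: write this task's rows to a private temp table and
    // return its name as the task's "commit message".
    String writeTask(int taskId, List<String> rows) {
        String temp = "_tmp_" + taskId;
        tables.put(temp, new ArrayList<>(rows));
        return temp;
    }

    // Driver-side global commit: fold every temp table into the destination,
    // deleting the temp tables as it goes.
    void commit(String destination, List<String> tempTables) {
        List<String> dest = tables.computeIfAbsent(destination, k -> new ArrayList<>());
        for (String t : tempTables) dest.addAll(tables.remove(t));
    }
}
```

The point of the temp tables is isolation: until the driver's commit runs, a failed or speculatively re-run task leaves the destination table untouched.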

Remember this is all experimental

The V2 API is marked experimental, and indeed a quick look at the Spark source code on GitHub reveals that some (apparently minor) refactoring has already happened since the 2.3.0 release, and we should probably assume there will be more before the 2.4.0 release. I'll try to track releases as quickly as I can until the API stabilizes.

I'm experimenting too

Given the challenges this project presents, I may need to change my approach over time, and I hope to get feedback from you about how well the approach is working.