

Stream Processing API Designs

Jakob Jenkov
Last update: 2019-11-18

Stream processing has become quite popular as a technique to process streams of incoming events or records as soon as they are received. There are many benefits of, and use cases for, stream processing (aka data streaming), and therefore several stream processing APIs have been developed to help developers process streams more easily.

On the surface these stream processing APIs look similar. However, once you try to implement complex stream processing topologies with these APIs, you realize that the design of these APIs has a quite large impact on just how "easy" processing streams with them becomes. Even subtle differences in design can have a big impact on what you can implement, and especially on how easy it is to do.

Having burned my fingers on various stream processing APIs, I decided to write this analysis of stream processing API designs to help anyone who is about to venture into stream processing, and who would like a bit of guidance on what to look for when choosing an API.

Stream Processing API Design Aspects

Before I get started on the design analysis I want to list the aspects of stream processing API design that I have come to realize are important. These design aspects are:

  • Topology
  • Feeding
  • Forwarding
  • Triggering
  • State
  • Feedback
  • Concurrency Model

I will explore different design options and their consequences in the following sections.

Topology

One of the more commonly seen design choices for stream processing APIs is to structure the stream processing components as a graph of components working together. Each component gets a record in, processes it, possibly transforms it, and then outputs 0, 1 or more records to the following components in the graph. Here is a simple example illustration of such a stream processing topology:

[Figure: Stream processing API topology example]

The original motivation for this kind of stream processing API design came from APIs designed to process large numbers of objects or records easily, like the Java Streams API. Here is an example explaining the motivation:

Imagine you have millions of Employee objects (or records) in a list, and you need to:

  • Find the Employee objects representing employees that are middle managers.
  • From these objects, calculate total and average salary for this group of employees.

A straightforward way to implement this requirement would be to first iterate the list of Employee objects and add all Employee objects that represent middle managers to a new list. Afterwards, you can iterate this list of middle managers and calculate their total and average salary. This implementation is illustrated here:

[Figure: Iterating, filtering and calculating on objects stored in a list, the straightforward way.]

Notice how you first have to iterate all the Employee objects once, just to find the Employee objects matching the criteria. Second, you have to iterate the list of Employee objects matching the criteria, and perform the calculations. In the end you will have iterated over more elements than the original list contains (the original list plus the filtered list).
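
In code, the straightforward two-pass approach could look like this. This is a minimal sketch: the Employee class with its isMiddleManager() and getSalary() methods, and the employees source list, are hypothetical names used for illustration:

    // First iteration: filter the middle managers into a new list.
    List<Employee> middleManagers = new ArrayList<>();
    for (Employee employee : employees) {
        if (employee.isMiddleManager()) {
            middleManagers.add(employee);
        }
    }

    // Second iteration: calculate total and average salary.
    double totalSalary = 0;
    for (Employee manager : middleManagers) {
        totalSalary += manager.getSalary();
    }
    double averageSalary =
        middleManagers.isEmpty() ? 0 : totalSalary / middleManagers.size();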

Imagine now that you need to perform the same calculations, but this time for Employee objects representing non-managers. Again, first you need to iterate the whole list of Employees to find the matching Employee objects, and when you have found those, iterate that list and perform the calculations.

In a naive implementation of the above two requirements you will have iterated the original list twice (once to find middle managers and once to find non-managers), and iterated each of the filtered lists once. That is a lot of repeated iteration over what are essentially the same objects!

Instead, you could build an object processing graph, iterate the original list of objects once, and pass each object to the graph for processing. Here is how the calculations mentioned in the example above could be structured as a graph:

[Figure: Processing a list of objects using a graph oriented stream processing API.]

Using this design, the objects are only iterated once. Two filters will forward only the relevant records to the total and average salary calculators.
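
Here is a minimal sketch of how such a graph could be wired up. The Processor interface and the classes below are hypothetical, not from any specific API (later sketches in this article reuse them). It assumes java.util.List and java.util.function.Predicate:

    // A hypothetical processor interface.
    interface Processor {
        void process(Employee employee);
    }

    // Forwards an employee to its successors only if the predicate matches.
    class Filter implements Processor {
        private final Predicate<Employee> predicate;
        private final List<Processor> next;

        Filter(Predicate<Employee> predicate, List<Processor> next) {
            this.predicate = predicate;
            this.next = next;
        }

        public void process(Employee employee) {
            if (predicate.test(employee)) {
                for (Processor processor : next) {
                    processor.process(employee);
                }
            }
        }
    }

    // A leaf processor that keeps running totals.
    class SalaryCalculator implements Processor {
        private double total = 0;
        private long count = 0;

        public void process(Employee employee) {
            total += employee.getSalary();
            count++;
        }

        public double total()   { return total; }
        public double average() { return count == 0 ? 0 : total / count; }
    }

    // Build the graph once, then iterate the list only once:
    SalaryCalculator managerSalaries    = new SalaryCalculator();
    SalaryCalculator nonManagerSalaries = new SalaryCalculator();

    Filter managerFilter    = new Filter(Employee::isMiddleManager, List.of(managerSalaries));
    Filter nonManagerFilter = new Filter(e -> !e.isMiddleManager(), List.of(nonManagerSalaries));

    for (Employee employee : employees) {   // single iteration
        managerFilter.process(employee);
        nonManagerFilter.process(employee);
    }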

Stream vs. Batch Processing

Using a graph oriented object processing API makes a lot of sense when you have a list of objects you want to process. A list of objects is also referred to as a batch. A graph oriented design means you only have to iterate the records once. Graph oriented APIs are typically also dynamically composable, making it reasonably simple to implement different types of topologies to suit different processing needs.

In true stream processing, however, you only have one record or object at a time. There is no list to iterate, so you will never be tempted to implement the iteration in the inefficient manner illustrated earlier. You can still use a graph oriented design even though you only have one record at a time, and you still get the composability advantage.

However, for many smaller processing requirements, using a full graph oriented approach can be overkill. Having a single listener that calculates everything you need, rather than a whole graph with filters etc., can be easier to implement and can also perform better.

Additionally, if the graph becomes too big, it can become hard to reason about what it is doing, and you might have trouble providing feedback back up the graph, depending on what type of graph oriented design you use (functional vs. observer based).

There is no clear conclusion here. Just keep in mind that a graph oriented design is not always as beneficial in true stream processing as it is in batch processing. Keep an open mind about the possible solutions, depending on the stream processing requirements you have. The graph based approach might be appropriate, or it might be overkill. You can probably still implement the simple approach with a graph oriented API, though, if you just create a single listener / processor that does everything.

Since graph oriented stream processing API designs are both common and popular, the rest of this article will deal with various aspects of graph based designs.

Chain vs. Graph

Different graph oriented stream processing API designs are designed around different types of topologies. For instance, the Java Streams API uses a chain topology, like this:

[Figure: Chain topology.]

A chain topology consists of a single chain of processors, each receiving an object or record as input and outputting an object or record to the next processor in the chain. At the end of the chain you have a single result. The result can be a composite of multiple subresults, but it will be contained inside a single object or record.
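
For example, a small Java Streams chain looks like this (a self-contained example; Collectors is from java.util.stream):

    List<String> names = List.of("Anna", "Bob", "Claire");

    String result = names.stream()            // source
        .filter(name -> name.length() > 3)    // processor 1
        .map(String::toUpperCase)             // processor 2
        .collect(Collectors.joining(", "));   // end of the chain: one single result

    // result: "ANNA, CLAIRE"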

Other stream processing APIs allow you to create more advanced graphs of processors. Such a graph can contain multiple results after processing all the records or objects in the stream. Here is an example from earlier in this text:

[Figure: Graph topology.]

Acyclic vs Cyclic Graphs

Several graph oriented stream processing APIs only allow you to create an acyclic graph, meaning records can ultimately only flow in one direction through the graph. Records cannot "cycle" back up the graph and be reprocessed. In many cases a cyclic graph is not necessary to perform the needed operations, but once in a while a cyclic graph can be handy.

Feeding

Feeding refers to how a stream processing API is designed to feed data into its processing topology. For instance, some APIs have a concept of a Stream which you can listen to, or attach various kinds of processors to, in order to build a topology. But you cannot directly feed a record into the source stream. The stream is locked down to get its data from a specific source, e.g. a list, a Kafka topic etc. I refer to this as a closed feeding design.

[Figure: Closed feeding design.]

Other APIs make it easy to feed data into your topology at every step of the topology. During normal operation this may not be necessary for many stream processing use cases, but it can be very useful for some use cases, and also during testing. Being able to feed data into the topology at every node in the graph makes testing much easier, as the sketch below shows. I refer to this as an open feeding design.

[Figure: Open feeding design.]
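
For instance, a unit test can feed records straight into a single node and assert on its state. This sketch reuses the hypothetical SalaryCalculator from earlier; the Employee constructor is made up for the example, and assertEquals is JUnit's:

    SalaryCalculator calculator = new SalaryCalculator();

    // Feed records directly into the node - no source stream required.
    calculator.process(new Employee("Alice", 50_000));
    calculator.process(new Employee("Bob",   70_000));

    assertEquals(60_000.0, calculator.average(), 0.001);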

Forwarding

Forwarding refers to how a stream processing API is designed to forward data in its stream processing topology, from one processor to the next. There are many ways to design forwarding, but typically forwarding designs fall into the following two categories:

  • Static Forwarding
  • Dynamic Forwarding

Static and dynamic forwarding are not strictly separated categories. You can have designs that are "semi-dynamic" (or "semi-static", depending on your choice of nomenclature). Think of these two categories as different ends of a spectrum.

Static forwarding means that a given processor can only forward a message it has processed, or the result of processing a message, to one or more predefined processors. These targets were fixed when the topology was created. Once created, the topology is static (or at least hard to change).

Dynamic forwarding means that a given processor is free to choose which processor (or which queue that subsequent processors can listen to) it forwards its result to. Or at least it has a higher degree of freedom in choosing, even if a completely free choice is not possible.
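
Here is a sketch of the difference, using hypothetical interfaces (the Record class with its isError() method is made up for the example):

    interface RecordProcessor {
        void process(Record record);
    }

    // Static forwarding: the successors are wired in when the topology is built.
    class StaticProcessor implements RecordProcessor {
        private final List<RecordProcessor> next;   // fixed at construction time

        StaticProcessor(List<RecordProcessor> next) { this.next = next; }

        public void process(Record record) {
            for (RecordProcessor processor : next) {
                processor.process(record);
            }
        }
    }

    // Dynamic forwarding: the processor picks a target per record, via a context.
    interface ForwardingContext {
        void forwardTo(String processorName, Record record);
    }

    class DynamicProcessor {
        void process(Record record, ForwardingContext context) {
            if (record.isError()) {
                context.forwardTo("error-handler", record);   // chosen at runtime
            } else {
                context.forwardTo("main-pipeline", record);
            }
        }
    }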

Triggering

Triggering refers to how processors in a stream processing topology are activated (triggered). Triggering mechanisms tend to fall into one of these categories:

  • Data Triggering
  • Non-data Triggering

Data triggering is the most common triggering mechanism for stream processing APIs. Data triggering means that the processors in the stream processing topology get triggered by the data that is sent through it. A processor method is called with the data to process, and the data flows through the topology from there.

Non-data triggering means activation of a processor based on a non-data event. For instance, you might want to call a processor every 5 minutes. That would be a time based non-data triggering of that processor.

Non-data triggering is very useful. For instance, imagine a processor that collects incoming data in a buffer, and then writes the data in a single batch to disk or a database every 60 seconds. If you have a steady flow of data you can trigger the write to disk / database by the incoming data. However, if the flow of data through the topology is not steady, you need to trigger the writing explicitly using a non-data trigger.
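
A sketch of such a processor could look like this. The Record class and the write() batch method are hypothetical; the scheduling classes are from java.util.concurrent:

    class BatchWritingProcessor {
        private final List<Record> buffer = new ArrayList<>();
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();

        BatchWritingProcessor() {
            // Non-data trigger: flush every 60 seconds, even if no data arrives.
            scheduler.scheduleAtFixedRate(this::flush, 60, 60, TimeUnit.SECONDS);
        }

        // Data trigger: called once per incoming record.
        synchronized void process(Record record) {
            buffer.add(record);
        }

        synchronized void flush() {
            if (buffer.isEmpty()) return;
            write(new ArrayList<>(buffer));   // hypothetical batch write
            buffer.clear();
        }

        private void write(List<Record> batch) { /* disk / database write goes here */ }
    }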

Non-data triggering is also very useful for collecting monitoring metrics from your processors. You can collect information about how many messages / records have passed through each processor during a given time period.

Functional Stream Processing Designs

Functional stream processing is one of the more well-known stream processing API designs. It is, however, not one of the best designs. Functional stream processing comes in two flavours:

  • Functional Batch Processing
  • Functional Stream Processing

Some of the most well-known implementations referred to as "functional stream processing" are actually not stream processing APIs, but batch processing APIs. I will explain the design of the Java Streams API which, despite its name, is an example of a functional batch processing API.

An example of a truly functional stream processing API is the Kafka Streams API, which comes with the Kafka platform. I will explain how the Kafka Streams design differs from the design of the Java Streams API, and also explain its limitations as a stream processing API design.

Functional Batch Processing

Functional batch processing is, as I mentioned above, often called "functional stream processing", although as we will soon see, it is not. The Java Streams API is an example of a functional batch processing API. The design rationale behind functional batch processing is this:

Imagine you have a large collection of objects which you need to iterate and perform various calculations on. For instance, imagine you have to calculate the total sum, average, min and max of a list of objects that contain a price (or maybe even just the prices themselves as numbers). Since the operations to be carried out are very lightweight, the heaviest part of performing these calculations on a list of objects is the time taken merely to iterate the objects.

Rather than iterating all the objects once for each operation (sum, average, min, max), you want to iterate the objects only once, and perform all the calculations on these objects during this one iteration. Functional batch processing APIs are designed to enable you to do exactly that.

Functional batch processing APIs are designed so that you configure a set of element listeners and then start the iteration of all the elements. During iteration each listener is called once for each element in the collection. It is then up to the listener to perform any operations it needs on the given element, and keep track of any information needed in that regard (e.g. the sum and the number of elements needed to calculate an average value).
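
A minimal sketch of the listener idea (the interface and classes here are made up for illustration; real APIs differ in the details, and prices is an assumed List<Double>):

    interface ElementListener<T> {
        void onElement(T element);
    }

    class AverageListener implements ElementListener<Double> {
        private double sum = 0;
        private long count = 0;

        public void onElement(Double price) {
            sum += price;     // the listener keeps the state it needs
            count++;
        }

        public double average() { return count == 0 ? 0 : sum / count; }
    }

    // One iteration drives all listeners:
    AverageListener averageListener = new AverageListener();
    List<ElementListener<Double>> listeners =
            List.of(averageListener /*, plus sum, min and max listeners */);

    for (Double price : prices) {
        for (ElementListener<Double> listener : listeners) {
            listener.onElement(price);
        }
    }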

The Java Streams API

The Java Streams API is an example of a functional batch processing API. The documentation refers to these batches as "bounded streams" - but a bounded stream is a batch. A bounded stream has a first element and a last element - and is thus a batch.

I won't go into too much detail about the actual methods available in the Java Stream API. I will focus on what the resulting "structure" of your configured stream processing pipeline looks like.

On the surface, when you have configured a Java Streams API processing pipeline, you will have a processing graph that looks similar to this:

[Figure: Functional batch processing pipeline - at first glance.]

Each element in the source collection (batch) is passed through the chain of non-terminal operations to finally be processed by the terminal operation at the end of the chain. Of course, if elements are "filtered out" before they reach the terminal operation, they won't be processed by it.

However, when you study the "structure" a bit closer, you realize that the various non-terminal operations are functions, and that these functions do not directly call the next function in the chain. Instead, something else calls each function and passes its return value on to the next step in the chain. Here is a more precise illustration of what is going on:

[Figure: Functional batch processing pipeline - a more precise illustration.]

All the non-terminal operations are functions (or classes implementing a functional interface) which are called by the internal iterating component(s) of the Java Streams API. When a terminal operation is called, the internal iteration starts, the non-terminal operations are called, and their return values end up in the terminal operation.
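
For example, in this pipeline nothing is iterated until the terminal summaryStatistics() operation is called - and that one call produces the sum, average, min and max in a single pass (DoubleSummaryStatistics is from java.util):

    List<Double> prices = List.of(10.0, 25.0, 17.5);

    DoubleSummaryStatistics stats = prices.stream()
        .mapToDouble(Double::doubleValue)   // non-terminal: only configured, not yet run
        .summaryStatistics();               // terminal: starts the internal iteration

    // stats.getSum() == 52.5, stats.getAverage() == 17.5,
    // stats.getMin() == 10.0, stats.getMax() == 25.0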

Functional Batch Processing - Pros

The advantage of functional batch processing is that these kinds of APIs tend to be easy and quick to use when you need to do some light transformations / calculations on a collection of objects.

Functional Batch Processing - Cons

For anything beyond the simplest transformations / calculations on a collection of objects, the functional batch processing API design starts to break down.

Terminal Operations are for Batch Processing - Not Stream Processing

First of all, terminal operations have no place in stream processing. The terminal operation typically aggregates the result of the various non-terminal transformations applied to each element during iteration. After iteration finishes, the aggregated result is returned. Returning a final result is only possible because your data has a last element. In true stream processing there is no last element! You don't know when you have received the last element. Thus, terminal operations only make sense in batch processing - not in stream processing.

Chain - Not Graph

When you configure an element processing topology with the Java Streams API (functional batch processing), the result is a chain. In other words, the output of one processing function is the input of the next processing function in the chain. At the end of the chain you have the terminal operation, which aggregates and produces the final result.

In real-life stream processing you will often need a processing graph, not just a simple chain.

The reason functional batch processing uses a chain topology is that the Java Streams API is designed around terminal operations. A terminal operation produces a single, final result. A chain topology is the easiest way to produce a single, final result. All elements flow through the chain and end in the terminal operation.

With a graph topology there could be many different "leaf processors" - each of which could hold final results when the iteration ends. With a graph topology the terminal operation would have to collect the final results from all of these leaf processors and aggregate them into a single, final result. Though possible, this would probably be harder to implement, and the resulting API might be a bit less easy to use. But it would be much more powerful.

Intended to be Stateless

One of the main ideas in functional programming is that the processors in a functional topology are intended to be stateless functions that receive some input and return a single result.

In real life stream processing, however, you will often need state in your processors. For instance, counting the number of elements that have passed through a processor is useful for monitoring the topology. That count is state.
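
For example, a counting pass-through processor is trivially stateful. This sketch reuses the hypothetical Processor interface from earlier:

    class CountingProcessor implements Processor {
        private final Processor next;
        private long count = 0;   // state held by the processor

        CountingProcessor(Processor next) { this.next = next; }

        public void process(Employee employee) {
            count++;                  // record the element for monitoring
            next.process(employee);   // then pass it on unchanged
        }

        public long count() { return count; }
    }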

No Feedback Mechanism

Since processors (non-terminal operations) are intended to be stateless functions, there is no way to provide feedback back up the chain. For instance, in a chain that contains these two operations:

  • Convert from object of class A to object of class B
  • Calculate based on numbers in object of class B.

... there is no way for the second processor to tell the first processor that it doesn't need to convert any more objects from class A to class B. Say the second operation only needs the first 10 elements to do its job - then all elements will still be converted from class A to class B by the first processor, even though their values will be ignored by the second processor. That is a waste of CPU resources.

In real life stream processing you can benefit from a feedback mechanism that feeds information back up the processing topology. In many cases such feedback requires stateful processors.
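
One way such feedback could be sketched (hypothetical interfaces, not a specific API) is to let process() return a signal that upstream processors can act on:

    enum Feedback { CONTINUE, DONE }

    interface FeedbackProcessor {
        Feedback process(Object record);
    }

    // The second processor: only needs the first 10 elements.
    class FirstTenCalculator implements FeedbackProcessor {
        private int seen = 0;   // stateful - needed to produce the feedback

        public Feedback process(Object record) {
            seen++;
            // ... calculate on the record ...
            return seen < 10 ? Feedback.CONTINUE : Feedback.DONE;
        }
    }

    // The first processor: stops converting once downstream says DONE.
    class AtoBConverter implements FeedbackProcessor {
        private final FeedbackProcessor next;
        private boolean done = false;

        AtoBConverter(FeedbackProcessor next) { this.next = next; }

        public Feedback process(Object record) {
            if (done) return Feedback.DONE;            // skip the conversion entirely
            Feedback feedback = next.process(convert(record));
            if (feedback == Feedback.DONE) done = true;
            return feedback;
        }

        private Object convert(Object a) { return a; } // placeholder for the A -> B conversion
    }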

Observer Based Stream Processing

Observer based stream processing API designs have several advantages over functional stream processing designs. I will explain here what they are.

The Store and Process Design Pattern

For many simple, and even advanced, stream processing requirements, both the functional and observer based stream processing designs are unnecessarily complex, in my opinion. The store and process design pattern offers a simple yet scalable alternative.

More coming soon...
