Switching between multiple data streams in a single thread

Redowan Delowar February 19, 2023
Source

I was working on a project where I needed to poll multiple data sources and consume the incoming data points in a single thread. In this particular case, the two data streams were coming from two different Redis lists. The correct way to consume them would be to write two separate consumers and spin them up as different processes.

However, in this scenario, I needed a simple way to poll and consume data from one data source, wait for a bit, then poll and consume from another data source, and keep doing this indefinitely. That way I could get away with doing the whole workflow in a single thread without the overhead of managing multiple processes.

Here's what I'm trying to do:

One way is to poll the data sources in two generator functions and yield the result. Then in the consumer, we'll have to alternate between the generators to fetch the next result like this:

Let's make a concrete example out of the pesudocode:

The code above defines two generator functions, stream_even() and stream_odd(), that use the count() function from the itertools module to generate an infinite sequence of even and odd integers respectively.

The consume() function creates a tuple containing the two generator objects, and enters an infinite loop. On each iteration of the loop, it iterates over the tuple using a for loop; effectively alternating between the two streams. In each iteration, it waits for 1 second using the time.sleep() function and then uses the next() function to retrieve the next item from the current stream. If the result is not None, it prints a message to the console indicating which stream it came from and what the value was. Else, it loops back to the beginning of the iteration.

Running the snippet will print the folling output to the console:

The consumer infinite loop can be written in a more concise manner with itertools.cycle. Instead of using the while loop, we can use this function to indefinitely cycle between the elements of an iterable.

Here, the finalized executable script:

Further reading

Discussion in the ATmosphere

Loading comments...