In the previous article I have described the approach that can be used to work with streams when you use Scala.
Let’s suppose that you have faced the task where you already have one or two streams and have to combine them into a completely another stream or connect and enrich initial streams by data from a third stream (in fact quite a common task with trading data for instance).
Of course, you can do it with Scala and Akka easily. However, if you are limited to Java language only (because code should be supported by the others, Java is the only known language, or there are other circumstances) — what should you do in this case?
Reactor is fully non-blocking and provides efficient demand management. It directly interacts with Java’s functional API, CompletableFuture, Stream, and Duration.
It looks like usual Java streams — declarative in its nature:
However, it can work with dynamically supplied data and is more sophisticated. It supports operations not available in Java stream such as size-based buffering, time-based buffering, the merging/splitting of streams, throttling, thread management, exception processing, backpressure etc.
Well-suited for a microservices architecture, Reactor offers backpressure-ready network engines for HTTP (including Websockets), TCP, and UDP. Scaling out is an important tool for overcoming latency and slow microservices.
As it is used as the backbone for SpringFlux, you can use almost all features of Spring in your app, expect to get support from Spring community and answers for questions you can face at the beginning.
Cloud native apps and serverless functions have a better chance at effective scale-out with asynchronous architectures. To assist in asynchronous designs, Reactor offers non-blocking and backpressure-ready network runtimes, including local TCP/HTTP/UDP clients and servers, based on the robust Netty framework.
Reactor operators and schedulers can sustain high throughput rates, on the order of 10’s of millions of messages per second.
Reactor Project uses the event loop for processing its operators and it is a too big topic for this article. I suppose you can easily google it by the phrase “Concurrency in Spring WebFlux” (to my surprise I haven’t found quite a general article about this on medium, but many of them on other resources).
And as it uses the event loop as its execution engine — it’s absolutely block-free by its nature, and it removes limits on the number of concurrent requests or operations — your application becomes more elastic. It means that the reaction of your application on the growth of work won’t be blocking of new requests but slowing of work (each new and existing connection/request will get less CPU-time).
After that introduction, let’s create some basic pipelines to aggregate several streams that supply their data with different rates and create the resulting stream with the aggregated result. Something like this:
Let’s imagine that I wanted to find some correlation between the state of the field (proportion of dead and live cells) from implementation of Conway’s Life from the previous article with some statistics that are taken from 2 separate streams.
As the source for two additional streams, I took books by Leo Tolstoy “War and Peace” in Russian and English translation and started to send some basic statistics from them (the number of vowels, consonants and words that were read). Of course, as the original stream and two new are not related to each other — barely someone will be able to find any correlation between them. But who knows :) However, in this article, the target is to study how it was implemented, so let’s do this meaningless aggregation for the sake of education.
As a resulting stream we have some data like this:
Further, I’ll explain the process of such pipeline composition. To be stick to the point, I’ll omit some non-relevant things (like book reading code, statistics calculation or data unmarshalling).
For this project I use gradle and key dependencies here are:
// Spring WebFlux Starter
implementation "org.springframework.boot:spring-boot-starter-webflux:2.4.1"// Core library for Reactor Project
additional dependencies can be found in the repository of the project.
Let’s start off by having a look at a book’s statistics stream:
It’s a pretty simple implementation that keeps the state of the data generator (
AbstractBookReader) over its work. After the call of method
createFluxReader as a result an instance of the class
Flux<String> that is able to publish from 0 to N (indefinite) number of elements in the stream. In the case of book streams, I’m looping the book, by starting from the beginning after I reach its end. Opposite to the
Flux-type stream is a
Mono-type that can publish nothing or only one element in its stream (0 to 1) after what this stream is considered completed.
The implementation of the WebSocket client that provides data from Conway’s streaming server (
ConwayServerWebSocketClient) is quite tricky (as anything else connected with network communication and reconnection in case of network failures), so I’ll skip most of it (full implementation is presented in the repository of the project).
To understand how it works the most crucial concept should be explained — subscription. It is a kind of trigger for your pipeline. Nothing will go through your pipeline until someone subscribes to your publisher by
subscribe method. (There is, of course, an exception in the form of “hot” streams but it will be shown later).
So, I call method
WebSocketClient#execute and provide a custom implementation of
WebSocketHandler and then start reading by subscription to
Mono#handle (which will complete when the connection will be closed) in line 14. And while a WebSocket connection is active and stream receives data — it resends (inside the implementation of
WebSocketHandler) those data into a
Sinks.Many<String> responseSink. The
responseSink in this case is a FIFO stack with protection from the overflow and at the same time Flux for data it receives. So the WebSocket client simple resends data from one stream to another.
Now let’s have a look at the implementation of statistics collectors from books. They are presented in the picture under numbers 2 and 3:
Even though I merely execute a method and take a resulting object from it — a lot of more interesting things is happening in this single call:
- in line 3 I create a book-reading word publisher (
- in line 4 I instruct to make a pause between each element of word stream publisher;
- in line 5 — subscribe to the publisher, start it in the background in a separate thread and instruct it to call the method
analyseof the object
And all this is done in 3 lines of code!
Finally, as a result, the method returns a generator that takes collected statistics from
WordStatsCollectorSynchronizedDecorator collectorSync object and reset it.
In the end, let’s review the resulting pipeline from all parts:
- in line 6
Flux<String>is taken from Web Socket client;
- in line 7 I instruct the publisher to pack all messages into something like a bunch, that is released in time that was specified with the parameter;
- in line 8 I call the method
processBatchesin which JSON strings are parsed into objects, and with the help of operation
Mono#zipall streams are connected to
Mono<Tuple4<ConwayServerStats, WordStats, WordStats, Long>>that in its turn with the help of
flatMapis transformed into
Flux<Tuple4<….>>down to the stream;
- in line 9
Tuple4<….>is mapped into a final object that will be sent to outside;
- in lines 10–12 the most interesting thing happens. As you know, all data publisher really start their work when they have at least one subscription, but it is only true for so-named “cold” publishers. The combination of calls
Flux#autoConnectturns “cold” publisher into “hot” one — a publisher that doesn’t need any subscriber to start its work. So when you subscribed to such publisher, it seems like you entered an office where everyone had been working for a while :)
The HTTP controller doesn’t look like something exceptional, rather than usual Spring Boor or Spring MVC controller. The only exception is resulting value — it is a Flux of course:
Now let’s run it. Don’t forget to connect it to the base server with source stream with the help of the environment variable
STREAM_SERVER_WS_URL or by fixing the configuration file
If we run 2 terminal windows with the command:
curl localhost:8080/stats we’ll see that our connections contain exactly the same objects (with exactly the identical timestamps) and they appear absolutely simultaneously:
P.S.: In the project repository you can find some other interesting features:
- skipped parts of the code;
- the examples of the usage of some Java 15 features (sealed classes, records);
- CI/CD pipeline that starts separate parallel jobs for its workflow and shares artefacts between them.