To understand what problem reactive programming solves, let’s first look at the problem itself. Buckle up: if you are new to this, there might be some turbulence along the way, but I will try my best to ensure a safe landing.
Thread Per Request
Now consider the example in the diagram above: there are two planes and only one runway. After the first plane P1 has landed safely, the other plane P2 will try to land. That means P2 is blocked from landing until P1 completes its landing and releases the runway. Similarly, a single thread serves one request while other requests wait in a queue, and that thread does nothing productive while it performs an IO operation; it just waits. Only once it completes its task does it serve the next request. That is a Sync & Blocking call.
Now let’s understand the thread-per-request model. Consider a traditional Spring MVC web application deployed on a servlet container such as Tomcat.

Traditional Thread Per Request Model
Suppose you are selling electronic items on your application, which is running on Tomcat as mentioned above. You have figured out that on normal days the traffic is around 500 requests per second. Assume you have configured Tomcat’s thread pool with 200 threads and the average response time of your app is, say, 200 ms. A rough calculation says the app can handle around 1,000 requests per second, which is enough for our average traffic of 500 requests per second. But the sky isn’t always blue: in the festive season your app starts to drop requests as it gets overwhelmed by the surge in user traffic.
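The rough calculation above is simply pool size divided by average latency; a tiny sketch using the assumed numbers from this example:

```java
public class CapacityEstimate {
    public static void main(String[] args) {
        int poolSize = 200;            // assumed Tomcat worker thread pool size
        double avgResponseSec = 0.2;   // assumed 200 ms average response time
        // Each thread can serve 1 / 0.2 = 5 requests per second,
        // so the whole pool can serve at most:
        double maxRps = poolSize / avgResponseSec;
        System.out.println((int) maxRps + " requests/second at best");
    }
}
```

Anything sustained above that ceiling queues up and eventually times out, which is exactly the festive-season failure described above.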
What exactly happened in the above scenario? We have a dedicated thread pool to handle requests, where a thread is assigned to each incoming request and manages the lifecycle of that request. So our application can handle only N concurrent requests, where N is the thread pool size. You might argue: why can’t we just spawn more threads and increase concurrency? That sounds reasonable, and in fact there is plenty of scope for scaling in the above example, but that is a separate topic of discussion.
Everything comes with a cost, and that is the problem with this approach: spawning more threads leads to higher memory consumption, and at some point our application might crash with an OutOfMemoryError.
How? In JDK 8, every time the JVM creates a thread, the OS allocates native memory to hold the thread’s stack, sized by a default that varies with the JVM, OS, and environment, and this creation commits more memory to the process until the thread exits. On most platforms the full stack is reserved up front and committed as it is used, but even the reservation limits how many threads a process can hold. Now if the app spawns a lot of threads, it can consume a significant amount of memory which could otherwise be used by the app or the OS, and the system could run out of memory. There is obviously room for optimisation, like reducing the stack size by running the server with the -Xss option, but better solutions exist.
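To see why thread stacks dominate, multiply the thread count by the per-thread stack size (1 MB is a common default for 64-bit HotSpot; treat the exact figures here as assumptions):

```java
public class StackMemoryEstimate {
    public static void main(String[] args) {
        int threads = 10_000;     // hypothetical thread count under heavy load
        long stackKb = 1024;      // assumed default -Xss of 1 MB per thread
        long totalMb = threads * stackKb / 1024;
        System.out.println(totalMb + " MB of stack space for " + threads + " threads");
    }
}
```

Roughly 10 GB of address space for stacks alone, before the heap, metaspace, or the OS get anything.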
Now that we know spawning more threads is not a good solution, let’s come back to our thread-per-request model. There are complexities beyond higher memory consumption: a thread handling a particular request can enter the blocked or waiting state while doing an IO operation (like a DB call), which is again a waste; response times grow if we have extra layers of network calls or IO operations; and it might happen that our service A asks service B for the last few months of sales data, the result turns out to be huge, and service B returning that huge payload in one go hogs service A.
What else can we do to make the above system performant?
Java offers two models of asynchronous programming:
Callbacks: asynchronous methods that don’t return a value but take a callback parameter (a lambda or anonymous class) that gets called when the result is available (or when the blocking call completes). Callbacks are hard to compose, read, and maintain, and deeply nested callbacks are what we call Callback Hell.
Consider an example: showing a user’s top 5 favorite items on the UI, or suggestions if there are no favorites. We can assume three services are involved: one gives favorite IDs, the second gives favorite details, and the third gives suggestions.
We have a callback interface with two async methods, one invoked on success and the other on error (Line 1). The first service invokes its callback with the favorite ID list (Line 2); if the list is empty, we go to the suggestion service (Line 4). The suggestion service gives its list to a second callback (Line 5). Since the UI is involved, we need to make sure the consuming code runs on the UI thread (Line 6). We only need the top 5, so we process just those using Java streams and show them on the UI (Lines 7 to 9). If an error occurs, we show a pop-up (Line 14).

Now assume the getFavorites service returned a valid list instead of an empty one (back to Line 1). We need to call the favorite service to get the detailed objects (Lines 18 to 20). We only need to show the top 5, so again the same setup with yet another callback. That is a lot of code, and it is hard to follow.
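In case the embedded snippet doesn’t render, here is a rough, compilable sketch of that callback shape (the service methods, `Callback` interface, and data are invented stand-ins, and the line numbers above refer to the original embed, not this sketch):

```java
import java.util.List;

public class CallbackDemo {
    interface Callback<T> {
        void onSuccess(T value);
        void onError(Throwable error);
    }

    // Hypothetical stand-ins for the three services
    static void getFavoriteIds(Callback<List<String>> cb) { cb.onSuccess(List.of()); }
    static void getSuggestions(Callback<List<String>> cb) {
        cb.onSuccess(List.of("s1", "s2", "s3", "s4", "s5", "s6"));
    }
    static void getDetails(String id, Callback<String> cb) { cb.onSuccess("details-" + id); }

    public static void main(String[] args) {
        getFavoriteIds(new Callback<>() {
            public void onSuccess(List<String> ids) {
                if (ids.isEmpty()) {
                    // No favorites: fall back to the suggestion service
                    getSuggestions(new Callback<>() {
                        public void onSuccess(List<String> suggestions) {
                            // In a real app this would be posted to the UI thread
                            suggestions.stream().limit(5).forEach(System.out::println);
                        }
                        public void onError(Throwable e) { System.err.println(e); }
                    });
                } else {
                    // Favorites exist: yet another nested callback per ID
                    ids.stream().limit(5).forEach(id ->
                        getDetails(id, new Callback<>() {
                            public void onSuccess(String detail) { System.out.println(detail); }
                            public void onError(Throwable e) { System.err.println(e); }
                        }));
                }
            }
            public void onError(Throwable e) { System.err.println(e); }
        });
    }
}
```

Even with the services stubbed synchronously, the nesting is already three levels deep, and every branch needs its own error handler.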
Let’s look at the Reactor code equivalent of the callback example:<a href="https://medium.com/media/0fe2e90555eda47f69982765d2ec71b0/href">https://medium.com/media/0fe2e90555eda47f69982765d2ec71b0/href</a>
I think the code is self-explanatory, and that is the beauty of it. Get the favorite ID list; get the details of each ID; if the list is empty, give suggestions; take the top 5 and publish them on the UI thread. Either the process succeeds or an error occurs, and whichever it is, the corresponding subscriber method is called.
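If the embed doesn’t render, the same flow sketched in Reactor looks roughly like this (assuming reactor-core on the classpath; the three service methods are the same invented stand-ins as before, so this is a sketch rather than the original embed):

```java
import reactor.core.publisher.Flux;

public class ReactorFavorites {
    // Hypothetical stand-ins for the three services
    static Flux<String> favoriteIds() { return Flux.empty(); }
    static Flux<String> details(String id) { return Flux.just("details-" + id); }
    static Flux<String> suggestions() {
        return Flux.just("s1", "s2", "s3", "s4", "s5", "s6");
    }

    public static void main(String[] args) {
        favoriteIds()
            .flatMap(ReactorFavorites::details)   // detail object per favorite ID
            .switchIfEmpty(suggestions())         // fall back when there are none
            .take(5)                              // top 5 only
            // .publishOn(uiScheduler)            // in a UI app: hop to the UI thread
            .subscribe(System.out::println,       // success path
                       Throwable::printStackTrace); // error path (pop-up in a real UI)
    }
}
```

One linear pipeline replaces three levels of nested callbacks, and error handling lives in exactly one place.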
Futures: async methods that return a Future<T> immediately. The value T will be computed by an asynchronous process, and the Future object wraps access to it. Although the value is not immediately available, the object can be polled until it is.
Future objects are better than callbacks, and they were further improved in Java 8 by CompletableFuture. But Futures still have some problems:
- If you know how Future works, then you know it is easy to end up in another blocking situation, because get() on a Future blocks until the value is available.
- They don’t support lazy computation (deferring the computation until its value is needed).
- They lack support for multiple values and advanced error handling.
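The first point is easy to demonstrate with plain JDK classes: the work is submitted asynchronously, but the moment we ask for the value, the calling thread parks.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureGetBlocks {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> future = pool.submit(() -> {
            Thread.sleep(200);          // simulate a slow IO call
            return "payload";
        });
        long start = System.nanoTime();
        String value = future.get();    // blocks right here until the task completes
        long waitedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println(value + " after waiting ~" + waitedMs + " ms");
        pool.shutdown();
    }
}
```

The caller waits roughly the full 200 ms inside get(), which is exactly the sync-and-blocking behaviour we set out to avoid.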
Consider an example: we have a list of tweet IDs, and we want to fetch the author and the number of likes of each tweet, combining them pairwise in an asynchronous fashion. Here is the code using CompletableFuture:<a href="https://medium.com/media/d1f7de230ea9df77023ec7a3ecc9b699/href">https://medium.com/media/d1f7de230ea9df77023ec7a3ecc9b699/href</a>
In line 1, we get the Future of the list of tweet IDs. We then move on to async processing of that list in line 3: for each item, we fetch the tweet’s author name and its number of likes, both asynchronously, and combine the results. Now we have a list of futures representing all the combination tasks (line 11); to execute them, we have to convert the list to an array (line 12). We give the array to CompletableFuture.allOf(), which outputs a future that completes when all the tasks in it complete (line 14). Since allOf() returns CompletableFuture<Void>, we iterate once over the list of futures, collecting results with join(), which does not block here because the futures have already completed. Once the whole pipeline is triggered, we wait for it to be processed and return the list of results for assertions.
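In case the embed doesn’t render, the pipeline described above can be sketched with JDK-only stubs (the three service methods are invented placeholders, so the line numbers in the paragraph above refer to the original embed, not this sketch):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class TweetPipeline {
    // Hypothetical stand-ins for the remote services
    static CompletableFuture<List<String>> tweetIds() {
        return CompletableFuture.supplyAsync(() -> List.of("t1", "t2"));
    }
    static CompletableFuture<String> author(String id) {
        return CompletableFuture.supplyAsync(() -> "author-" + id);
    }
    static CompletableFuture<Integer> likes(String id) {
        return CompletableFuture.supplyAsync(() -> id.length());
    }

    public static void main(String[] args) {
        // One combination task per tweet ID: author zipped with like count
        List<CompletableFuture<String>> combined = tweetIds().join().stream()
            .map(id -> author(id).thenCombine(likes(id),
                    (a, l) -> a + " / " + l + " likes"))
            .collect(Collectors.toList());
        // allOf() completes only when every combination task completes
        CompletableFuture.allOf(combined.toArray(new CompletableFuture[0])).join();
        // These join() calls no longer block: every future is already done
        combined.forEach(f -> System.out.println(f.join()));
    }
}
```

Even this stripped-down version needs the list-to-array dance and a manual second pass to collect results.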
Since Reactor has more combination operators, the above can be simplified in reactive programming as shown below:<a href="https://medium.com/media/4ea723bad96b908e71db7fc495d5f558/href">https://medium.com/media/4ea723bad96b908e71db7fc495d5f558/href</a>
Intimidated by Flux and Mono? Don’t worry; for now, just think of them as advanced versions of Future. They are the two publishers of Project Reactor: Mono emits 0 or 1 elements, and Flux emits 0 to N elements. I will cover them in detail in a later post. Let’s understand the code written above.
We start with an asynchronously provided sequence of tweet IDs as a Flux<String> (line 1). The rest of the process remains the same: fetch the author name and number of likes, then combine them asynchronously using the zipWith operator. In a real production scenario we would continue working with the Flux asynchronously, further combining it and subscribing to it, and mostly return the result as a Mono. But for testing purposes we have blocked (line 14) so that we return the aggregated list of values once the process finishes.
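If the embed doesn’t render, a sketch of that pipeline looks roughly like this (assuming reactor-core on the classpath; the lookup methods are the same hypothetical stand-ins as in the CompletableFuture version, and line numbers won’t match the embed):

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactorTweets {
    // Hypothetical async lookups for a single tweet ID
    static Mono<String> author(String id) { return Mono.just("author-" + id); }
    static Mono<Integer> likes(String id) { return Mono.just(id.length()); }

    public static void main(String[] args) {
        List<String> results = Flux.just("t1", "t2")   // asynchronous sequence of tweet IDs
            .flatMap(id -> author(id)
                .zipWith(likes(id),                    // pair the author with the like count
                         (a, l) -> a + " / " + l + " likes"))
            .collectList()                             // Mono<List<String>>
            .block();                                  // test-only: wait for the result
        System.out.println(results);
    }
}
```

The array conversion, allOf(), and the manual join() pass all disappear into zipWith and collectList.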
We saw how callbacks, Future, and CompletableFuture can bring in more asynchronicity, but at some point they may block, like Future.get(), and a CompletableFuture<List<Result>> still has to wait for the whole collection to be built and readily available (async but blocking). CompletableFuture also has no way to handle an infinite stream of values.
The Flow API was introduced in Java 9, but it is essentially a contract for Reactive Streams; beyond the simple SubmissionPublisher, no full-fledged implementation ships as part of the JRE.
This is where reactive programming comes to the rescue, solving all the problems mentioned above with the following approach:
- Instead of using the thread-per-request model, handle traffic with a low number of threads, i.e. event loops.
- Prevent threads from blocking while waiting for IO operations to complete, i.e. asynchronous and non-blocking IO (NIO).
- Give the consumer (here, the app) the ability to signal the producer (here, the DB or backend) that the rate of emission is too high, i.e. backpressure support, with a push-pull hybrid data flow model: the app can request data (pull) or work in unbounded mode (the publisher pushing data).
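The backpressure idea in the last point can be seen even with the bare JDK 9+ Flow API (SubmissionPublisher is the JDK’s simple built-in publisher; a real app would use a library like Reactor). The subscriber here pulls exactly one item at a time via request(1) instead of letting the publisher push unboundedly:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class BackpressureDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<>() {
                private Flow.Subscription subscription;
                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1);            // pull: ask for one item at a time
                }
                public void onNext(Integer item) {
                    System.out.println("got " + item);
                    subscription.request(1); // pull the next item only when ready
                }
                public void onError(Throwable t) { done.countDown(); }
                public void onComplete() { done.countDown(); }
            });
            for (int i = 1; i <= 3; i++) publisher.submit(i);
        } // closing the publisher signals onComplete
        done.await();
    }
}
```

Requesting Long.MAX_VALUE instead of 1 would put the same subscriber into unbounded (pure push) mode, which is the hybrid the bullet above describes.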
Phew, that was a lot. We have a good base now; I hope the landing was successful.
Now that we have our basics clear, I will start with the Reactive Streams specification in the next post. I will also cover concurrency with the event loop model in greater detail in a future post.
Why Reactive? Thread per Request vs. Reactive Programming Model (Eventloop) was originally published in Walmart Global Tech Blog on Medium, where people are continuing the conversation by highlighting and responding to this story.