Spring Cloud Stream: Hermes Binder


Spring Cloud Stream is a interesting initiative for building message driven application in the widely considered Spring ecosystem. I think that the main idea is ease the usage and configuration to the bare minimum compared to more complex solution which the Spring Integration apparently is.

Altogether Spring Cloud Stream introduces the idea of binders, which are responsible for handling the integration between different MOM at the moment having out of the support for:

  • RabbitMQ
  • Kafka
  • Redis
  • GemFire

For additional information I highly recommend going through the Spring Cloud Stream reference guide.

Allegro Hermes is message broker build on top Kafka with REST API allowing to easily be integrated by HTTP based clients. It also has a rich set of features allowing to pass JSON and binary AVRO messages as well as broadcasting the messages or sending them in batches.

In order to be able to consume it through Spring Cloud Stream we need to provide a dedicated binder that will be able to connect the messages to Hermes.

Fortunately there is one here:



Let’s try to use it in practice, starting from sample project. You may want to first go through the Hermes quickstart guide to set up your environment.

Next we will download Spring Initializr template using httpie.

$ http -f POST https://start.spring.io/starter.zip type=gradle-project style=cloud-stream-binder-kafka > demo.zip

$ unzip demo.zip

Afterwards you can import the project using your favorite IDE.

The first is to do is to replace the spring-cloud-starter-stream-kafka with hermes binder:


Let’s start by configuring the Hermes URI for the binder.

          uri: 'http://frontend.hermes.local:8080'

Now we can design our binding and the POJO used for the message.

package io.jmnarloch.stream.hermes;

import java.math.BigDecimal;
import java.util.UUID;

public class PriceChangeEvent {

    private final UUID productId;

    private final BigDecimal oldPrice;

    private final BigDecimal newPrice;

    public PriceChangeEvent(UUID productId, BigDecimal oldPrice, BigDecimal newPrice) {
        this.productId = productId;
        this.oldPrice = oldPrice;
        this.newPrice = newPrice;

    public UUID getProductId() {
        return productId;

    public BigDecimal getOldPrice() {
        return oldPrice;

    public BigDecimal getNewPrice() {
        return newPrice;

And binding for the message channel.

package io.jmnarloch.stream.hermes;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface Events {

    MessageChannel priceChanges();

Through configuration we can specify, the destination topic name and the default content type of the topic.

          destination: 'io.jmnarloch.price.change'
          contentType: 'application/json'

In order to enable Spring Cloud Stream binding we need to annotate our configuration class.

public class EventsConfiguration {

Using the binding is straightforward, a proper proxy is going to be created and can be afterwards injected.

public class EventsProducer {

    private final Events events;

    public EventsProducer(Events events) {
        this.events = events;

    public void publishPriceChange(PriceChangeEvent event) {

        events.priceChanges().send(new GenericMessage<>(event));

Finally, we can publish our message:

eventsProducer.publishPriceChange(new PriceChangeEvent(uuid, oldPrice, newPrice));

At the moment the binder itself is still under development, but yet this presents the workable example.

Publishing AVRO binary messages is almost as simple as the JSON ones and I’m going to cover that in fallowing blog post.

Spring Boot: Hystrix and ThreadLocals


Last time I have described a quite useful, at least from my perspective extensions for RxJava, but overall it defined only a syntactic sugar so that you could easily specify your custom RxJava Scheduler. One of the mentioned applications was very relevant to this blog post and it’s was related to be able to pass around ThreadLocal variables. As in case of RxJava whenever we will spawn a new thread, through subscribing to Scheduler, it’s going to lose any context that was stored within the ThreadLocal variables of the “outer” thread that initiated the task.

The same applies to Hystrix commands.

Initially, the credit for this idea should go to the development team back in my previous company – Allegro Tech, but it so much recurring problem that others has solve it in the past. Yet again I had the need to solve it once again.

Let’s say that I would like to execute the fallowing command, lets put a side for a moment the sense of doing so, only to illustrate the problem:

new HystrixCommand<Object>(commandKey()) {
    protected Object run() throws Exception {
        return RequestContextHolder.currentRequestAttributes().getAttribute("RequestId", SCOPE_REQUEST);

Puff – the data is gone.

Even when run in server container in “context” of request bound thread the above code will end with exception. This happens because Hystrix by default will spawn a new thread for executing the code, a side from the Semaphore mode that can be also used. Hystrix manages it’s own thread pools for the commands which will have no relocation to the any context stored in ThreadLocal of the triggering thread.

Overall ThreadLocal variables might be considered as anti-pattern by some, but it’s really so useful in many practical scenarios that it’s really not such uncommon that quite a few libraries depend on those.

Typically your logging MDC context or in case Spring Framework the security Authentication/Principal or the request/session scoped beans etc. So it quite important for some use cases to be able to correctly pass such information. Imagine a typical use case that you are trying to use OAuth2RestTemplate  in @HystrixCommand annotated method. Sadly this isn’t going to work.

The solution

Fortunately the designers of Hystrix library have anticipated such use case and designed the proper extension points. Basically the idea is to enable to decorate the executing task with your own logic, once it’s going to invoked by the thread.

On top of that I’ve prepared a small Spring Boot integration module:


At this point the implementation is fairly simple and a bit limited. In order to pass the specific thread bounded value you need to provided a your custom implementation of HystrixCallableWrapper.

For instance to “fix” the above snippet we can register as a bean fallowing class:

public class RequestAttributeAwareCallableWrapper implements HystrixCallableWrapper {

    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        return new RequestAttributeAwareCallable<>(callable, RequestContextHolder.currentRequestAttributes());

    private static class RequestAttributeAwareCallable<T> implements Callable<T> {

        private final Callable<T> callable;
        private final RequestAttributes requestAttributes;

        public RequestAttributeAwareCallable(Callable<T> callable, RequestAttributes requestAttributes) {
            this.callable = callable;
            this.requestAttributes = requestAttributes;

        public T call() throws Exception {

            try {
                return callable.call();
            } finally {

Adding Java 8 syntactic sugar

It quite soon struck me that this is rather boilerplate implementation, because for every variable that we would like to share we would have to implement pretty much similar class.

So why not to try a bit different approach, simply create a “template” implementation that could be conveniently filled with specific implementation at the the defined “extension” points. Considerably the Java 8 method reference could be quite useful here, mostly because in typical scenario the operations that would be performed would be rather limited to: retrieving value, setting it and finally clearing any track of it.

public class HystrixCallableWrapperBuilder<T> {

    private final Supplier<T> supplier;

    private Consumer<T> before;

    private Consumer<T> after;

    public HystrixCallableWrapperBuilder(Supplier<T> supplier) {
        this.supplier = supplier;

    public static <T> HystrixCallableWrapperBuilder<T> usingContext(Supplier<T> supplier) {
        return new HystrixCallableWrapperBuilder<>(supplier);

    public HystrixCallableWrapperBuilder<T> beforeCall(Consumer<T> before) {
        this.before = before;
        return this;

    public HystrixCallableWrapperBuilder<T> beforeCallExecute(Runnable before) {
        this.before = ctx -> before.run();
        return this;

    public HystrixCallableWrapperBuilder<T> afterCall(Consumer<T> after) {
        this.after = after;
        return this;

    public HystrixCallableWrapperBuilder<T> afterCallExecute(Runnable after) {
        this.after = ctx -> after.run();
        return this;

    public HystrixCallableWrapper build() {
        return new HystrixCallableWrapper() {
            public <V> Callable<V> wrapCallable(Callable<V> callable) {
                return new AroundHystrixCallableWrapper<V>(callable, supplier.get(), before, after);

    private class AroundHystrixCallableWrapper<V> implements Callable<V> {

        private final Callable<V> callable;

        private final T context;

        private final Consumer<T> before;

        private final Consumer<T> after;

        public AroundHystrixCallableWrapper(Callable<V> callable, T context, Consumer<T> before, Consumer<T> after) {
            this.callable = callable;
            this.context = context;
            this.before = before;
            this.after = after;

        public V call() throws Exception {
            try {
                return callable.call();
            } finally {

        private void before() {
            if (before != null) {

        private void after() {
            if (after != null) {

Above code is not part of the described extension, but you may use it freely as you wish.

Afterwards we may instantiate as many wrappers as we would like to:


As a result we would be able to pass for instance MDC context or Spring Security Authentication or any other data that would be needed.

Separation of concerns

This is clearly a cleaner solution, but still has one fundamental drawback: it requires to specify the logic for every single ThreadLocal variable separately. It would be way more convenient to have only  define the logic of passing the variables across boundaries of threads between the Hystrix or any other library like for instance RxJava. The only catch is that in order to do so those variables would have be first identified and encapsulated in proper abstraction.

I think such idea would be worth implementing, though I don’t have yet a complete solution for it.


Nevertheless I would be interested in developing PoC of such generic solution and as done in the past, once again prepare a pull request for instance to Spring Cloud to provide such end to end functionality.

Spring Boot: RxJava Declarative Schedulers

As a fallow up to the last weeks article: Spring Boot: RxJava there is one additional project:


Setup as with most Spring Boot starters is fairy simple you just drop the dependency to your project classpath and you are all set:


The library brings one functionality, it allows to specify the Scheduler on the RxJava reactive types: rx.Observable and rx.Single in Spring’s declarative manner – through annotations.

The basic use case is to annotate your bean methods with either @SubscribeOnBean or @SubscribeOn annotations.


    public class InvoiceService {

        public Observable<Invoice> getUnprocessedInvoices() {
            return Observable.just(

The motivation here is to ease the integration with Spring Framework and be able to define within the DI container the application level scheduler. Why you want to do that? There are a couple of use cases.

For example you might need to provide a custom scheduler that can be aware of ThreadLocal variables, a typical use case is to pass logging MDC context, so that afterwords the thread running within the RxJava Scheduler can access the same context as the thread that triggered the task, but the applications go beyond that.

Other typical example is for instance customize your scheduler, not relaying on the build in. In order to for instance to limit the thread pool size, considering that the build in schedulers like IO scheduler are unbounded.

In case you want to simply relay on the RxJava predefined schedulers you can still use them with @SubscribeOn annotation.

    public class InvoiceService {

        public Observable<Invoice> getInvoices() {
            return Observable.just(

Spring Boot: RxJava

Back to posting, this will be a bit old since I had been working on this integration back in the February.

Interestingly enough I had already prepared a blog post related to this feature within Spring Cloud that added tight RxJava integration with Spring MVC controllers. Remarkably it turn out that the implementation in one of the previous milestones had a flow within it.

I’ve been interested in trying to find a solution towards the problem, though I was keen to support mostly the widely used REST like approach in which (mostly) the entire payload is being returned upon computation, in contrast to streaming the response over HTTP. This approach has been reflected in this small project:


Which work out very well as a reference project, in which I had opportunity to try out different API implementations. On it’s own you can use this in your own project, since the proposed implementation depends only on the Spring Boot and Spring Framework’s MethodReturnValueHandler so if you simply using Spring Boot without additional features provided through Spring Cloud feel free to test it out.

Later the code of project become a baseline for the implementation proposed to

Spring Cloud approach

The final approach that has been implemented in Spring Cloud is a bit different, first of all the support for rx.Observable has been removed, instead you can use rx.Single in similar manner like DeferedResult, to which the underlying implementation in fact maps the RxJava type. The reference describes this a bit more in detail: http://cloud.spring.io/spring-cloud-static/spring-cloud.html#netflix-rxjava-springmvc

Funny enough at my company one of my colleagues had to later on migrate the code of one the projects from the rx.Observable to rx.Single, which he wasn’t really happy about😉

Spring Boot: Tuning your Undertow application for throughput

It’s been some time since the previous blog post, but finally I though that it’s a good time to make a post about very useful and practical aspect. How to prepare your Spring Boot application for production and how to guarantee that it will be able to handle a couple of millions of views each day.

If you think that you have already made all the needed steps by making your application stateless, scaling it out or running it on the high end machine, think twice because it’s quite likely that there are some bottlenecks inside your application that if not treated with proper attention would most likely degradate the performance and the application overall throughput.

Tuning for latency vs tunning for throughput.

Interesting enough in the past, being aware of the Little’s Law I have thought that tuning your application throughput requires nothing more then reducing your application latency as much as possible. It was just after reading the book Java Performance the after I realized that might not be true in all of the cases.

Generally you can improve the latency first by improving your application algorithmic performance, after that you should take a look on access patterns in your application introducing a caching layer or redesign the way your application is accessing the data can have huge impact on the overall performance. If your application is heavely I/O bound performing operations in the parallel can be a way to improve things a bit.

Also a good idea for improving you application latency is to configure asynchronous logging whether you using Logback or Log4J2, but of them provide proper functionality.

Thread pools


Undertow uses XNIO as the default connector. XNIO has some interesting characteristics apart from the default configuration which by default is I/O threads initialized to the number of your logical threads and the worker thread equal to 8 * CPU cores. So on typical 4 cores Intel CPU with hypert-hreading  you will end up with 8 I/O threads and 64 working threads. Is this enough? Well, it depends. Considerably the Tomcat’s and Jetty defaults are 100 and 1000 threads respectively. If you need to be able to handle more request per second this is the first thing that need to consider to increase.


The Hystrix documentation states that:

Most of the time the default value of 10 threads will be fine (often it could be made smaller).

After working with couple of the projects, I found it hardly to believe that this could be a true statement. The defaults for Hystrix is 10 threads per pool, which quickly might turn out to become a bottleneck. In fact the same documentation also states that in other to establish the correct size of hysterix thread pool you should use the fallowing formula:

requests per second at peak when healthy × 99th percentile latency in seconds + some breathing room

So let’s assume that you have a system that has to handle let’s say 24 000 rps, divided by the number of instances, for instance 8, you can establish the appropriate pool size for single instance. This will vary greatly on the latency of your system.


Memory usage

All of this is not given without a price. Each of the newly allocated threads consumes memory. Through Java you can configure this property through -Xss property with the default for 64 bit VM being 1 MB. So if you let’s say configure your Undertow thread pool with 512 working threads, be ready that your memory consumption (only for allocating the thread stacks) will be increased to that number.

Connection pools


Do you use for instance RestTemplate, or maybe RestEasy JAX-RS client. In fact there is a well known issue reported in RestEasy that uses exactly ONE connection for all of your calls. The good advice is to align that value with the number of working threads of your application server, otherwise when performing the HTTP calls the threads will be waiting for acquiring the underlying HTTP connection from the pool, which will cause unnecessary and probably unintended delay.


The same basic principal applies to any other kind of service that is being communicated over TCP connection. For instance Memcached clients like XMemcache has nice capabilities of using a multiplexed TCP connection with binary protocol on top of it, giving a throughput of roughly 50 requests per connection, though still if you need to be able to handle greater throughput you need to configure your client to maintain a entire pool of connections.

Garbage collection

If you opt for low latency, probably you should consider optimizing the Garbage Collector as the last kind of resort. As much as garbage collection could be optimized through different settings this does not handles the true problem,  if you can address those issue first you should be able to be just find and tune the garbage collector afterwards for the best overall performance.

Final thoughts

Equipped with this practical knowledge how you will be able to tell if your application is your application faces any of those problems, first of all equipped with proper tools. Stress test are one of them, you can either decide to treat the application as a black box and use for instance Gatling to measure the throughput of your application, if you need need more fined grained tools the jmh project that could used for running benchmarks of the individual Java methods. Finally use profiler to understand where your application is spending the most time, is it for instance a RestTemplate call, or maybe your cache access time sky rockets whenever you? A good advice on how to measure the characteristics of the application is to use the doubling approach, run your benchmark with for instance 64 RPS – monitor the results, and repeat the experiment with double the request number. Continue as long as you haven’t reached the desired throughput level.

With all of this being said, the true is that this in fact describes the hard way, there is also a simple and fast path to solve your heavy load problems especially for HTTP:

Use a caching reverse proxy.

Either if it’s Nginx or Varnish, both of them should take the load out of your backing services and if you can decrease your load  you do not need spend so much time on the optimizations.

CompletableFuture cache

This post is going to be more theoretical and rather describe the idea of asynchronous caches or promise caches on the conceptual level.

How to cache promises?

Earlier this year I been working on small service that entire implementation were based on promises – Java 8’s CompletableFuture to be exact. The nice feature that this provides is the possibility to compose multiple asynchronous operations that can be executed in parallel. Though we quite fast found out that even though we have given a very powerful tool we had to give up some others like for instance caching.

One can argue that, there a simple solution for that, implement explicit caching functionality that would require to check if specific value exists in the cache and to simply return it wrapped into a future or otherwise execute the application logic and populate the cache afterwards.

Unfortunately we didn’t find such solution satisfying. I would prefer to have a more subtle solution. I’ve even spent some time on working on PoC of CompletableFuture cache, ending with workable solution (though I was really treating that as a form of exercise and you looking for something, that you would wish to run in the production there is probably better options):


It was shortly after, when I’d discovered that I haven’t been first to came out with such idea and that there are already existing implementation of the caches capable of storing promises.

  • Twitter Util has cache for Twitter’s Futures in Scala
  • Caffeine has AsyncLoadingCache for Java’s 8 CompletableFuture
  • Spray can cache Scala’s Futures
  • RxCache for caching rx.Observables

The whole idea can be generalized and probably named as promise cache or asynchronous cache. If we had to describe what are characteristics of such cache we can mention few:

  • It caches promises rather then plain values
  • It requires to associate a unit of work with the cache value
  • It caches not only the completed tasks results, but also the “running” tasks
  • Gracefully handles the task cancelation

If I had to expand the description we would need to understand that such cache whenever a new entry is being added to it is going to return a promise of the value. So in most cases we are going to provide to it a kind of task to execute in a form of lambda expression or a callable for instance. In the exchange expect that it will return a promise of the result. We can distinguish three different state of the entry in the cache:

  • No entry exist associated with specific key
  • A new entry has been inserted for the key, but is being executed by the thread in the background
  • A entry exist and is a result of the computation wrapped into a promise.

In other words the cache has one in particular interesting characteristics it has to be able to observe the supplied task and “capture” the result of it’s computation in order to store that and return when requested. This has some interesting implications, if we would consider a typical use case for caching, like for instance a database query or long running HTTP request to remote service, the asynchronous cache has one huge advantages, it allows to provide that task for execution once and until it’s being completed every request can observe the promise until it is done. This is going to efficiently using the system resources. In typical scenario for instance when running in web server that gives us a huge advantage over the blocking solution, because we can guarantee that at given time (on a single server) exactly one background thread is executing the given task, but what more important – all of the request accessing the same cache entry can be processed asynchronously and observe the same single task for completion, not blocking the execution.

Let’s take a look at AsyncLoadingCache from Caffeine as an example of API of such cache:

public interface AsyncLoadingCache<K, V> {

  CompletableFuture<V> getIfPresent(@Nonnull Object key);

  CompletableFuture<V> get(@Nonnull K key,
      @Nonnull Function<? super K, ? extends V> mappingFunction);

  CompletableFuture<V> get(@Nonnull K key);

  void put(@Nonnull K key, @Nonnull CompletableFuture<V> valueFuture);


Despite that the API defines well know put/get methods a typical use case of using asynchronous cache would require to call get method with the provided task for execution.

The ability to supply at most one executing task has a huge advantage and can been very useful in situations that for instance one long running task/request could saturate the web server thread pool.  Introducing the async cache could be very helpful in multiple use cases and could be used as pattern in multiple different scenarios, from already mentioned database queries, to even inserting the data and could be easily used to guarantee the idempotency of the HTTP request (at least on the single node). We won’t be duplicating work and executing same task multiple times.

Once the task completes the execution the cache need to intercept such “event” and store the result on successful completion. This can be easily done with CompletableFuture#completedFuture method. In case of an error the entry will have to evicted from the cache.

The problem of global task cancelation

There on interesting edge case, what if promise that has been supplied to the cache hasn’t yet completed processing and one of the clients will request it’s cancellation? Unless this is handled by the cache implementation this might be a very destructive operation, since any other client waiting for the same computation result will be affected by this operation. Unfortunately in case off Java CompletableFuture, this is an existing problem. The JDK 9 will introduce a CompletableFuture#copy method that will give a way to workaround that, but until that time the implementation like Caffeine does not handle such situations gracefully, CompletableFuture does not expose the proper API for such cases.

Beyond single instance

This is going to be pure speculation from my side, but I can imagine moving this idea way beyond caching within single program instance. It would be interesting to see a distributive system build on top of the concept of asynchronous cache in which (with configurable consistency level) it could be possible to guarantee that in the whole server cluster at most one task is being executed for the specific input value. While any other node could observe the task for completion in non blocking manner. This would be surely a idea worth implementing.

Spring Framework 4.3: AsyncRestTemplate interceptors

After a short break I would like to get back with very interesting topic. Not often you have ability to describe one of the upcoming features of widely used libraries like Spring. Last year I’ve co-authored really simple feature that adds to the Spring’s AsyncRestTemplates a very needed extension point: interceptors. So I would like to take here the liberty to describe them more deeply.

This topic might not be so much useful for most variety of use cases of the AsyncRestTemplate, unless you are developing yourself frameworks or libraries and  you are looking for seamless integration. The contract of the interceptor fallows as much as possible it’s RestTemplate’s counterpart.

public interface AsyncClientHttpRequestInterceptor {

    ListenableFuture intercept(HttpRequest request, byte[] body, AsyncClientHttpRequestExecution execution) throws IOException;

The major difference is that instead of returning the response object the interceptor has to work on a ListenableFuture – a observable promise that eventually will return a HTTP response.

The minimal implementation to intercept the response through the interceptor requires to add callback of the ListenableFuture. Example:

public class AsyncRequestInterceptor implements AsyncClientHttpRequestInterceptor {

   public ListenableFuture<ClientHttpResponse> intercept(HttpRequest request, byte[] body,
         AsyncClientHttpRequestExecution execution) throws IOException {

      ListenableFuture<ClientHttpResponse> future = execution.executeAsync(request, body);
            resp -> {
               // do something on success
            ex -> {
               // process error
      return future;

Why does introducing the interceptors is important or anyhow useful? If we would take a look of the existing functionality of RestTemplate that is provided through Spring Cloud Netflix, Spring Cloud Commons, Spring Cloud Security or Spring Cloud Sleuth we can list a bunch of interesting applications:

  • Ribbon client load balancing – this is in fact done through ClientHttpRequestFactory, though ClientHttpRequestInterceptor would be sufficient to achieve the same result.
  • Spring Cloud Security – uses them to add load balancing to the OAuth2RestTemplate.
  • Spring Cloud Sleuth – uses them to add tracing header to the outgoing request.

Some other example use cases:

  • Request/response logging
  • Retrying the requests with configurable back off strategy
  • Altering the request url address

You may expect this functionality available with the release of Spring Framework 4.3 and Spring Boot 1.4. Since open source projects have some inertia in development, any integration build on top of it for instance in Spring Cloud probably won’t be available until the 1.2 release.