CompletableFuture cache

This post is going to be more theoretical and rather describe the idea of asynchronous caches or promise caches on the conceptual level.

How to cache promises?

Earlier this year I been working on small service that entire implementation were based on promises – Java 8’s CompletableFuture to be exact. The nice feature that this provides is the possibility to compose multiple asynchronous operations that can be executed in parallel. Though we quite fast found out that even though we have given a very powerful tool we had to give up some others like for instance caching.

One can argue that, there a simple solution for that, implement explicit caching functionality that would require to check if specific value exists in the cache and to simply return it wrapped into a future or otherwise execute the application logic and populate the cache afterwards.

Unfortunately we didn’t find such solution satisfying. I would prefer to have a more subtle solution. I’ve even spent some time on working on PoC of CompletableFuture cache, ending with workable solution (though I was really treating that as a form of exercise and you looking for something, that you would wish to run in the production there is probably better options):

https://github.com/jmnarloch/completable-future-cache

It was shortly after, when I’d discovered that I haven’t been first to came out with such idea and that there are already existing implementation of the caches capable of storing promises.

  • Twitter Util has cache for Twitter’s Futures in Scala
  • Caffeine has AsyncLoadingCache for Java’s 8 CompletableFuture
  • Spray can cache Scala’s Futures
  • RxCache for caching rx.Observables

The whole idea can be generalized and probably named as promise cache or asynchronous cache. If we had to describe what are characteristics of such cache we can mention few:

  • It caches promises rather then plain values
  • It requires to associate a unit of work with the cache value
  • It caches not only the completed tasks results, but also the “running” tasks
  • Gracefully handles the task cancelation

If I had to expand the description we would need to understand that such cache whenever a new entry is being added to it is going to return a promise of the value. So in most cases we are going to provide to it a kind of task to execute in a form of lambda expression or a callable for instance. In the exchange expect that it will return a promise of the result. We can distinguish three different state of the entry in the cache:

  • No entry exist associated with specific key
  • A new entry has been inserted for the key, but is being executed by the thread in the background
  • A entry exist and is a result of the computation wrapped into a promise.

In other words the cache has one in particular interesting characteristics it has to be able to observe the supplied task and “capture” the result of it’s computation in order to store that and return when requested. This has some interesting implications, if we would consider a typical use case for caching, like for instance a database query or long running HTTP request to remote service, the asynchronous cache has one huge advantages, it allows to provide that task for execution once and until it’s being completed every request can observe the promise until it is done. This is going to efficiently using the system resources. In typical scenario for instance when running in web server that gives us a huge advantage over the blocking solution, because we can guarantee that at given time (on a single server) exactly one background thread is executing the given task, but what more important – all of the request accessing the same cache entry can be processed asynchronously and observe the same single task for completion, not blocking the execution.

Let’s take a look at AsyncLoadingCache from Caffeine as an example of API of such cache:

public interface AsyncLoadingCache<K, V> {

  CompletableFuture<V> getIfPresent(@Nonnull Object key);

  CompletableFuture<V> get(@Nonnull K key,
      @Nonnull Function<? super K, ? extends V> mappingFunction);

  CompletableFuture<V> get(@Nonnull K key);

  void put(@Nonnull K key, @Nonnull CompletableFuture<V> valueFuture);

  ...
}

Despite that the API defines well know put/get methods a typical use case of using asynchronous cache would require to call get method with the provided task for execution.

The ability to supply at most one executing task has a huge advantage and can been very useful in situations that for instance one long running task/request could saturate the web server thread pool.  Introducing the async cache could be very helpful in multiple use cases and could be used as pattern in multiple different scenarios, from already mentioned database queries, to even inserting the data and could be easily used to guarantee the idempotency of the HTTP request (at least on the single node). We won’t be duplicating work and executing same task multiple times.

Once the task completes the execution the cache need to intercept such “event” and store the result on successful completion. This can be easily done with CompletableFuture#completedFuture method. In case of an error the entry will have to evicted from the cache.

The problem of global task cancelation

There on interesting edge case, what if promise that has been supplied to the cache hasn’t yet completed processing and one of the clients will request it’s cancellation? Unless this is handled by the cache implementation this might be a very destructive operation, since any other client waiting for the same computation result will be affected by this operation. Unfortunately in case off Java CompletableFuture, this is an existing problem. The JDK 9 will introduce a CompletableFuture#copy method that will give a way to workaround that, but until that time the implementation like Caffeine does not handle such situations gracefully, CompletableFuture does not expose the proper API for such cases.

Beyond single instance

This is going to be pure speculation from my side, but I can imagine moving this idea way beyond caching within single program instance. It would be interesting to see a distributive system build on top of the concept of asynchronous cache in which (with configurable consistency level) it could be possible to guarantee that in the whole server cluster at most one task is being executed for the specific input value. While any other node could observe the task for completion in non blocking manner. This would be surely a idea worth implementing.

Spring Framework 4.3: AsyncRestTemplate interceptors

After a short break I would like to get back with very interesting topic. Not often you have ability to describe one of the upcoming features of widely used libraries like Spring. Last year I’ve co-authored really simple feature that adds to the Spring’s AsyncRestTemplates a very needed extension point: interceptors. So I would like to take here the liberty to describe them more deeply.

This topic might not be so much useful for most variety of use cases of the AsyncRestTemplate, unless you are developing yourself frameworks or libraries and  you are looking for seamless integration. The contract of the interceptor fallows as much as possible it’s RestTemplate’s counterpart.

public interface AsyncClientHttpRequestInterceptor {

    ListenableFuture intercept(HttpRequest request, byte[] body, AsyncClientHttpRequestExecution execution) throws IOException;
}

The major difference is that instead of returning the response object the interceptor has to work on a ListenableFuture – a observable promise that eventually will return a HTTP response.

The minimal implementation to intercept the response through the interceptor requires to add callback of the ListenableFuture. Example:

public class AsyncRequestInterceptor implements AsyncClientHttpRequestInterceptor {

   @Override
   public ListenableFuture<ClientHttpResponse> intercept(HttpRequest request, byte[] body,
         AsyncClientHttpRequestExecution execution) throws IOException {

      ListenableFuture<ClientHttpResponse> future = execution.executeAsync(request, body);
      future.addCallback(
            resp -> {
               // do something on success
            },
            ex -> {
               // process error
            });
      return future;
   }
}

Why does introducing the interceptors is important or anyhow useful? If we would take a look of the existing functionality of RestTemplate that is provided through Spring Cloud Netflix, Spring Cloud Commons, Spring Cloud Security or Spring Cloud Sleuth we can list a bunch of interesting applications:

  • Ribbon client load balancing – this is in fact done through ClientHttpRequestFactory, though ClientHttpRequestInterceptor would be sufficient to achieve the same result.
  • Spring Cloud Security – uses them to add load balancing to the OAuth2RestTemplate.
  • Spring Cloud Sleuth – uses them to add tracing header to the outgoing request.

Some other example use cases:

  • Request/response logging
  • Retrying the requests with configurable back off strategy
  • Altering the request url address

You may expect this functionality available with the release of Spring Framework 4.3 and Spring Boot 1.4. Since open source projects have some inertia in development, any integration build on top of it for instance in Spring Cloud probably won’t be available until the 1.2 release.

Spring Cloud: Zuul Trie matcher

Goal

This time I would like to focus on the Spring Cloud Zuul proxy internals in terms of functionality and performance. If we would take a look at the ProxyRouteLocator#getMatchingRoute, this is the method used for finding route per each Zuul proxy call, it has two characteristics, first of all it iterates over list of all services to find the first one matching, second of all it stops when it find the first path that matches the request URI.

If we would attempt to determine the method running time and denote N – as the number of routes and M – as the maximum length of the configured URI path we can deduct that the running time of getMatchingRoute is O(N M). Now that is not bad. But we can do better. There is a very good know data structure that can help us with this goal, a Trie tree. Though a Trie can be represented in multiple ways, we are going to talk here mostly about the simple R-way tree structure that allows to associate a string key with specific value, similary as hashtable does. To simplify a Trie node contains the optional node value and list of links to the children nodes. The actual link to children is being associated with single character, this is why we end up with R-way structure. The down side is that we need extra memory and depending on how we are going to implement the data structure we may need a lot of  that. Simplest approach, is to use an array of object references, with every entry corresponding to a single character, this also resembles the direct access approach in implementing a hashtable.  Let’s consider what would happen if we would want to use alphabet of 256 ASCII codes – we would end up with in 64 bit JVM with 8 bytes per reference and overall 2 KB per node. For unicode this works worse then that with 65536 unique values we would need 512 KB per node.

The direct access technique would be great if it wouldn’t require such large amounts of memory. We could notice that in this specific case we don’t need to handle so many values since in case of URI the RFC3986 clearly states which characters are allowed. Though we can still do better then that. Instead of using a array to store the links why not to use a JDK Map? The Java’s HashMap is hashtable that uses separate chaining for conflict resolution and has default initial capacity of 16 (though this is implementation detail). That would save us a lot of memory for sparse nodes – those with few number of children.

We don’t have to stop there yet. There is one problem with general purpose Map, if we need to store the primitive char type as the map keys – our only option is the wrapper Character class. This will yield at least 4 times more memory only for storing the reference to the Character objects on heap then a actual primitive char array. Fortunately there is 3-rd party implementation of primitive type collections: Trove. The project might be a bit stale, with over 3 years since the last release, but that doesn’t really mather if we can use the existing functionality. The library defines TCharObjectHashMap that in contrary to JDK HashMap implements the open addressing approach and probing for finding the place for the entries. Also on the web you can find interesting comparision of different collection classes.

Implementation

Considering the above introduction I’ve put that into practice and prepared an actual implementation:

https://github.com/jmnarloch/zuul-trie-matcher-spring-cloud-starter

With fallowing Trie implementations:

  • CharArrayTrie
  • HashMapTrie
  • CharHashMapTrie

So what are the gains due to replacement of the standard Spring Clouds Zuul route matching implementation? Our worst case running time is now O(M), where we have defined M – as the maximum length of the URI path, which means that we aren’t bound at all to the configured number of routes, the algorithm will perform the same fast in every case no matter whether you have tens or hundreds routes defined behind your edge gateway.

This is one of the best examples of time vs memory tradeoff I can think off.

Other considerations

The side effect of using the Trie tree is also that now we can find the best matching paths. For instance let consider two examples:

  • /products/**
  • /products/smartphones/**

Now we have a common prefix for both routes that is  the /products/ prefix. The standard implementation for route /products/smartphones/iphone could match either /products/** or /products/smartphones/** depending on the order in the property file, the Trie tree will match always the route that best matches your request URI.

Plans

At this point in time the extension is kept separately and alongside of the previous Cassandra store and Zuul route healthcheck modules that I’ve done and it does not really go along with them. In meaning that there is no way to have both Cassandra with Trie matcher enabled at this time. My goal would be to at least introduce the basic abstraction through pull requests in Spring Cloud Netflix that would allow to externalize and configure parts of Zuul integration and afterwards release the updated version of the modules.

Spring Cloud: Zuul Cassandra route storage

This time I would like to introduce a very small, but I think useful extension that overrides the standard Spring Clouds property based Zuul route configuration that most people are probably familiar with. Normally it probably looks more or less as fallows:

zuul:
  ignoredServices: '*'
  routes:
    resource:
      path: /api/**
      serviceId: rest-service
    oauth2:
      path: /uaa/**
      serviceId: oauth2-service
      stripPrefix: false

This is perfectly fine, if you have fairly small project with reasonable amount of services. Though, the problem arises when your organization defines numerous different services. Having all of them defined in a single text file does not seem to be a reasonable idea.

Instead you could read that from a external database, RDBMS probably would do the job, but a NoSQL database like Cassandra with configurable replication rate would fit here perfectly. Even the Cassandra eventual consistency model won’t be an issue in this particular case, since the data is loaded on application startup and refresh and cached afterwards. Meaning that every request made through Zuul proxy does not require a database query to be performed each time.

The extension tries to plug in seamless to the existing standard Zuul setup. In order to get it started you will need to add the module to your project dependencies:

<dependency>
  <groupId>io.jmnarloch</groupId>
  <artifactId>zuul-route-cassandra-spring-cloud-starter</artifactId>
  <version>1.0.0</version>
</dependency>

Enable the altered Zuul proxy setup, and also register a CassandraTemplate in your application context:

@EnableZuulProxyStore
@SpringBootApplication
public static class Application {

    @Bean
    public Cluster cluster() {
        return Cluster.builder()
                .addContactPoints(InetAddress.getLoopbackAddress())
                .withPort(9142)
                .build();
    }

    @Bean
    public CassandraOperations cassandraTemplate(Cluster cluster) {
        return new CassandraTemplate(cluster.connect("zuul"));
    }
}

Configure the routes to be loaded from Cassandra, by adding to your application.yml:

zuul:
  store:
    cassandra:
      enabled: true

Finally connect to Cassandra and create a keyspace.

CREATE KEYSPACE IF NOT EXISTS zuul WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };

USE zuul;

CREATE TABLE zuul_routes (
    id text,
    path text,
    service_id text,
    url text,
    strip_prefix boolean,
    retryable boolean,
    PRIMARY KEY(id)
);

You can notice that the schema fallows the basic structure of the properties equivalent. Finally you can populate the table with routes definition.

The source code is available at Github:

https://github.com/jmnarloch/zuul-route-cassandra-spring-cloud-starter

I am planning to do a fallow up to this blog post, although addressing a bit different issue related to Zuul path matching.

Spring Cloud: Ribbon dynamic routing

The problem

I like to focus on solving actual problems that I’ve faced during the project development by creating a generic solutions that would grant code reusability. I think that I have another good use case for defining such abstraction – Ribbon dynamic routing.

Ribbon and it’s Spring Cloud abstraction it’s a very convenient client load balancer implementation that through set of Spring Cloud integration points it can be seamlessly used with Spring’s RestTemplate, Feign clients or Zuul proxy.

In general dependeing on the use case and configuration you can configure it to use with completely fixed list of servers or discover the services at runtime from the discovery service like Netflix Eureka in both cases Ribbon will send the request in round robin fashion to the registered servers. That’s definitely covers most of common integration schemes between services, though what if you want to decide at runtime which services you would like to call?

I would like describe here a very particular scenario, let’s consider that you have been developing your system over time and decided to release completely new version of your publicly available REST API, that itself has been versioned. The actual scheme used here is not important the only consideration is when you have decided to fork you codebase to develop and deploy service as completely new standalone “entity” that will be working simultaneously to the any previous available and maintained versions. Now you face the problem of routing your request towards the services, the easiest and trivial approach is to use proper naming convention of your services when registering them in the discovery, like for instance add the version suffix. At this point you end up with names like recommendation-service-v1.0 or recommendation-service-v1.1. This would be fine, except you are bounding this extra information with your service discovery name. The other consideration is when you need more then one attribute based on which you want to route the user request. Fortunately the authors of Eureka has forseign such case and they allow to store with each individual instance a metadata in a form of key-value pairs. At this point we probably already understand what are we going to do next.

The use cases span beyond simple service versioning, since you have much flexibility you could for instance handle A/B testing based on the discovery metadata.

I want to cover a bit in depth how you can use this extra information through Spring Cloud working on top of the Netflix OSS project and how by combining together Ribbon and Eureka you could route for instance a HTTP request using RestTemplate to very specific backend service. In fact the implementation is going to be trivial, since all of the required functionality is there. We will simply query the discovery registry for metadata map and implement a custom Ribbon’s IRule to match the servers subset.

The Solution

The base interface that we need to implement and simply register as Spring bean is the IRule:

public interface IRule {

    public Server choose(Object key);
    
    public void setLoadBalancer(ILoadBalancer lb);
    
    public ILoadBalancer getLoadBalancer();    
}

In fact there is a convenient abstract PredicateBasedRule class that gives out of the box a set up useful functionality.

When the Eureka discovery is enabled the server list will be in fact instances of DiscoveryEnabledServer that gives straight forward access to InstanceInfo class that contains the described metadata map populated from Eureka. The only missing part is to define a “registry” which is going to be populated with required criteria.

So without further ado there is Spring Cloud starter that defines the needed glue code:

https://github.com/jmnarloch/ribbon-discovery-filter-spring-cloud-starter

You can simply import it into your Gradle

compile 'io.jmnarloch:ribbon-discovery-filter-spring-cloud-starter:2.0.0'

or Maven project:


<dependency>
	<groupId>io.jmnarloch</groupId>
	<artifactId>ribbon-discovery-filter-spring-cloud-starter</artifactId>
	<version>2.0.0</version>
</dependency>

It provides a simple and lightweight layer on top of the described API with default implementation matching the defined properties against the Eureka’s metadata map. From this point it is simple to implement RestTemplate or Feign interceptor or for instance custom Zuul filter that would populate this information.

The Examples

To use the provided API you need to simply  populate the request attributes and then call the desired service.


RibbonFilterContextHolder.getCurrentContext().add("version", "1.0").add("variant", "A");

ResponseEntity<String> response = restOperations.getForEntity("http://recomendation-service/recomendations", String.class);

The defined entries are going to be matched againts the metadata and the server list will be filtered with instances matching exactly the specific values.

Probably it would be more reasonable to implement the attribute population in some generic manner through RestTemplate or Feign request interceptors.

Go Continuous Delivery: SBT plugin 1.0.0 released

I’ve release a first version of Go Continuous Delivery SBT plugin:

https://github.com/jmnarloch/gocd-sbt-plugin/releases/

To use the SBT plugin, just download the release and copy it into:

$GO_SERVER_HOME/plugins/external

After restaring the server you should be able to see the SBT plugin on the Plugin list and also be able to add the SBT task to your pipeline stages.

gocd_sbt_plugin

Reguirements:
Go Continuous Delivery 14.4+
Java 7+

Spring Cloud: Sock.js + STOMP + Zuul = No WebSockets

This time I wanted to share my experience when I worked on setting up Zuul proxy in front of Websocket service.

Let’s start by stating that Zuul does not support WebSocket protocol: https://github.com/spring-cloud/spring-cloud-netflix/issues/163. So probably this post should have ended here. Despite that, I wanted to see what I will be able to achieve having already all the setup in place, with Zuul reverse proxies in front and WebSocket enabled service in back.

Sock.js

Our application fronted, a single web page, was using Sock.js. The library that has one crucial functionality, especially useful in this particular case, it implements multiple fallbacks protocols when WebSocket protocol is not supported. Initially it allowed to same API to work with different browsers, hiding all communication details. We facilitate this functionality as a workaround to communicate with our backing server through Zuul.

STOMP

STOMP is messaging protocol that can be used on top of WebSocket (or HTTP) to communicate with more message orientated manner. Both on the client side and on the server side this is going to hide the details about routing the messages.

Spring Messaging

Spring Framework for a quite some time has been supporting both Sock.js and STOMP so we are going to use that.

Spring Integration

My goal was to connect our system internals, with the application UI. The backend already was exposing REST API. So I wanted to introduce a lightweight “WebSocket enabled” “fronted” that could be scaled separately and could simply forward the messages from fronted. We didn’t want to split the application logic across different modules, simply because they support different communication protocols, instead we have connected the web socket module with the core services through messaging system.

Spring Integration does integrate on top of Spring WebSocket support, although there are some rough edges. I wasn’t able to accomplish my goal as easly as advertised in reference: http://docs.spring.io/spring-integration/docs/4.2.1.RELEASE/reference/html/web-sockets.html and I’ve ended with construct quite similar to the below code:

@Component
@MessagingGateway
public class NotificationEndpoint {

    @Resource(name = "inboundNotification")
    private DirectChannel notificationChannel;

    @MessageMapping("/notification")
    public void notification(String payload, Principal principal) {
        
        notificationChannel.send(new GenericMessage<>(payload, createHeaders(principal)));
    }
}

AMQP

Eventually we were using RabbitMQ in two bit different roles, first it was used as STOMP messaging relay. Allowing to scale out the WebSocket nodes. The second usage was to pass around the messages through AMQP, so that notification from backend could be forwarded through Sock.js and STOMP to UI.

Spring OAuth

We had been using OAuth2 for authentication and for that we needed to propagate the user authorization token through Sock.js to be able to authenticate. Due to the issue: https://github.com/spring-projects/spring-security-oauth/issues/478 you will have to use version 2.0.8 or otherwise you will be receiving 401 – Unauthorized status.

Zuul

The Zuul setup initially was pretty standard, required only to configure proper route to discovered service.

zuul:
  ignoredServices: '*'
  routes:
    resource:
      path: /ws/**
      serviceId: ws-frontend-service

Unfortunately the first tests that we performed, revealed that the client can not maintain the connection, which were continuously closed. Fortunately Sock.js was design in a way that it requires the server to send hearth beats. The solution was to properly set the Ribbon/Hystrix timeouts as described here. Eventually we had set the timeouts to the double of Sock.js heartbeat delays:

hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds: 50000
ribbon:
  ConnectTimeout: 3000
  ReadTimeout: 50000

Summary

Overall I could be satisfied with such solution, though it’s not perfect and long polling will be resource consuming (keeping open the connection on the proxy and occupying the application server thread – the web socket connections generally are handled by separate server thread pools). The nice side effect of above stack is that it should allow transparently move to different transport protocol, Sock.js and STOMP nicely hide all the communication details. But if you need real web socket support you will have to resign from Zuul. Hopefully with release of Zuul 2 this matter will change.

Spring Cloud 1.1: RxJava

The next big thing that is going to happen in Java world in upcoming future is standardization of the reactive programming API in JVM. Java 9 is going to most likely be bundled with Reactive Streams API. Also some existing libraries like RxJava 2.x are moving towards the same standardized API. From what you could learn from last SpringOne is that Spring 5 have plans for seamless integration with reactive paradigm.

Question is do you really need to wait to easily facilitate all of this goodies? The answer is not necessary, especially that you can already use libraries like RxJava.

Spring MVC integration

One nice kind of integration that you can get out of the box thanks to upcoming Spring Cloud Netflix 1.1.x release is support for Observable return types in your controller methods, which makes it convenient if you return Observable calls to remote service and perform some aggregation on the results.

With Spring Cloud Netflix 1.1.x on classpath you can transform your Spring MVC REST controllers into fallowing:

@RestController
public class ProductController {
    
    private final ProductService productService;

    public ProductController(ProductService productService) {
        this.productService = productService;
    }

    @ResponseStatus(OK)
    @RequestMapping(method = GET)
    public Observable<Product> getProductList() {

        return productService.getProducts();
    }
}

You still need to configure TaskExecutor to be able to use this functionality as you would with standard DefferedResult, ListenableFuture or Callable return types.

Hystrix and Javanica

Hystric commands are capable of handling Observables return types as well.

Future possible integrations

I’ve happen to find this interesting pull request that brings the RxJava integration directly to Spring Data commons module, sadly it hasn’t been merged yet. Personally I think that it would be really convenient to call multiple queries at the same time through such API. Maybe this issue simply needs some extra support from community?

If you would be more interested in this topic I highly advice visiting the RxJava project page and some articles on the web.

Spring Cloud: Zuul routes health

When I had described the Go CD Health check plugin I’ve briefly talk about it’s use cases and that it can be set up for automatic delay of acceptance or stress tests. It’s time to describe a bit more in details and how it can be used with Spring Cloud application in particular. We wanted to run the tests of system user interface, but to do that we needed all the backing systems to be available. Since the application works in reverse proxy setup, we had simply await for the depending services to register with ‘UP’ state in our discovery service. So what we wanted to do, is to adjust the application health endpoint to make it aware whether the routes to backend services are available.

This required to register custom Spring Boot’s Actuator health indicator. Actually I had prepared a small utility project that will do this:

https://github.com/jmnarloch/zuul-route-health-spring-cloud-starter

It can be turn on and off through settings ‘zulu.health.enabled’ flag:

zuul:
  ignoredServices: '*'
  health:
    enabled: true
  routes:
    resource:
      path: /api/**
      serviceId: rest-service
    oauth2:
      path: /uaa/**
      serviceId: oauth2
      stripPrefix: false

So you can configure it for your test/dev/stg environments and disable it for production, mostly because cascading the system failures might not be such a good idea. Though for our use case this worked more then well.

The extensions has it’s limitations, at this point it only checks the explicitly configures services (the routes with specified serviceId) and verifies whether they are accessible from the discovery service.

Health check in practice
zuul-health