Introduction
Spring Cloud Stream is an interesting initiative for building message-driven applications in the widely adopted Spring ecosystem. I think the main idea is to reduce usage and configuration to the bare minimum, compared to a more complex solution such as Spring Integration.
Spring Cloud Stream introduces the idea of binders, which are responsible for handling the integration with different message-oriented middleware (MOM). At the moment it has out-of-the-box support for:
- RabbitMQ
- Kafka
- Redis
- GemFire
For additional information I highly recommend going through the Spring Cloud Stream reference guide.
Allegro Hermes is a message broker built on top of Kafka, with a REST API that allows it to be easily integrated by HTTP-based clients. It also has a rich set of features: it can pass JSON and binary Avro messages, broadcast messages, or send them in batches.
In order to use it through Spring Cloud Stream we need a dedicated binder that is able to deliver the messages to Hermes.
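To make the REST API part a bit more concrete, here is a minimal sketch of what publishing a JSON message over plain HTTP could look like from Java. It assumes the Hermes frontend accepts messages with a POST to /topics/{topicName}; the host name, topic name and the class itself are made up for illustration, and the request is only built here, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Illustrative sketch only: builds (but does not send) an HTTP publish request.
final class HermesPublishSketch {

    // Assumption: the Hermes frontend accepts messages via POST /topics/{topicName}.
    static HttpRequest publishRequest(String frontendUri, String topic, String jsonBody) {
        return HttpRequest.newBuilder()
                .uri(URI.create(frontendUri + "/topics/" + topic))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = publishRequest(
                "http://frontend.hermes.local:8080", // hypothetical frontend address
                "io.jmnarloch.price.change",         // hypothetical topic name
                "{\"message\":\"Hello world\"}");
        System.out.println(request.method() + " " + request.uri());
        // Sending it would require a running Hermes frontend, e.g. via
        // java.net.http.HttpClient.newHttpClient().send(request, ...).
    }
}
```

A binder essentially hides this kind of plumbing behind a message channel, so the application code never has to deal with HTTP directly.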
Fortunately there is one here:
https://github.com/jmnarloch/hermes-spring-cloud-starter-stream
Example:
Let’s try to use it in practice, starting from a sample project. You may want to first go through the Hermes quickstart guide to set up your environment.
Next we will download a Spring Initializr template using httpie.
$ http -f POST https://start.spring.io/starter.zip type=gradle-project style=cloud-stream-binder-kafka > demo.zip
$ unzip demo.zip
Afterwards you can import the project using your favorite IDE.
The first thing to do is to replace spring-cloud-starter-stream-kafka with the Hermes binder:
compile('io.jmnarloch:hermes-spring-cloud-starter-stream:0.2.0')
Let’s start by configuring the Hermes URI for the binder.
spring:
  cloud:
    stream:
      hermes:
        binder:
          uri: 'http://frontend.hermes.local:8080'
Now we can design our binding and the POJO used for the message.
package io.jmnarloch.stream.hermes;

import java.math.BigDecimal;
import java.util.UUID;

public class PriceChangeEvent {

    private final UUID productId;
    private final BigDecimal oldPrice;
    private final BigDecimal newPrice;

    public PriceChangeEvent(UUID productId, BigDecimal oldPrice, BigDecimal newPrice) {
        this.productId = productId;
        this.oldPrice = oldPrice;
        this.newPrice = newPrice;
    }

    public UUID getProductId() {
        return productId;
    }

    public BigDecimal getOldPrice() {
        return oldPrice;
    }

    public BigDecimal getNewPrice() {
        return newPrice;
    }
}
And the binding for the message channel.
package io.jmnarloch.stream.hermes;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface Events {

    @Output
    MessageChannel priceChanges();
}
Through configuration we can specify the destination topic name and the default content type of the topic.
spring:
  cloud:
    stream:
      bindings:
        priceChanges:
          destination: 'io.jmnarloch.price.change'
          contentType: 'application/json'
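With the JSON content type, the event ends up on the topic as a JSON document. The hand-rolled sketch below only illustrates the rough shape such a payload might have for the price change event; the class and method names are hypothetical, and the exact field order and number formatting will depend on the JSON mapper that is actually used.

```java
import java.math.BigDecimal;
import java.util.UUID;

// Illustrative sketch only: formats a price change as JSON by hand,
// to show the approximate shape of the message payload.
final class PriceChangeJsonSketch {

    static String toJson(UUID productId, BigDecimal oldPrice, BigDecimal newPrice) {
        // The UUID is emitted as a JSON string, the prices as plain JSON numbers.
        return String.format(
                "{\"productId\":\"%s\",\"oldPrice\":%s,\"newPrice\":%s}",
                productId, oldPrice.toPlainString(), newPrice.toPlainString());
    }

    public static void main(String[] args) {
        UUID productId = UUID.fromString("00000000-0000-0000-0000-000000000001");
        System.out.println(toJson(productId, new BigDecimal("10.00"), new BigDecimal("8.50")));
        // prints {"productId":"00000000-0000-0000-0000-000000000001","oldPrice":10.00,"newPrice":8.50}
    }
}
```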
In order to enable Spring Cloud Stream binding we need to annotate our configuration class.
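@Configuration
@EnableBinding(Events.class)
public class EventsConfiguration {
}
Using the binding is straightforward: a proxy is created for the interface and can afterwards be injected.
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;

@Component
public class EventsProducer {

    private final Events events;

    @Autowired
    public EventsProducer(Events events) {
        this.events = events;
    }

    public void publishPriceChange(PriceChangeEvent event) {
        // Wrap the POJO in a message and send it through the bound output channel
        events.priceChanges().send(new GenericMessage<>(event));
    }
}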
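If you wonder how an interface without any implementation can be injected, the general idea can be illustrated with a plain JDK dynamic proxy. This is only a sketch of the concept and not how Spring Cloud Stream is actually implemented: Channel and Events below are simplified, hypothetical stand-ins for MessageChannel and the binding interface, and the "channel" just records payloads in a map instead of talking to a broker.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: a JDK dynamic proxy standing in for a generated binding.
final class BindingProxySketch {

    // Hypothetical stand-in for Spring's MessageChannel.
    interface Channel {
        boolean send(Object payload);
    }

    // Hypothetical stand-in for the Events binding interface.
    interface Events {
        Channel priceChanges();
    }

    // Payloads "sent" so far, keyed by channel name (not thread-safe, sketch only).
    static final Map<String, List<Object>> SENT = new HashMap<>();

    static <T> T bind(Class<T> bindingInterface) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getReturnType() != Channel.class) {
                throw new UnsupportedOperationException(method.getName());
            }
            // The method name doubles as the channel name, e.g. "priceChanges".
            String channelName = method.getName();
            Channel channel = payload ->
                    SENT.computeIfAbsent(channelName, name -> new ArrayList<>()).add(payload);
            return channel;
        };
        Object proxy = Proxy.newProxyInstance(
                bindingInterface.getClassLoader(), new Class<?>[] {bindingInterface}, handler);
        return bindingInterface.cast(proxy);
    }

    public static void main(String[] args) {
        Events events = bind(Events.class);
        events.priceChanges().send("price changed");
        System.out.println(SENT); // prints {priceChanges=[price changed]}
    }
}
```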
Finally, we can publish our message:
eventsProducer.publishPriceChange(new PriceChangeEvent(uuid, oldPrice, newPrice));
At the moment the binder itself is still under development, but this already presents a workable example.
Publishing Avro binary messages is almost as simple as publishing JSON ones, and I’m going to cover that in a following blog post.