RabbitMQ Monitoring on Kubernetes

RabbitMQ monitoring can be a key element of your system management, so we should use the right tools for it. To enable them on RabbitMQ we need to install some plugins. In this article, I will show you how to use Prometheus and Grafana to monitor the key metrics of RabbitMQ. Of course, we will also build the applications that send and receive messages. We will use Kubernetes as a target platform for our system. In the last step, we are going to enable the tracing plugin, which helps us collect and inspect incoming messages.

Source code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my repository sample-spring-amqp. Inside the k8s directory, you will find all the required deployment manifests. Both Spring Boot test applications are available inside the listener and producer directories. If you would like to test a RabbitMQ cluster, please refer to the article RabbitMQ in cluster.

Step 1 – Building a RabbitMQ image

In the first step, we are building a custom Docker image of RabbitMQ. To do that, we need to extend the base image with the 3-management tag and add two plugins. The plugin rabbitmq_prometheus adds a Prometheus exporter of core RabbitMQ metrics. The second of them, rabbitmq_tracing, allows us to log the payloads of incoming messages. That’s all that we need to define in our Dockerfile, which is visible below.

FROM rabbitmq:3-management
RUN rabbitmq-plugins enable --offline rabbitmq_prometheus rabbitmq_tracing

Then we just need to build the image defined above. Let’s say its name is piomin/rabbitmq-monitoring. After building it, I’m pushing it to my remote Docker registry.

$ docker build -t piomin/rabbitmq-monitoring .
$ docker push piomin/rabbitmq-monitoring

Step 2 – Deploying RabbitMQ on Kubernetes

Now, we are going to deploy our custom image of RabbitMQ on Kubernetes. For the purpose of this article, we will run a standalone instance of RabbitMQ, so the only thing we need to do is override some configuration properties in the rabbitmq.conf and enabled_plugins files. First, I’m enabling logging to the console at the debug level. Then I’m also enabling all the required plugins.

apiVersion: v1
kind: ConfigMap
metadata:
  name: rabbitmq
  labels:
    name: rabbitmq
data:
  rabbitmq.conf: |-
    loopback_users.guest = false
    log.console = true
    log.console.level = debug
    log.exchange = true
    log.exchange.level = debug
  enabled_plugins: |-
    [rabbitmq_management,rabbitmq_prometheus,rabbitmq_tracing].

Both the rabbitmq.conf and enabled_plugins files should be placed inside the /etc/rabbitmq directory. Therefore, I’m mounting them inside the volume assigned to the RabbitMQ Deployment. Additionally, we are exposing three ports outside the container. Port 5672 is used for communication with applications over the AMQP protocol. The Prometheus plugin exposes metrics on the dedicated port 15692. In order to access the Management UI and HTTP endpoints, you should use port 15672.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: rabbitmq
  labels:
    app: rabbitmq
spec:
  replicas: 1
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      containers:
        - name: rabbitmq
          image: piomin/rabbitmq-monitoring:latest
          ports:
            - containerPort: 15672
              name: http
            - containerPort: 5672
              name: amqp
            - containerPort: 15692
              name: prometheus
          volumeMounts:
            - name: rabbitmq-config-map
              mountPath: /etc/rabbitmq/
      volumes:
        - name: rabbitmq-config-map
          configMap:
            name: rabbitmq

Step 3 – Building Spring Boot listener application

Our sample listener application uses the Spring Boot AMQP project for integration with RabbitMQ. Thanks to the Spring Boot Actuator module, it also exposes metrics, including RabbitMQ-specific values. It is important to expose them in a format readable by Prometheus, which is why we include the Micrometer Prometheus registry.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
   <groupId>io.micrometer</groupId>
   <artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
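Note that with Spring Boot 2 defaults only a few Actuator endpoints are exposed over HTTP, so the Prometheus endpoint typically has to be enabled explicitly. The post does not show that part of the configuration, but a minimal application.yml entry could look like this:

management:
  endpoints:
    web:
      exposure:
        include: health,prometheus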

The listener application defines and creates two exchanges. The first of them, trx-events-topic, is used for multicast communication. On the other hand, trx-events-direct takes part in point-to-point communication. Both our sample applications exchange messages in JSON format. Therefore, we have to override the default Spring Boot AMQP message converter with Jackson2JsonMessageConverter.

@SpringBootApplication
public class ListenerApplication {

   public static void main(String[] args) {
      SpringApplication.run(ListenerApplication.class, args);
   }

   @Bean
   public TopicExchange topic() {
      return new TopicExchange("trx-events-topic");
   }

   @Bean
   public DirectExchange queue() {
      return new DirectExchange("trx-events-direct");
   }

   @Bean
   public MessageConverter jsonMessageConverter() {
      return new Jackson2JsonMessageConverter();
   }
}

The listener application receives messages from both the topic and the direct exchange. Each running instance of this application creates queues with random names and binds them to both exchanges. With the direct exchange, only a single queue receives each incoming message. On the other hand, all the queues bound to the topic exchange receive every incoming message.

@Component
@Slf4j
public class ListenerComponent {

   @RabbitListener(bindings = {
      @QueueBinding(
         exchange = @Exchange(type = ExchangeTypes.TOPIC, name = "trx-events-topic"),
         value = @Queue("${topic.queue.name}")
      )
   })
   public void onTopicMessage(SampleMessage message) {
      log.info("Message received: {}", message);
   }

   @RabbitListener(bindings = {
      @QueueBinding(
         exchange = @Exchange(type = ExchangeTypes.DIRECT, name = "trx-events-direct"),
         value = @Queue("${direct.queue.name}")
      )
   })
   public void onDirectMessage(SampleMessage message) {
      log.info("Message received: {}", message);
   }

}
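Both listener methods deserialize the JSON payload into a SampleMessage object. That class is not listed in the post; a minimal sketch consistent with the producer code shown later (assuming Lombok, which the components above already use for @Slf4j) could look like this:

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// Hypothetical sketch of the payload class; the exact definition is in the repository.
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SampleMessage {
   private Integer id;
   private String type;    // the order of the constructor arguments is an assumption
   private String message;
}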

The names of the queues bound to the topic and direct exchanges are configured inside the application.yml file.

topic.queue.name: t-${random.uuid}
direct.queue.name: d-${random.uuid}

Step 4 – Building Spring Boot producer application

The producer application also uses Spring Boot AMQP for integration with RabbitMQ. It sends messages to the exchanges with RabbitTemplate. Similarly to the listener application, it formats all the messages as JSON strings.

@SpringBootApplication
@EnableScheduling
public class ProducerApplication {

   public static void main(String[] args) {
      SpringApplication.run(ProducerApplication.class, args);
   }

   @Bean
   public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
      final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
      rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
      return rabbitTemplate;
   }

   @Bean
   public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
      return new Jackson2JsonMessageConverter();
   }

}

The producer application starts sending messages just after startup. Each message contains id, type and message fields. The messages are sent to the topic exchange at a 5-second interval, and to the direct exchange at a 2-second interval.

@Component
@Slf4j
public class ProducerComponent {

   int index = 1;
   private RabbitTemplate rabbitTemplate;

   ProducerComponent(RabbitTemplate rabbitTemplate) {
      this.rabbitTemplate = rabbitTemplate;
   }

   @Scheduled(fixedRate = 5000)
   public void sendToTopic() {
      SampleMessage msg = new SampleMessage(index++, "abc", "topic");
      rabbitTemplate.convertAndSend("trx-events-topic", null, msg);
      log.info("Sending message: {}", msg);
   }

   @Scheduled(fixedRate = 2000)
   public void sendToDirect() {
      SampleMessage msg = new SampleMessage(index++, "abc", "direct");
      rabbitTemplate.convertAndSend("trx-events-direct", null, msg);
      log.info("Sending message: {}", msg);
   }
   
}

Step 5 – Deploying Prometheus and Grafana for RabbitMQ monitoring

We will use Prometheus for collecting metrics from RabbitMQ and both of our Spring Boot applications. Prometheus detects endpoints with metrics by the Kubernetes Service app label and an HTTP port name. Of course, you can define different search criteria for that. Because Spring Boot and RabbitMQ metrics are exposed under different endpoints, we need to define two jobs. In the source code below you can see the ConfigMap that contains the Prometheus configuration file.

apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus
  labels:
    name: prometheus
data:
  prometheus.yml: |-
    scrape_configs:
      - job_name: 'springboot'
        metrics_path: /actuator/prometheus
        scrape_interval: 5s
        kubernetes_sd_configs:
        - role: endpoints
          namespaces:
            names:
              - default

        relabel_configs:
          - source_labels: [__meta_kubernetes_service_label_app]
            separator: ;
            regex: (producer|listener)
            replacement: $1
            action: keep
          - source_labels: [__meta_kubernetes_endpoint_port_name]
            separator: ;
            regex: http
            replacement: $1
            action: keep
          # ...
      - job_name: 'rabbitmq'
        metrics_path: /metrics
        scrape_interval: 5s
        kubernetes_sd_configs:
        - role: endpoints
          namespaces:
            names:
              - default

        relabel_configs:
          - source_labels: [__meta_kubernetes_service_label_app]
            separator: ;
            regex: rabbitmq
            replacement: $1
            action: keep
          - source_labels: [__meta_kubernetes_endpoint_port_name]
            separator: ;
            regex: prometheus
            replacement: $1
            action: keep
          # ...

For the full version of the Prometheus deployment please refer to the source code. Prometheus tries to discover metric endpoints by the Kubernetes Service label and port name. Let’s take a look at the Service for the listener application.

apiVersion: v1
kind: Service
metadata:
  name: listener-service
  labels:
    app: listener
spec:
  type: ClusterIP
  selector:
    app: listener
  ports:
  - port: 8080
    name: http

Similarly, we should create a Service for RabbitMQ.

apiVersion: v1
kind: Service
metadata:
  name: rabbitmq-service
  labels:
    app: rabbitmq
spec:
  type: NodePort
  selector:
    app: rabbitmq
  ports:
  - port: 15672
    name: http
  - port: 5672
    name: amqp
  - port: 15692
    name: prometheus

Step 6 – Deploying stack for RabbitMQ monitoring

We can finally proceed to the deployment on Kubernetes. In summary, we have five running applications. Three of them (RabbitMQ with the management console, Prometheus, and Grafana) are part of the RabbitMQ monitoring stack. We also have a single instance of the Spring Boot AMQP producer application and two instances of the Spring Boot AMQP listener application. You can see the final list of pods in the picture below.

[Image: rabbitmq-monitoring-pods]

If you deploy the Spring Boot applications with the skaffold dev --port-forward command, you can easily access them on local ports. The other applications can be accessed via a Kubernetes Service of type NodePort.

Step 7 – RabbitMQ monitoring with Prometheus metrics

We can easily verify the list of metrics generated by the Spring Boot applications by calling the /actuator/prometheus endpoint. First, let’s take a look at the metrics returned by the listener application.
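This can be done, for example, with a simple request (assuming the listener is forwarded to local port 8080):

$ curl http://localhost:8080/actuator/prometheus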

rabbitmq_not_acknowledged_published_total{name="rabbit",} 0.0
rabbitmq_unrouted_published_total{name="rabbit",} 0.0
rabbitmq_channels{name="rabbit",} 2.0
rabbitmq_consumed_total{name="rabbit",} 2432.0
rabbitmq_connections{name="rabbit",} 1.0
rabbitmq_acknowledged_total{name="rabbit",} 2432.0
spring_rabbitmq_listener_seconds_max{exception="none",listener_id="org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1",queue="d-ea28bd07-929d-4928-8d2c-5dceeec9950a",result="success",} 0.0025406
spring_rabbitmq_listener_seconds_max{exception="none",listener_id="org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0",queue="t-e8990fe4-7d8c-4a2c-96d7-fff4fe503265",result="success",} 0.0024175
spring_rabbitmq_listener_seconds_count{exception="none",listener_id="org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1",queue="d-ea28bd07-929d-4928-8d2c-5dceeec9950a",result="success",} 1712.0
spring_rabbitmq_listener_seconds_sum{exception="none",listener_id="org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1",queue="d-ea28bd07-929d-4928-8d2c-5dceeec9950a",result="success",} 0.992886413
spring_rabbitmq_listener_seconds_count{exception="none",listener_id="org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0",queue="t-e8990fe4-7d8c-4a2c-96d7-fff4fe503265",result="success",} 720.0
spring_rabbitmq_listener_seconds_sum{exception="none",listener_id="org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0",queue="t-e8990fe4-7d8c-4a2c-96d7-fff4fe503265",result="success",} 0.598468801
rabbitmq_acknowledged_published_total{name="rabbit",} 0.0
rabbitmq_failed_to_publish_total{name="rabbit",} 0.0
rabbitmq_rejected_total{name="rabbit",} 0.0
rabbitmq_published_total{name="rabbit",} 0.0

Similarly, we can verify the list of metrics from the producer application. In contrast to the listener application, it increments rabbitmq_published_total instead of rabbitmq_consumed_total.

rabbitmq_acknowledged_published_total{name="rabbit",} 0.0
rabbitmq_unrouted_published_total{name="rabbit",} 0.0
rabbitmq_acknowledged_total{name="rabbit",} 0.0
rabbitmq_rejected_total{name="rabbit",} 0.0
rabbitmq_connections{name="rabbit",} 1.0
rabbitmq_not_acknowledged_published_total{name="rabbit",} 0.0
rabbitmq_consumed_total{name="rabbit",} 0.0
rabbitmq_failed_to_publish_total{name="rabbit",} 0.0
rabbitmq_published_total{name="rabbit",} 2553.0
rabbitmq_channels{name="rabbit",} 1.0

The list of metrics generated by the RabbitMQ Prometheus plugin is pretty impressive. I decided to use only some of them.

rabbitmq_channel_consumers 6
rabbitmq_channel_messages_published_total 2926
rabbitmq_channel_messages_delivered_total 2022
rabbitmq_channel_messages_acked_total 5922
rabbitmq_connections_opened_total 9
rabbitmq_connection_incoming_bytes_total 700878
rabbitmq_connection_outgoing_bytes_total 1388158
rabbitmq_connection_channels 5
rabbitmq_queue_messages 5917
rabbitmq_queue_consumers 6
rabbitmq_queues 10

We can visualize all the metrics on a Grafana dashboard. Grafana uses Prometheus as a data source, which in turn collects data from the Spring Boot application endpoints and from RabbitMQ.
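For example, assuming the Prometheus data source is already configured in Grafana, a panel plotting the message publish rate could use a PromQL query like this one:

rate(rabbitmq_channel_messages_published_total[5m])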

[Image: rabbitmq-monitoring-grafana]

Step 8 – RabbitMQ monitoring with Tracing plugin

The tracing plugin allows us to log all incoming and outgoing messages. We can configure it in the RabbitMQ management console. In order to do that we need to switch to the “Admin” tab. Then, we can enable the logging of all messages or just those incoming to a particular queue or exchange.

[Image: rabbitmq-monitoring-tracing]

The logs are available inside a file, which you can download. Each entry in the file contains detailed information about a message: the name of the node, exchange, or queue, as well as the message payload, properties, and time of the event.

Conclusion

RabbitMQ monitoring tools allow you to verify the general metrics of a node and the detailed logs of every message. In addition, Spring Boot AMQP offers dedicated metrics for applications that interact with RabbitMQ. In this article, I described how to run the full monitoring stack on Kubernetes. Enjoy 🙂

Introduction to event-driven microservices with Spring Cloud Stream

The Spring Cloud Stream framework allows us to easily apply well-known Spring patterns and best practices in applications implementing an event-driven microservices architecture. It uses the Spring Integration project to provide connectivity to a message broker. It provides built-in support for such features as a persistent publish-subscribe model, consumer grouping, and partitioning. The integration with a specific message broker solution is realized by binder implementations that are hidden behind the middleware-neutral core.
The version of Spring Cloud Stream described here is 3.0.3.RELEASE, within the Spring Cloud Release Train Hoxton.SR3.

For a more detailed introduction to the process of building a microservices architecture with Spring Cloud Stream, you can refer to my video course: Microservices With Spring Boot And Spring Cloud: Part 5 – Event-driven microservices.

Example of Spring Cloud Stream microservices

Our sample system consists of three microservices: producer-service, consumer-a-service, and consumer-b-service. Each of them connects to the RabbitMQ broker. The application producer-service is responsible for sending events to the message broker, while the two other microservices listen for the incoming events. Communication between applications follows a publish-subscribe model, where data is broadcast through shared topics. Our consumer applications use mechanisms like consumer grouping, to guarantee that only a single instance of the application handles a given event, and partitioning, to assign events to a selected instance of the application based on a routing key set on the producer side. The following picture visualizes the described architecture.

[Image: spring-cloud-stream-arch]

The source code of the sample applications is, as usual, available on GitHub in the repository https://github.com/piomin/course-spring-microservices.git. That repository also contains code snippets of applications used in previous parts of my video course, so you should go to the event-driven directory to access the right samples.

Dependencies to Spring Cloud Stream

To enable Spring Cloud Stream for our application we first need to include the right binder implementation library. Because we are integrating with RabbitMQ, we have to use the spring-cloud-stream-binder-rabbit artifact. It references all other required libraries, so we don’t have to include any other dependencies.

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

Messaging between Spring Cloud Stream microservices

Each message sent to the message broker is automatically serialized to JSON. Here’s the class that is the object representation of the JSON payload exchanged by the applications.

data class CallmeEvent(val id: Int = 0, val message: String = "")

Producer application

Here’s the implementation of the Supplier bean responsible for continuously generating a stream of events. By default it sends a CallmeEvent once a second, incrementing the id field on every new event and setting the to_process header to true. Based on the value of this header, we route messages on the consumer side. The mechanism may be easily disabled by setting the property callme.supplier.enabled to false.

@SpringBootApplication
class ProductionApplication {

    var id: Int = 0

    @Value("\${callme.supplier.enabled}")
    val supplierEnabled: Boolean = false

    @Bean
    fun callmeEventSupplier(): Supplier<Message<CallmeEvent>?> = Supplier { createEvent() }

    private fun createEvent(): Message<CallmeEvent>? {
        return if (supplierEnabled)
            MessageBuilder.withPayload(CallmeEvent(++id, "I'm callme event!"))
                    .setHeader("to_process", true)
                    .build()
        else
            null
    }
}

Alternatively, we may send an event on demand, for example by calling a REST endpoint. To do that we need to use the StreamBridge bean. It provides a send method that takes the name of a binding and a message object as parameters. We may pass just a payload object or use MessageBuilder to create a whole GenericMessage with headers. Our controller exposes two POST methods. The first of them, POST /{message}, just sets the message body. The second, POST /{message}/process/{process}, also allows setting the to_process header.

@RestController
@RequestMapping("/producer")
class ProducerController(val streamBridge: StreamBridge) {

    var id: Int = 0

    @PostMapping("/{message}")
    fun sendEvent(@PathVariable message: String): Boolean {
        return streamBridge.send("callmeEventSupplier-out-0", CallmeEvent(++id, message))
    }

    @PostMapping("/{message}/process/{process}")
    fun sendEventWithHeader(@PathVariable message: String, @PathVariable process: Boolean): Boolean {
        return streamBridge.send("callmeEventSupplier-out-0",
                MessageBuilder.createMessage(CallmeEvent(++id, message),
                        MessageHeaders(mutableMapOf(Pair<String, Any>("to_process", process)))))
    }
}
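Assuming the producer application runs on the default port 8080, the endpoints can be invoked like this:

$ curl -X POST http://localhost:8080/producer/hello
$ curl -X POST http://localhost:8080/producer/hello/process/true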

Here is the configuration file of our application. We need to override the default name of the destination for our binding (1). By default, it is the same as the binding name, but we are going to change it to callme-events. The same destination will be set on the consumer side. Because we are using StreamBridge for sending messages, we also need to set the property spring.cloud.stream.source (2) with a name that is used as a prefix of the generated binding name. If you would like to use the same output as for the Supplier, you should set the same name as the method that registers the Supplier bean: callmeEventSupplier. We should also increase the logging level for the Spring AMQP library to see the structure of messages sent to the message broker (3).

spring.application.name=producer-service
spring.cloud.stream.bindings.callmeEventSupplier-out-0.destination=callme-events #(1)
spring.cloud.stream.source=callmeEventSupplier #(2)
logging.level.org.springframework.amqp=DEBUG #(3)
callme.supplier.enabled=true

Consumer application

There are two different applications that listen for incoming events. Let’s start with the implementation of consumer-a-service. It is pretty simple, because it just logs each message consumed from the callme-events topic. To consume messages from the destination we need to define a Consumer bean that takes CallmeEvent as an argument.

@SpringBootApplication
class ConsumerAApplication {

    val logger: Logger = LoggerFactory.getLogger(ConsumerAApplication::class.java)

    @Bean
    fun callmeEventConsumer(): Consumer<CallmeEvent> = Consumer { logger.info("Received: {}", it) }
}

In the application properties we also need to override the default destination name for the callmeEventConsumer-in-0 binding. That name is the same as the output destination configured on the producer side: callme-events. We are also setting a consumer group for all instances of the consumer-a-service application. The name of the group is a. A consumer group guarantees that each incoming event is handled by only a single application instance within the group.

spring.application.name=consumer-a-service
spring.cloud.stream.bindings.callmeEventConsumer-in-0.destination=callme-events
spring.cloud.stream.bindings.callmeEventConsumer-in-0.group=a

Message broker

We are running RabbitMQ in a Docker container. We need to expose two ports outside the container: 5672 for TCP connections from applications and 15672 for the web management console. The following command starts the container.

$ docker run -d --name rabbit -h rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Now, we may run both our applications, producer-service and consumer-a-service. We are going to run a single instance of producer-service and two instances of consumer-a-service. After startup, producer-service begins sending events continuously to the destination on the message broker. We may verify it with the RabbitMQ management console, logging in with the default guest/guest credentials.

[Image: rabbit-list]

Routing function

We can provide some more advanced routing on the consumer side. In Spring Cloud Stream nomenclature, event routing is the ability to either route events to a particular event subscriber or route events produced by an event subscriber to a particular destination. Event routing may be enabled for an application by setting the property spring.cloud.stream.function.routing.enabled to true. After that, the generated name of the binding is automatically set to functionRouter-*.
I enabled the event routing feature for consumer-b-service. Here’s the list of configuration properties required to enable routing for a consumer listening on the callme-events destination. When enabling event routing we also need to set a routing expression, using SpEL notation. In this case I’m routing based on the to_process header value.

spring.application.name=consumer-b-service
spring.cloud.stream.bindings.functionRouter-in-0.destination=callme-events
spring.cloud.stream.bindings.functionRouter-in-0.group=b
spring.cloud.stream.function.routing.enabled=true
spring.cloud.function.routing-expression=(headers['to_process']!=null && headers['to_process']==true) ? 'process':'fireForget'

The names of the declared Consumer or Function beans should be the same as the values returned by spring.cloud.function.routing-expression. In this case these are fireForget() and process().

@SpringBootApplication
class ConsumerBApplication {

    val logger: Logger = LoggerFactory.getLogger(ConsumerBApplication::class.java)

    @Bean
    fun fireForget(): Consumer<CallmeEvent> = Consumer { logger.info("Received(fireForget): {}", it) }

    @Bean
    fun process(): Function<CallmeEvent, CallmeResponse> = Function { logAndResponse(it) }

    private fun logAndResponse(callmeEvent: CallmeEvent): CallmeResponse {
        logger.info("Received(process): {}", callmeEvent)
        return CallmeResponse(callmeEvent.id, "I'm callme response")
    }
}

Here’s the list of exchanges used by our system after starting consumer-b-service. Since the destination name for functionRouter-in-0 is overridden in the configuration properties, the destination name for functionRouter-out-0 is left at its default value.

[Image: spring-cloud-stream-routing]

Partitioning

Partitioning is a critical concept in stateful processing, where it is required to ensure that all related data is handled together. Thanks to partitioning we may implement an algorithm responsible for distributing messages across multiple instances of an application in a deterministic way.
To enable partitioning on the producer side we need to set two properties related to a given binding: the producer’s partitionKeyExpression, which in this case is the id field of CallmeEvent, and the producer’s partitionCount with the number of partitions. This number should be the same as the number of running instances of the application, since each partition is assigned to a selected instance of that application. Because we are planning to run two instances, we set it to 2.

spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionKeyExpression=payload.id
spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionCount=2

To enable partitioning on the consumer side we need to enable a single property on a given binding: the consumer’s partitioned flag. We also need to set the property spring.cloud.stream.instanceCount to the static number of deployed instances of the application. While Kafka supports a more flexible way of configuring partitioning, RabbitMQ requires a static value, so we can’t scale up the number of instances dynamically without changing these properties. We also need to set the property spring.cloud.stream.instanceIndex for each single instance. It needs to be 0 or 1 if we have two running instances, as shown below.
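In properties form, these consumer-side settings could look as follows for the first instance of consumer-a-service (a sketch using the standard Spring Cloud Stream property names; the second instance would set instanceIndex=1):

spring.cloud.stream.bindings.callmeEventConsumer-in-0.consumer.partitioned=true
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0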

[Image: spring-cloud-stream-intellij-start]

Now let’s take a look at the structure of routing inside the callme-events exchange created on RabbitMQ. Two consumers are listening for incoming events per consumer group. So, since we have two different consumer groups defined, there are 4 consumers in total. Within each group, we have two different routing keys, one set for each target queue.

[Image: partitioning-rabbitmq]

We can also verify that four queues receive incoming events.

[Image: spring-cloud-stream-partitioning-queues]

Testing

Spring Cloud Stream provides an easy way of testing your microservice applications without the need to connect to an external messaging system. To enable that support we need to include the following dependency in the Maven pom.xml.

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-stream</artifactId>
   <version>3.0.3.RELEASE</version>
   <type>test-jar</type>
   <scope>test</scope>
   <classifier>test-binder</classifier>
</dependency>

In the JUnit test class we should import the TestChannelBinderConfiguration class. After that we may use InputDestination for sending test messages, and OutputDestination for receiving them during a test. The following testRouter test is created for consumer-b-service, and it verifies that the routing function is working properly by receiving and validating the message sent to the output destination.

@SpringBootTest(classes = [ConsumerBApplication::class])
@Import(TestChannelBinderConfiguration::class)
class RouterFunctionTest {

    @Autowired
    lateinit var inputDestination: InputDestination
    @Autowired
    lateinit var outputDestination: OutputDestination

    @Test
    fun testRouter() {
        inputDestination.send(MessageBuilder.withPayload(CallmeEvent(1, "I'm callme event"))
                .setHeader("to_process", true)
                .build())
        val response = outputDestination.receive()
        Assertions.assertNotNull(response)
        Assertions.assertTrue(response.payload.isNotEmpty())
        val payload = String(response.payload)
        val payloadObject = ObjectMapper().readValue(payload, CallmeResponse::class.java)
        Assertions.assertEquals(1, payloadObject.id)
        Assertions.assertEquals("I'm callme response", payloadObject.message)
    }

}

RabbitMQ Cluster with Consul and Vault

Almost two years ago I wrote an article about RabbitMQ clustering: RabbitMQ in cluster. It was one of the first posts on my blog, and it’s really hard to believe it has been two years since I started this blog. Anyway, one of the questions about the topic described in the mentioned article inspired me to return to that subject one more time. That question pointed to a problem with the approach to setting up the cluster. This approach assumes that we manually attach new nodes to the cluster by executing the command rabbitmqctl join_cluster with the cluster name as a parameter. If I remember correctly, it was the only available method of creating a cluster at that time. Today we have more choices, which illustrates the evolution of RabbitMQ during the last two years. A RabbitMQ cluster can be formed in a number of ways:

  • Manually with rabbitmqctl (as described in my article RabbitMQ in cluster)
  • Declaratively by listing cluster nodes in config file
  • Using DNS-based discovery
  • Using AWS (EC2) instance discovery via a dedicated plugin
  • Using Kubernetes discovery via a dedicated plugin
  • Using Consul discovery via a dedicated plugin
  • Using etcd-based discovery via a dedicated plugin

Today, I’m going to show you how to create a RabbitMQ cluster using service discovery based on HashiCorp’s Consul. Additionally, we will include Vault in our architecture in order to use its interesting feature called the secrets engine for managing the credentials used for accessing RabbitMQ. We will set up this sample on the local machine using Docker images of RabbitMQ, Consul and Vault. Finally, we will test our solution using a simple Spring Boot application that sends and listens for incoming messages to the cluster. That application is available in the GitHub repository sample-haclustered-rabbitmq-service in the branch consul.

Architecture

We use Vault as a credentials manager when applications try to authenticate against RabbitMQ nodes or when a user tries to log in to the RabbitMQ web admin console. Each RabbitMQ node registers itself after startup in Consul and retrieves a list of nodes running inside the cluster. Vault is integrated with RabbitMQ using a dedicated secrets engine. Here’s the architecture of our sample solution.

[Image: rabbit-consul-logo]

1. Configure RabbitMQ Consul plugin

The integration between RabbitMQ and Consul is realized via the rabbitmq-peer-discovery-consul plugin. This plugin is not enabled by default in the official RabbitMQ Docker image. So, the first step is to build our own Docker image, based on the official RabbitMQ image, that installs and enables the required plugin. By default, the RabbitMQ main configuration file is available under the path /etc/rabbitmq/rabbitmq.conf inside a Docker container. To override it we just use the COPY statement as shown below. The following Dockerfile definition takes RabbitMQ with the management web console as the base image and enables the rabbitmq_peer_discovery_consul plugin.


FROM rabbitmq:3.7.8-management
COPY rabbitmq.conf /etc/rabbitmq
RUN rabbitmq-plugins enable --offline rabbitmq_peer_discovery_consul

Now, let’s take a closer look at our plugin configuration settings. Because I run Docker on Windows, Consul is not available under the default localhost address, but on 192.168.99.100. So, first we need to set that IP address using the property cluster_formation.consul.host. We also need to set Consul as the peer discovery implementation by setting the name of the plugin in the property cluster_formation.peer_discovery_backend. Finally, we have to set two additional properties to make it work in our local Docker environment. They are related to the address of the RabbitMQ node sent to Consul during the registration process. It is important to compute it properly, and not to send, for example, localhost. After setting the property cluster_formation.consul.svc_addr_use_nodename to false, the node will register itself using its hostname instead of its node name. We can set the hostname for the container inside its run command. Here’s the full RabbitMQ configuration file used in the demo for this article.

loopback_users.guest = false
listeners.tcp.default = 5672
hipe_compile = false
management.listener.port = 15672
management.listener.ssl = false
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_consul
cluster_formation.consul.host = 192.168.99.100
cluster_formation.consul.svc_addr_auto = true
cluster_formation.consul.svc_addr_use_nodename = false

After saving the configuration visible above in the file rabbitmq.conf, we can proceed to building our custom Docker image with RabbitMQ. This image is available in my Docker repository under the alias piomin/rabbitmq, but you can also build it by yourself from the Dockerfile by executing the following command.

$ docker build -t piomin/rabbitmq:1.0 .
Sending build context to Docker daemon  3.072kB
Step 1 : FROM rabbitmq:3.7.8-management
 ---> d69a5113ceae
Step 2 : COPY rabbitmq.conf /etc/rabbitmq
 ---> aa306ef88085
Removing intermediate container fda0e21178f9
Step 3 : RUN rabbitmq-plugins enable --offline rabbitmq_peer_discovery_consul
 ---> Running in 0892a42bffef
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_peer_discovery_common
  rabbitmq_peer_discovery_consul
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@fda0e21178f9...
The following plugins have been enabled:
  rabbitmq_peer_discovery_common
  rabbitmq_peer_discovery_consul

set 5 plugins.
Offline change; changes will take effect at broker restart.
 ---> cfe73f9d9904
Removing intermediate container 0892a42bffef
Successfully built cfe73f9d9904

2. Running RabbitMQ cluster on Docker

In the previous step we successfully created a Docker image of RabbitMQ configured to run in cluster mode using Consul discovery. Before running this image we need to start an instance of Consul. Here’s the command that starts the Docker container with Consul and exposes it on port 8500.

$ docker run -d --name consul -p 8500:8500 consul

We will also create a Docker network to enable communication between containers by hostname. It is required in this scenario, because each RabbitMQ container registers itself using the container hostname.

$ docker network create rabbitmq

Now, we can run our three clustered RabbitMQ containers. We will set a unique hostname for every single container (using the -h option) and attach all of them to the same Docker network. We also have to set the same RABBITMQ_ERLANG_COOKIE environment variable for all containers, since nodes may join a cluster only if they share the Erlang cookie.

$ docker run -d --name rabbit1 -h rabbit1 --network rabbitmq -p 30000:5672 -p 30010:15672 -e RABBITMQ_ERLANG_COOKIE='rabbitmq' piomin/rabbitmq:1.0
$ docker run -d --name rabbit2 -h rabbit2 --network rabbitmq -p 30001:5672 -p 30011:15672 -e RABBITMQ_ERLANG_COOKIE='rabbitmq' piomin/rabbitmq:1.0
$ docker run -d --name rabbit3 -h rabbit3 --network rabbitmq -p 30002:5672 -p 30012:15672 -e RABBITMQ_ERLANG_COOKIE='rabbitmq' piomin/rabbitmq:1.0
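Once the containers are up, we can quickly verify that all three nodes have joined the cluster, for example by running rabbitmqctl inside one of them:

$ docker exec rabbit1 rabbitmqctl cluster_status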

After running all three instances of RabbitMQ we can first take a look at the Consul web console. You should see a new service there called rabbitmq. This value is the default cluster name set by the RabbitMQ Consul plugin. We can override it inside rabbitmq.conf using the cluster_formation.consul.svc property.

[Image: rabbit-consul-1]

We can check whether the cluster has been successfully started using the RabbitMQ web management console, which is exposed by every node. I just had to override the default port 15672 to avoid port conflicts between the three running instances.

[Image: rabbit-consul-10]

3. Integrating RabbitMQ with Vault

In the two previous steps we successfully ran a cluster of three RabbitMQ nodes based on Consul discovery. Now, we will include Vault in our sample system to dynamically generate user credentials. Let’s begin by running Vault on Docker. You can find detailed information about it in my previous article Secure Spring Cloud Microservices with Vault and Nomad. We will run Vault in development mode using the following command.

$ docker run --cap-add=IPC_LOCK -d --name vault -p 8200:8200 vault

You can copy the root token from the container logs using the docker logs -f vault command. Then you have to log in to the Vault web console available at http://192.168.99.100:8200 using this token and enable the RabbitMQ secrets engine as shown below.

[Image: rabbit-consul-2]

And confirm.

[Image: rabbit-consul-3]

You can easily run Vault commands using the terminal provided by the web admin console, or do the same thing using the HTTP API. The first command visible below is used for writing connection details. We just need to pass the RabbitMQ address and admin user credentials. The provided configuration settings point to the first RabbitMQ node, but the changes are then replicated to the whole cluster.

$ vault write rabbitmq/config/connection connection_uri="http://192.168.99.100:30010" username="guest" password="guest"

The next step is to configure a role that maps a name in Vault to virtual host permissions.

$ vault write rabbitmq/roles/default vhosts='{"/":{"write": ".*", "read": ".*"}}'

We can test our newly created configuration by running the command vault read rabbitmq/creds/default as shown below.

[Image: rabbit-consul-4]

4. Sample application

Our sample application is pretty simple. It consists of two modules. The first of them, sender, is responsible for sending messages to RabbitMQ, while the second, listener, receives incoming messages. Both of them are Spring Boot applications that integrate with RabbitMQ and Vault using the following dependencies.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-vault-config-rabbitmq</artifactId>
   <version>2.0.2.RELEASE</version>
</dependency>

We need to provide some configuration settings in the bootstrap.yml file to integrate our application with Vault. First, we need to enable the plugin for that integration by setting the property spring.cloud.vault.rabbitmq.enabled to true. Of course, the Vault address and root token are required. It is also important to set the property spring.cloud.vault.rabbitmq.role with the name of the Vault role configured in step 3. Spring Cloud Vault injects the username and password generated by Vault into the application properties spring.rabbitmq.username and spring.rabbitmq.password, so the only thing we need to configure in the bootstrap.yml file is the list of available cluster nodes.

spring:
  rabbitmq:
    addresses: 192.168.99.100:30000,192.168.99.100:30001,192.168.99.100:30002
  cloud:
    vault:
      uri: http://192.168.99.100:8200
      token: s.7DaENeiqLmsU5ZhEybBCRJhp
      rabbitmq:
        enabled: true
        role: default
        backend: rabbitmq

For test purposes you should enable highly available (mirrored) queues on RabbitMQ. For instructions on how to configure them using policies you can refer to my article RabbitMQ in cluster, or see the quick example below. The application works at the level of exchanges: the auto-configured connection factory is injected into the application and set on the RabbitTemplate bean.
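As a quick example, a policy mirroring every queue across all cluster nodes could be declared with a standard rabbitmqctl command like this (the policy name ha-all is arbitrary):

$ docker exec rabbit1 rabbitmqctl set_policy ha-all ".*" '{"ha-mode":"all"}'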

@SpringBootApplication
public class Sender {
   
   private static final Logger LOGGER = LoggerFactory.getLogger("Sender");
   
   @Autowired
   RabbitTemplate template;

   public static void main(String[] args) {
      SpringApplication.run(Sender.class, args);
   }

   @PostConstruct
   public void send() {
      for (int i = 0; i < 1000; i++) {
         int id = new Random().nextInt(100000);
         template.convertAndSend(new Order(id, "TEST"+id, OrderType.values()[(id%2)]));
      }
      LOGGER.info("Sending completed.");
   }
    
    @Bean
    public RabbitTemplate template(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange("ex.example");
        return rabbitTemplate;
    }
    
}
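The Order payload class is not listed in the post; a minimal sketch inferred from its usage in the Sender and Listener code (field names and enum values are assumptions, the exact class is in the repository) might look like this:

import java.io.Serializable;

// Hypothetical sketch of the Order payload; Java serialization is assumed,
// since no JSON message converter is configured in these applications.
public class Order implements Serializable {

   private int id;
   private String name;
   private OrderType type; // OrderType is assumed to be a simple two-value enum

   public Order() {
   }

   public Order(int id, String name, OrderType type) {
      this.id = id;
      this.name = name;
      this.type = type;
   }

   @Override
   public String toString() {
      return "Order{id=" + id + ", name='" + name + "', type=" + type + "}";
   }

}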

Our listener app is connected only to the third node of the cluster (spring.rabbitmq.addresses=192.168.99.100:30002). However, the test queue is mirrored across all clustered nodes, so it is able to receive messages sent by the sender app. You can easily verify it using my sample applications.

@SpringBootApplication
@EnableRabbit
public class Listener {

   private static final  Logger LOGGER = LoggerFactory.getLogger("Listener");

   private Long timestamp;

   public static void main(String[] args) {
      SpringApplication.run(Listener.class, args);
   }

   @RabbitListener(queues = "q.example")
   public void onMessage(Order order) {
      if (timestamp == null)
         timestamp = System.currentTimeMillis();
      LOGGER.info((System.currentTimeMillis() - timestamp) + " : " + order.toString());
   }

   @Bean
   public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setConnectionFactory(connectionFactory);
      factory.setConcurrentConsumers(10);
      factory.setMaxConcurrentConsumers(20);
      return factory;
   }
   
}

Building and testing message-driven microservices using Spring Cloud Stream

In this article, you will learn how to improve automated testing of message-driven microservices with Spring Cloud Stream. Spring Boot and Spring Cloud give you a great opportunity to build microservices fast using different styles of communication. You can create synchronous REST microservices based on Spring Cloud Netflix libraries, as shown in one of my previous articles, Quick Guide to Microservices with Spring Boot 2.0, Eureka and Spring Cloud. You can create asynchronous, reactive microservices deployed on Netty with the Spring WebFlux project and combine it successfully with some Spring Cloud libraries, as shown in my article Reactive Microservices with Spring WebFlux and Spring Cloud. And finally, you may implement message-driven microservices based on the publish/subscribe model using Spring Cloud Stream and a message broker like Apache Kafka or RabbitMQ. The last of the listed approaches is the main subject of this article. I’m going to show you how to effectively build, scale, run, and test messaging microservices based on the RabbitMQ broker.

Architecture

For the purpose of demonstrating Spring Cloud Stream testing features, we will design a sample system that uses the publish/subscribe model for inter-service communication. We have three microservices: order-service, product-service and account-service. The application order-service exposes an HTTP endpoint that is responsible for processing orders sent to our system. All the incoming orders are processed asynchronously: order-service prepares and sends messages to a RabbitMQ exchange and then responds to the calling client that the request has been accepted for processing. The applications account-service and product-service listen for the order messages incoming to the exchange. The microservice account-service is responsible for checking if there are sufficient funds on a customer’s account for order realization and then withdrawing cash from this account. The microservice product-service checks if there is a sufficient amount of products in the store, and changes the number of available products after processing the order. Both account-service and product-service send an asynchronous response through a RabbitMQ exchange (this time it is one-to-one communication using a direct exchange) with the status of the operation. After receiving the response messages, order-service sets the appropriate status of the order and exposes it through the REST endpoint GET /order/{id} to the external client.

If the description of our sample system feels a little hard to follow, here’s a diagram of the architecture for clarification.

[Image: stream-1]

Enabling Spring Cloud Stream

The recommended way to include Spring Cloud Stream in the project is with a dependency management system. Spring Cloud Stream has an independent release train management in relation to the whole Spring Cloud framework. However, if we have declared spring-cloud-dependencies in the Elmhurst.RELEASE version inside the dependencyManagement section, we don’t have to declare anything else in pom.xml. If you prefer to use only the Spring Cloud Stream project, you should define the following section.

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-dependencies</artifactId>
      <version>Elmhurst.RELEASE</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

The next step is to add the spring-cloud-stream artifact to the project dependencies. I also recommend including at least the spring-cloud-sleuth library, so that messages are sent with the same traceId as the source request incoming to order-service.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-sleuth</artifactId>
</dependency>

Spring Cloud Stream programming model

To enable connectivity to a message broker for your application, annotate the main class with @EnableBinding. The @EnableBinding annotation takes one or more interfaces as parameters. You may choose between three interfaces provided by Spring Cloud Stream:

  • Sink: This is used for marking a service that receives messages from the inbound channel.
  • Source: This is used for sending messages to the outbound channel.
  • Processor: This can be used in case you need both an inbound channel and an outbound channel, as it extends the Source and Sink interfaces.

Because order-service sends messages as well as receives them, its main class has been annotated with @EnableBinding(Processor.class). Here’s the main class of order-service that enables Spring Cloud Stream binding.

@SpringBootApplication
@EnableBinding(Processor.class)
public class OrderApplication {
  public static void main(String[] args) {
    new SpringApplicationBuilder(OrderApplication.class).web(true).run(args);
  }
}

Adding message broker

In Spring Cloud Stream nomenclature, the implementation responsible for integration with a specific message broker is called a binder. By default, Spring Cloud Stream provides binder implementations for Kafka and RabbitMQ. It is able to automatically detect and use a binder found on the classpath. Any middleware-specific settings can be overridden through external configuration properties in the form supported by Spring Boot, such as application arguments, environment variables, or just the application.yml file. To include support for RabbitMQ, you should add the following dependency to the project.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

Now, our applications need to connect to one shared instance of the RabbitMQ broker. That’s why I run a Docker image with RabbitMQ, exposed outside on the default 5672 port. It also launches a web dashboard, available at http://192.168.99.100:15672.

$ docker run -d --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management

We need to override the default address of RabbitMQ for every Spring Boot application by setting the property spring.rabbitmq.host to the Docker machine IP, 192.168.99.100.

spring:  
  rabbitmq:
    host: 192.168.99.100
    port: 5672

Implementing message-driven microservices

Spring Cloud Stream is built on top of the Spring Integration project. Spring Integration extends the Spring programming model to support the well-known Enterprise Integration Patterns (EIP). EIP defines a number of components that are typically used for orchestration in distributed systems. You have probably heard about patterns such as message channels, routers, aggregators, or endpoints. Let’s proceed to the implementation.
We begin with order-service, which is responsible for accepting orders, publishing them on shared topics, and then collecting asynchronous responses from downstream services. Here’s the @Service, which builds a message and publishes it to the remote topic using the Source bean.

@Service
public class OrderSender {
  @Autowired
  private Source source;
  
  public boolean send(Order order) {
    return this.source.output().send(MessageBuilder.withPayload(order).build());
  }
}

That @Service is called by the controller, which exposes HTTP endpoints for submitting new orders and getting an order with its status by id.

@RestController
public class OrderController {

   private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);
   
   private ObjectMapper mapper = new ObjectMapper();
   
   @Autowired
   OrderRepository repository;
   @Autowired
   OrderSender sender;   
   
   @PostMapping
   public Order process(@RequestBody Order order) throws JsonProcessingException {
      Order o = repository.add(order);
      LOGGER.info("Order saved: {}", mapper.writeValueAsString(order));
      boolean isSent = sender.send(o);
      LOGGER.info("Order sent: {}", mapper.writeValueAsString(Collections.singletonMap("isSent", isSent)));
      return o;
   }
   
   @GetMapping("/{id}")
   public Order findById(@PathVariable("id") Long id) {
      return repository.findById(id);
   }
   
}
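Assuming order-service runs on port 8080 and the Order payload carries at least the customerId and price fields used later by account-service, an order can be submitted and then checked like this:

$ curl -X POST -H "Content-Type: application/json" -d '{"customerId":1,"price":100}' http://localhost:8080
$ curl http://localhost:8080/1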

Now, let’s take a closer look at the consumer side. The message sent by the OrderSender bean from order-service is received by account-service and product-service. To receive the message from the topic exchange, we just have to annotate the method that takes the Order object as a parameter with @StreamListener. We also have to define a target channel for the listener; in this case it is Processor.INPUT.

@SpringBootApplication
@EnableBinding(Processor.class)
public class OrderApplication {
   
   private static final Logger LOGGER = LoggerFactory.getLogger(OrderApplication.class);
   
   // ObjectMapper used by receiveOrder below for logging the incoming payload
   private ObjectMapper mapper = new ObjectMapper();
   
   @Autowired
   OrderService service;
   
   public static void main(String[] args) {
      new SpringApplicationBuilder(OrderApplication.class).web(true).run(args);
   }
   
   @StreamListener(Processor.INPUT)
   public void receiveOrder(Order order) throws JsonProcessingException {
      LOGGER.info("Order received: {}", mapper.writeValueAsString(order));
      service.process(order);
   }
   
}

The received order is then processed by the AccountService bean. An order may be accepted or rejected by account-service depending on whether there are sufficient funds on the customer’s account for the order’s realization. The response with the acceptance status is sent back to order-service via the output channel invoked by the OrderSender bean.

@Service
public class AccountService {

   private static final Logger LOGGER = LoggerFactory.getLogger(AccountService.class);
   
   private ObjectMapper mapper = new ObjectMapper();
   
   @Autowired
   AccountRepository accountRepository;
   @Autowired
   OrderSender orderSender;
   
   public void process(final Order order) throws JsonProcessingException {
      LOGGER.info("Order processed: {}", mapper.writeValueAsString(order));
      List<Account> accounts = accountRepository.findByCustomer(order.getCustomerId());
      Account account = accounts.get(0);
      LOGGER.info("Account found: {}", mapper.writeValueAsString(account));
      if (order.getPrice() <= account.getBalance()) {
         order.setStatus(OrderStatus.ACCEPTED);
         account.setBalance(account.getBalance() - order.getPrice());
      } else {
         order.setStatus(OrderStatus.REJECTED);
      }
      orderSender.send(order);
      LOGGER.info("Order response sent: {}", mapper.writeValueAsString(order));
   }
   
}

The last step is the configuration, provided inside the application.yml file. We have to properly define the destinations for the channels. While order-service assigns the orders-out destination to its output channel and the orders-in destination to its input channel, account-service and product-service do the opposite. This is logical, because messages sent by order-service via its output destination are received by the consuming services via their input destinations – it is still the same destination on a shared broker exchange. Here are the configuration settings of order-service.

spring: 
  cloud:  
    stream:
      bindings:
        output:
          destination: orders-out
        input:
          destination: orders-in
      rabbit:
        bindings:
          input:
            consumer:
              exchangeType: direct

Here's the configuration provided for account-service and product-service.

spring:  
  cloud:  
    stream:
      bindings:
        output:
          destination: orders-in
        input:
          destination: orders-out
      rabbit:
        bindings:
          output:
            producer:
              exchangeType: direct
              routingKeyExpression: '"#"'

Finally, you can run our sample microservices. For now, we just need to run a single instance of each one. You can easily generate some test requests by running the JUnit test class OrderControllerTest, provided in my source code repository inside the order-service module, or by sending a request by hand, as shown below. This case is simple. In the next section, we will study a more advanced sample with multiple running instances of the consuming services.
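Here's what such a request might look like (the port is illustrative, and the payload follows the fields of the Order class used in the tests later in this article):

$ curl -X POST http://localhost:8090 \
  -H "Content-Type: application/json" \
  -d '{"customerId":1,"accountId":1,"productIds":[2],"price":500}'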

Scaling up

To scale up our Spring Cloud Stream applications we just need to launch additional instances of each microservice. They will still listen for incoming messages on the same topic exchange as the currently running instances. After adding one instance of account-service and product-service, we may send a test order. The result of that test won't be satisfactory for us... Why? A single order is received by all running instances of every microservice. This is exactly how topic exchanges work – a message sent to the topic is received by all consumers listening on that topic. Fortunately, Spring Cloud Stream is able to solve that problem with a mechanism called a consumer group, which guarantees that only one of the instances handles a given message when they are placed in a competing consumer relationship.


Configuring the consumer group mechanism is not very difficult. We just have to set a group parameter with the name of the group for the given destination. Here's the current binding configuration for account-service. The orders-in destination is a queue created for direct communication with order-service, so only orders-out is grouped, using the spring.cloud.stream.bindings.input.group property.

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: orders-in
        input:
          destination: orders-out
          group: account

The consumer group mechanism is a concept taken from Apache Kafka, implemented in Spring Cloud Stream also for the RabbitMQ broker, which does not support it natively. So, I think it is pretty interesting to see how it is configured on RabbitMQ. If you run two instances of a service without setting a group name on the destination, two bindings are created for a single exchange (one binding per instance). Because two applications are listening on that exchange, there are four bindings assigned to that exchange in total.


If you set a group name for the selected destination, Spring Cloud Stream will create a single binding shared by all running instances of a given service. The name of the binding will be suffixed with the group name.

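You can verify this on the broker side: listing the queues should show a single queue named after the destination and the group – for the configuration above, orders-out.account (a quick check run inside one of the broker containers):

$ rabbitmqctl list_queues name consumers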

Because we have included spring-cloud-starter-sleuth in the project dependencies, the same traceId header is sent between all the asynchronous requests exchanged during the realization of a single request incoming to the order-service POST endpoint. Thanks to that, we can easily correlate all logs with this header using the Elastic Stack (Kibana).
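For reference, that dependency looks as follows (its version is managed by the Spring Cloud BOM):

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>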


Automated Testing with Spring Cloud Stream

You can easily test your microservice without connecting to a message broker. To achieve that, you need to include spring-cloud-stream-test-support in your project dependencies. It contains the TestSupportBinder bean, which lets you interact with the bound channels and inspect any messages sent and received by the application.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-test-support</artifactId>
  <scope>test</scope>
</dependency>

In the test class we need to declare the MessageCollector bean, which is responsible for receiving messages retained by TestSupportBinder. Here's my test class from account-service. Using the Processor bean, I send a test order to the input channel. Then MessageCollector receives the message that is sent back to order-service via the output channel. The testAccepted method creates an order that should be accepted by account-service, while testRejected sets an order price that is too high, which results in rejecting the order.

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class OrderReceiverTest {

   private static final Logger LOGGER = LoggerFactory.getLogger(OrderReceiverTest.class);
   
   @Autowired
   private Processor processor;
   @Autowired
   private MessageCollector messageCollector;

   @Test
   @SuppressWarnings("unchecked")
   public void testAccepted() {
      Order o = new Order();
      o.setId(1L);
      o.setAccountId(1L);
      o.setCustomerId(1L);
      o.setPrice(500);
      o.setProductIds(Collections.singletonList(2L));
      processor.input().send(MessageBuilder.withPayload(o).build());
      Message<Order> received = (Message<Order>) messageCollector.forChannel(processor.output()).poll();
      LOGGER.info("Order response received: {}", received.getPayload());
      assertNotNull(received.getPayload());
      assertEquals(OrderStatus.ACCEPTED, received.getPayload().getStatus());
   }
   
   @Test
   @SuppressWarnings("unchecked")
   public void testRejected() {
      Order o = new Order();
      o.setId(1L);
      o.setAccountId(1L);
      o.setCustomerId(1L);
      o.setPrice(100000);
      o.setProductIds(Collections.singletonList(2L));
      processor.input().send(MessageBuilder.withPayload(o).build());
      Message<Order> received = (Message<Order>) messageCollector.forChannel(processor.output()).poll();
      LOGGER.info("Order response received: {}", received.getPayload());
      assertNotNull(received.getPayload());
      assertEquals(OrderStatus.REJECTED, received.getPayload().getStatus());
   }

}

Conclusion

Message-driven microservices are a good choice whenever you don't need a synchronous response from your API. In this article, I have shown a sample use case of the publish/subscribe model in inter-service communication between microservices. The source code is, as usual, available on GitHub (https://github.com/piomin/sample-message-driven-microservices.git). For other interesting examples of testing with the Spring Cloud Stream library, also with Apache Kafka, you can refer to Chapter 11 of my book Mastering Spring Cloud (https://www.packtpub.com/application-development/mastering-spring-cloud).

The post Building and testing message-driven microservices using Spring Cloud Stream appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2018/06/15/building-and-testing-message-driven-microservices-using-spring-cloud-stream/feed/ 7 6678
RabbitMQ in cluster https://piotrminkowski.com/2017/02/28/rabbitmq-in-cluster/ https://piotrminkowski.com/2017/02/28/rabbitmq-in-cluster/#comments Tue, 28 Feb 2017 23:23:53 +0000 https://piotrminkowski.wordpress.com/?p=1190 RabbitMQ has grown into the most popular message broker software. It is written in Erlang and implements the Advanced Message Queueing Protocol (AMQP). It is easy to use and configure, even for such mechanisms as clustering or high availability. In this post, I'm going to show you how to run some instances of […]

The post RabbitMQ in cluster appeared first on Piotr's TechBlog.

]]>
RabbitMQ has grown into the most popular message broker software. It is written in Erlang and implements the Advanced Message Queueing Protocol (AMQP). It is easy to use and configure, even for mechanisms such as clustering or high availability. In this post, I'm going to show you how to run several instances of RabbitMQ in Docker containers as a cluster with highly available (HA) queues. Based on a sample Java application, we'll see how to send and receive messages from the RabbitMQ cluster and check how this message broker handles a large number of incoming messages. The sample Spring Boot application is available on GitHub. Here is a picture illustrating the architecture of our solution.

(Figure: architecture of the RabbitMQ cluster solution)

We use the official Docker repository of RabbitMQ. Here are the commands for running three RabbitMQ nodes. The first node is the master of the cluster – the two other nodes will join it. We use the management tag of the image to enable the UI administration console on each node. Every node has the default connection and UI management ports exposed. The important thing is to link the rabbit2 and rabbit3 containers to rabbit1, which is necessary for joining the cluster mastered by rabbit1.

$ docker run -d --hostname rabbit1 --name rabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcluster' -p 30000:5672 -p 30001:15672 rabbitmq:management
 
$ docker run -d --hostname rabbit2 --name rabbit2 --link rabbit1:rabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcluster' -p 30002:5672 -p 30003:15672 rabbitmq:management
 
$ docker run -d --hostname rabbit3 --name rabbit3 --link rabbit1:rabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcluster' -p 30004:5672 -p 30005:15672 rabbitmq:management
 

OK, now there are three running RabbitMQ instances. We can go to the UI management console of each instance, for example http://192.168.99.100:30001 (rabbit1). At this point, each instance forms its own independent cluster. We would like to make all instances work in the same cluster, rabbit@rabbit1.


Here’s set of commands run on rabbit2 instance for joining cluster rabbit@rabbit1. The same set should be run on rabbit3 node. In the beginning we have to connect to docker container and run bash command. Before running rabbitmq join_cluster command we have to stop broker.

$ docker exec -it rabbit2 bash
root@rabbit2:/# rabbitmqctl stop_app
Stopping node rabbit@rabbit2 ...
root@rabbit2:/# rabbitmqctl join_cluster rabbit@rabbit1
Clustering node rabbit@rabbit2 with rabbit@rabbit1 ...
root@rabbit2:/# rabbitmqctl start_app
Starting node rabbit@rabbit2 ...
 

If everything was successful, we should see the cluster name rabbit@rabbit1 in the upper right corner of the rabbit2 management console. You should also see the list of running nodes in the Nodes section. You can also check the cluster status by running the rabbitmqctl cluster_status command on every node, which should also display the list of all cluster nodes.


After starting all nodes, go to the UI management console on one of them. Now we are going to configure high availability for a selected queue. It does not matter which node you choose, because they are all in one cluster. In the Queues tab, create a queue named q.example. Then go to the Admin tab, select the Policies section, and create a new policy. I selected ha-mode=all, which means the queue is mirrored across all nodes in the cluster, and when a new node is added to the cluster, the queue is mirrored to that node as well. The exactly and nodes modes are also available – you can find more about RabbitMQ high availability in the documentation. In the pattern field enter your queue name, and in apply to select Queues. If everything succeeded, you should see the ha-all feature in the queue row.

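The same policy can also be created from the command line instead of the UI console (an equivalent sketch; ha-all is just the policy name used above):

$ rabbitmqctl set_policy ha-all "^q\.example$" '{"ha-mode":"all"}' --apply-to queues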

One of the greatest advantages of RabbitMQ is its monitoring. You can see many statistics, such as memory and disk usage, I/O statistics, detailed message rates, and graphs.


RabbitMQ has great support in the Spring framework. Many projects use the RabbitMQ implementation by default, for example Spring Cloud Stream and Spring Cloud Sleuth. I'm going to show you a sample Spring Boot application that sends messages to the RabbitMQ cluster and receives them from the HA queue. The application source code is available on GitHub. Here's the main class of the application. We enable the RabbitMQ listener by declaring @EnableRabbit on the class and @RabbitListener on the receiving method. We also have to declare the queue to listen on, the broker connection factory, and the listener container factory to allow listener concurrency. Inside CachingConnectionFactory we set the addresses of all three RabbitMQ cluster instances: 192.168.99.100:30000, 192.168.99.100:30002, 192.168.99.100:30004.

@SpringBootApplication
@EnableRabbit
public class Listener {

   private static Logger logger = Logger.getLogger("Listener");

   public static void main(String[] args) {
      SpringApplication.run(Listener.class, args);
   }

   @RabbitListener(queues = "q.example")
   public void onMessage(Order order) {
      logger.info(order.toString());
   }

   @Bean
   public ConnectionFactory connectionFactory() {
      CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
      connectionFactory.setUsername("guest");
      connectionFactory.setPassword("guest");
      connectionFactory.setAddresses("192.168.99.100:30000,192.168.99.100:30002,192.168.99.100:30004");
      connectionFactory.setChannelCacheSize(10);
      return connectionFactory;
   }

   @Bean
   public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setConnectionFactory(connectionFactory());
      factory.setConcurrentConsumers(10);
      factory.setMaxConcurrentConsumers(20);
      return factory;
   }

   @Bean
   public Queue queue() {
      return new Queue("q.example");
   }

}
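The sending side is not listed here – the actual sender is in the GitHub repository. A minimal sketch of a producer publishing to the HA queue through the auto-configured RabbitTemplate could look like this (the class below is illustrative, not the exact repository code):

@Service
public class Sender {

   @Autowired
   private RabbitTemplate template;

   public void send(Order order) {
      // Publish via the default exchange; the routing key equals the queue name
      template.convertAndSend("q.example", order);
   }

}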
 

Conclusion

Clustering and high availability configuration with RabbitMQ is pretty simple. I like RabbitMQ for its support of cluster monitoring through the UI management console. In my opinion, it is user friendly and intuitive. In the sample application, I sent 100k messages to the sample queue. With 20 concurrent consumers they were processed in 65 seconds (~80/s per consumer thread), and memory usage at its peak was about 400MB on each node. Of course, our application just receives an object message and logs it to the console.

The post RabbitMQ in cluster appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2017/02/28/rabbitmq-in-cluster/feed/ 10 1190
How to ship logs with Logstash, Elasticsearch and RabbitMQ https://piotrminkowski.com/2017/02/03/how-to-ship-logs-with-logstash-elasticsearch-and-rabbitmq/ https://piotrminkowski.com/2017/02/03/how-to-ship-logs-with-logstash-elasticsearch-and-rabbitmq/#respond Fri, 03 Feb 2017 09:01:18 +0000 https://piotrminkowski.wordpress.com/?p=18 The architecture of our solution for shipping application logs to Logstash is visible in the picture below. We’ll start from sample Spring Boot application shipping logs to RabbitMQ exchange. Then using Docker, we’ll configure the environment containing RabbitMQ, Logstash, Elasticsearch, and Kibana – each running on separated Docker container. My sample Java application is available […]

The post How to ship logs with Logstash, Elasticsearch and RabbitMQ appeared first on Piotr's TechBlog.

]]>
The architecture of our solution for shipping application logs to Logstash is simple: a sample Spring Boot application ships logs to a RabbitMQ exchange; then, using Docker, we configure an environment containing RabbitMQ, Logstash, Elasticsearch, and Kibana – each running in a separate Docker container.


My sample Java application is available on https://github.com/piomin/sample-amqp-logging.git.
There are only two Spring Boot dependencies needed inside pom.xml: the first for the REST controller and the second for AMQP support.

<dependencies>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-rest</artifactId>
   </dependency>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
   </dependency>
</dependencies>

Here’s a simple controller with one logging message.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class Controller {

   protected Logger logger = LoggerFactory.getLogger(Controller.class.getName());

   @RequestMapping("/hello/{param}")
   public String hello(@PathVariable("param") String param) {
      logger.info("Controller.hello(" + param + ")");
      return "Hello";
   }

}
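You can trigger that log entry simply by calling the endpoint (assuming the application runs on the default Spring Boot port):

$ curl http://localhost:8080/hello/world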

I use Logback as the logger implementation and the Spring AMQP appender for sending logs to RabbitMQ over the AMQP protocol.

<appender name="AMQP" class="org.springframework.amqp.rabbit.logback.AmqpAppender">
   <layout>
      <pattern>
         {
            "time": "%date{ISO8601}",
            "thread": "%thread",
            "level": "%level",
            "class": "%logger{36}",
            "message": "%message"
         }
      </pattern>
   </layout>

   <!-- RabbitMQ connection -->
   <host>192.168.99.100</host>
   <port>30000</port>
   <username>guest</username>
   <password>guest</password>

   <applicationId>api-service-4</applicationId>
   <routingKeyPattern>api-service-4</routingKeyPattern>
   <declareExchange>true</declareExchange>
   <exchangeType>direct</exchangeType>
   <exchangeName>ex_logstash</exchangeName>

   <generateId>true</generateId>
   <charset>UTF-8</charset>
   <durable>true</durable>
   <deliveryMode>PERSISTENT</deliveryMode>
</appender>

I run the RabbitMQ server using the Docker image from https://hub.docker.com/_/rabbitmq/. Here's the Docker command for it. I chose the rabbitmq:management image to expose the RabbitMQ UI management console on port 30001. After running this command we can go to the management console, available at 192.168.99.100:30001. There we have to create a queue named q_logstash and a direct exchange named ex_logstash with a binding that routes to the q_logstash queue.

$ docker run -d -it --name rabbit --hostname rabbit -p 30000:5672 -p 30001:15672 rabbitmq:management
(Figure: RabbitMQ management console with exchange and queue binding)
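Instead of creating the queue, exchange, and binding by hand in the console, you could also declare them with Spring AMQP from the application side. A sketch (the configuration class is mine; the names and routing key match the appender settings above):

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class LogstashAmqpConfig {

   @Bean
   public Queue logstashQueue() {
      // Durable queue, as expected by the Logstash rabbitmq input
      return new Queue("q_logstash", true);
   }

   @Bean
   public DirectExchange logstashExchange() {
      return new DirectExchange("ex_logstash");
   }

   @Bean
   public Binding logstashBinding() {
      // The routing key matches the appender's routingKeyPattern
      return BindingBuilder.bind(logstashQueue()).to(logstashExchange()).with("api-service-4");
   }

}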

Then we run the Elasticsearch and Kibana Docker images. The Kibana container needs to be linked to Elasticsearch.

$ docker run -d -it --name es -p 9200:9200 -p 9300:9300 elasticsearch
$ docker run -d -it --name kibana --link es:elasticsearch -p 5601:5601 kibana

Finally, we can run the Logstash Docker image, which takes the RabbitMQ queue as input and the Elasticsearch API as output. We have to set the host to the Docker machine's default address and the port configured when running the RabbitMQ container. Our queue is also durable, so this has to be declared explicitly, because the default value is false, following this reference: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-rabbitmq.html.

$ docker run -d -it --name logstash logstash -e 'input { rabbitmq {
host => "192.168.99.100" port => 30000 queue => "q_logstash" durable => true } }
output { elasticsearch { hosts => ["192.168.99.100"] } }'

After running all the Docker containers for RabbitMQ, Logstash, Elasticsearch, and Kibana, we can run our sample Spring Boot application and see the logs in Kibana, available at http://192.168.99.100:5601.

The post How to ship logs with Logstash, Elasticsearch and RabbitMQ appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2017/02/03/how-to-ship-logs-with-logstash-elasticsearch-and-rabbitmq/feed/ 0 18