Knative Eventing with Kafka and Quarkus
https://piotrminkowski.com/2021/03/31/knative-eventing-with-kafka-and-quarkus/
Wed, 31 Mar 2021 12:55:26 +0000

The post Knative Eventing with Kafka and Quarkus appeared first on Piotr's TechBlog.

In this article, you will learn how to run eventing applications on Knative using Kafka and Quarkus. Previously, I described the same approach for Kafka and Spring Cloud. If you want to compare the two, read my article Knative Eventing with Kafka and Spring Cloud. We will deploy exactly the same architecture. However, instead of Spring Cloud Function we will use Quarkus Funqy, and instead of Spring Cloud Stream we will use the Quarkus Kafka extension. Before we start, let’s clarify some things.

Knative and Quarkus concepts

Quarkus supports Knative in several ways. First of all, we may use the Quarkus Kubernetes module to simplify deployment on Knative. We can also use the Quarkus Funqy Knative Events extension to route and process cloud events within functions. That’s not all. Quarkus supports a serverless functional style. With the Quarkus Funqy module, we can write functions deployable to various FaaS platforms (including Knative). These functions can be invoked through HTTP. Finally, we may integrate our application with Kafka topics using annotations from the Quarkus Kafka extension.

The Quarkus Funqy Knative Events module is based on the Knative broker and triggers. Since we will use a KafkaSource instead of a broker and trigger, we won’t include that module. However, we can still take advantage of Quarkus Funqy and its HTTP binding.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions.

As I mentioned before, we will use the same architecture and scenario as in my previous article about Knative eventing. Let’s briefly describe it.

Today we will implement an eventual consistency pattern (also known as a SAGA pattern). How does it work? The sample system consists of three services (deployed in the steps below as order-saga, customer-saga, and product-saga). The order-service creates a new order that is related to a customer and a product. That order is sent to the Kafka topic. Then, our two other applications, customer-service and product-service, receive the order event. After that, they perform a reservation. The customer-service reserves the order’s amount on the customer’s account. Meanwhile, the product-service reserves the number of products specified in the order. Both these services send a response to the order-service through the Kafka topic. If the order-service receives positive reservations from both services, it confirms the order. Then, it sends an event with that information. Both customer-service and product-service receive the event and confirm their reservations. You can verify it in the picture below.

quarkus-knative-eventing-arch
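The confirmation rule from the scenario (an order is confirmed only after both services report a successful reservation) can be sketched in plain Java. This is only an illustration under my own assumptions, not code from the repository: the ReservationTracker class, its method, and the "customer" and "product" labels are hypothetical, and rejections or timeouts are ignored.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of the article's repository.
// It records which services have already reserved resources for an order.
class ReservationTracker {

    // The two participants that must answer before an order is confirmed.
    private static final Set<String> REQUIRED = Set.of("customer", "product");

    private final Map<Integer, Set<String>> reservedBy = new HashMap<>();

    // Registers a positive reservation and returns true once the order
    // has been reserved by every required service.
    boolean onReservation(int orderId, String service) {
        Set<String> services = reservedBy.computeIfAbsent(orderId, id -> new HashSet<>());
        services.add(service);
        return services.containsAll(REQUIRED);
    }

    public static void main(String[] args) {
        ReservationTracker tracker = new ReservationTracker();
        System.out.println(tracker.onReservation(1, "customer")); // only one answer so far
        System.out.println(tracker.onReservation(1, "product"));  // both answers received
    }
}
```

Keeping the state per order ID means the two responses may arrive in any order, which matches the asynchronous nature of the Kafka topic.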

Prerequisites

There are several requirements we need to meet before we start. I described them all in my previous article about Knative Eventing. Here’s just a brief reminder:

  1. Kubernetes cluster in version 1.17 or newer. I’m using a local cluster. If you use a remote cluster, replace dev.local in the image name with your Docker account name
  2. Install Knative Serving and Eventing on your cluster. You may find the detailed installation instructions here.
  3. Install Kafka Eventing Broker. Here’s the link to the releases site. You don’t need everything – we will use the KafkaSource and KafkaBinding CRDs
  4. Install Kafka cluster with the Strimzi operator. I installed it in the kafka namespace. The name of my cluster is my-cluster.

Step 1. Installing Kafka Knative components

Assuming you have already installed all the required elements to run Knative Eventing on your Kubernetes cluster, we may create some components dedicated to applications. You may find YAML manifests with object declarations in the k8s directory inside every single application directory. Firstly, let’s create a KafkaBinding. It is responsible for injecting the address of the Kafka cluster into the application container. Thanks to KafkaBinding that address is visible inside the container as the KAFKA_BOOTSTRAP_SERVERS environment variable. Here’s an example of the YAML declaration for the customer-saga application. We should create similar objects for two other applications.

apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
  name: kafka-binding-customer-saga
spec:
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: customer-saga
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
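To make the role of the injected variable more concrete, here is a minimal sketch of how an application could resolve the Kafka address from KAFKA_BOOTSTRAP_SERVERS. The BootstrapServers class and the localhost:9092 fallback are my assumptions for illustration only. In the article’s applications Quarkus resolves the variable itself through application.properties.

```java
import java.util.Map;

// Hypothetical helper that resolves the Kafka address the way an application
// could when KafkaBinding injects KAFKA_BOOTSTRAP_SERVERS into the container.
class BootstrapServers {

    static final String ENV_NAME = "KAFKA_BOOTSTRAP_SERVERS";

    // Returns the injected address, or the fallback when the variable is
    // missing or blank (for example when running outside the cluster).
    static String resolve(Map<String, String> env, String fallback) {
        String value = env.get(ENV_NAME);
        return (value == null || value.isBlank()) ? fallback : value.trim();
    }

    public static void main(String[] args) {
        // localhost:9092 is an assumed local default, not a value from the article.
        System.out.println(resolve(System.getenv(), "localhost:9092"));
    }
}
```

Passing the environment as a map keeps the lookup easy to test without touching the real process environment.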

In the next step, we create the KafkaSource object. It reads events from a particular topic and passes them to the consumer by calling the HTTP POST endpoint exposed by the application. We can override the default context path of the HTTP endpoint. For customer-saga the target URL is /reserve. It should receive events from the order-events topic. Because both customer-saga and product-saga listen for events from the order-events topic, we need to create a similar KafkaSource object for product-saga as well.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-orders-customer
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /reserve
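The hop from the KafkaSource to the application is just an HTTP POST with the event payload in the body. The sketch below simulates it locally with JDK classes only: a stub server plays the role of the application endpoint and a client plays the role of the source. Everything in it is an assumption made for illustration (the ReserveDeliveryDemo class, the 204 response code, and the JSON payload); it does not reproduce the headers or retry behavior of the real KafkaSource.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

// Local stand-in for the KafkaSource -> application hop (illustrative only).
class ReserveDeliveryDemo {

    // Starts a stub endpoint on a free port, posts the payload to /reserve,
    // and returns "<status>:<body seen by the endpoint>".
    static String deliver(String json) {
        AtomicReference<String> received = new AtomicReference<>();
        HttpServer server = null;
        try {
            server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            server.createContext("/reserve", exchange -> {
                byte[] body = exchange.getRequestBody().readAllBytes();
                received.set(new String(body, StandardCharsets.UTF_8));
                exchange.sendResponseHeaders(204, -1); // accepted, no response body
                exchange.close();
            });
            server.start();

            int port = server.getAddress().getPort();
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://127.0.0.1:" + port + "/reserve"))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(json))
                    .build();
            HttpResponse<Void> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.discarding());
            return response.statusCode() + ":" + received.get();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            if (server != null) {
                server.stop(0);
            }
        }
    }

    public static void main(String[] args) {
        // The field names in this payload are made up for the demo.
        System.out.println(deliver("{\"id\":1,\"status\":\"NEW\"}"));
    }
}
```

The same kind of request, sent with any HTTP client to the /reserve path of a locally running application, is a convenient way to exercise the function without a Kafka cluster.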

On the other hand, the order-saga listens for events on the reserve-events topic. If you want to verify our scenario once again please refer to the diagram in the Source Code section. This time the target URL is /confirm.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-orders-confirm
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - reserve-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: order-saga
    uri: /confirm

Let’s verify the list of Kafka sources. In our case, there is a single KafkaSource per application. Until the Quarkus applications are deployed on Knative, the Kafka sources won’t be ready.

quarkus-knative-eventing-kafkasource

Step 2. Integrating Quarkus with Kafka

In order to integrate Quarkus with Apache Kafka, we may use the SmallRye Reactive Messaging library. Thanks to that, we may define an input and output topic for each method using annotations. The messages are serialized to JSON. We can also automatically expose the Kafka connection status in a health check. Here’s the list of dependencies we need to include in the Maven pom.xml.

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-jackson</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-health</artifactId>
</dependency>

Before we start with the source code, we need to provide some configuration settings in the application.properties file. Of course, Kafka requires a connection URL to the cluster. We use the environment variable injected by the KafkaBinding object. Also, the output topic name should be configured. Here’s a list of required properties for the order-saga application.

kafka.bootstrap.servers = ${KAFKA_BOOTSTRAP_SERVERS}

mp.messaging.outgoing.order-events.connector = smallrye-kafka
mp.messaging.outgoing.order-events.topic = order-events
mp.messaging.outgoing.order-events.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Finally, we may switch to the code. Let’s start with order-saga. It continuously sends orders to the order-events topic. Those events are received by both the customer-saga and product-saga applications. The method responsible for generating and sending events returns a reactive stream using Mutiny Multi. It emits an event every second. We need to annotate the method with @Outgoing, passing the name of the output channel defined in the application properties. The @Broadcast annotation indicates that the event is dispatched to all subscribers. Before sending, every order is persisted in a database (we use an H2 in-memory database).

@ApplicationScoped
@Slf4j
public class OrderPublisher {

   private static int num = 0;

   @Inject
   private OrderRepository repository;
   @Inject
   private UserTransaction transaction;

   @Outgoing("order-events")
   @Broadcast
   public Multi<Order> orderEventsPublish() {
      return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
           .map(tick -> {
              Order o = new Order(++num, num%10+1, num%10+1, 100, 1, OrderStatus.NEW);
              try {
                 transaction.begin();
                 repository.persist(o);
                 transaction.commit();
              } catch (Exception e) {
                 log.error("Error in transaction", e);
              }

              log.info("Order published: {}", o);
              return o;
           });
   }

}
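A small detail of the publisher is worth spelling out. Java evaluates constructor arguments from left to right, so num%10+1 is computed after ++num. As a result, the order with ID 1 points to customer and product 2, and the order with ID 10 wraps around to 1. The sketch below isolates just that arithmetic; OrderIdsDemo and relatedId are hypothetical names used only for this illustration.

```java
// Mirrors only the arithmetic from the Order constructor call in the publisher.
class OrderIdsDemo {

    // Returns the customer/product ID assigned to the order with the given ID.
    static int relatedId(int orderId) {
        return orderId % 10 + 1;
    }

    public static void main(String[] args) {
        // Prints the mapping for the first twelve orders to show the wrap-around.
        for (int orderId = 1; orderId <= 12; orderId++) {
            System.out.println("order " + orderId + " -> customer/product " + relatedId(orderId));
        }
    }
}
```

In other words, the generated orders cycle through customer and product IDs from 1 to 10, so the sample data only needs ten customers and ten products.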

Step 3. Handling Knative events with Quarkus Funqy

OK, in the previous step we implemented the part of the code responsible for sending events to the Kafka topic. We also have a KafkaSource that is responsible for dispatching events from the Kafka topic to the application’s HTTP endpoint. Now, we just need to handle them. It is very simple with Quarkus Funqy. It allows us to create functions according to the serverless FaaS approach. We can also easily bind each function to an HTTP endpoint with the Quarkus Funqy HTTP extension. Let’s include it in our dependencies.

 <dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-funqy-http</artifactId>
 </dependency>

In order to create a function with Quarkus Funqy, we just need to annotate the particular method with @Funq. The name of the method is reserve, so it is automatically bound to the HTTP endpoint POST /reserve. It takes a single input parameter, which represents the incoming order. It is automatically deserialized from JSON.

In the fragment of code visible below, we implement order handling in the customer-saga application. Once it receives an order, it performs a reservation on the customer account. Then it needs to send a response to the order-saga. To do that, we may use Quarkus reactive messaging support once again. We define an Emitter object that allows us to send a single event to the topic. We may use it inside a method that does not itself return a value to be sent to a topic (as a method annotated with @Outgoing does). The Emitter bean should be annotated with @Channel, which works similarly to @Outgoing. We also need to define an output topic related to the name of the channel.

@Slf4j
public class OrderReserveFunction {

   @Inject
   private CustomerRepository repository;
   @Inject
   @Channel("reserve-events")
   Emitter<Order> orderEmitter;

   @Funq
   public void reserve(Order order) {
      log.info("Received order: {}", order);
      doReserve(order);
   }

   private void doReserve(Order order) {
      Customer customer = repository.findById(order.getCustomerId());
      log.info("Customer: {}", customer);
      if (order.getStatus() == OrderStatus.NEW) {
         customer.setAmountReserved(customer.getAmountReserved() + order.getAmount());
         customer.setAmountAvailable(customer.getAmountAvailable() - order.getAmount());
         order.setStatus(OrderStatus.IN_PROGRESS);
         log.info("Order reserved: {}", order);
         orderEmitter.send(order);
      } else if (order.getStatus() == OrderStatus.CONFIRMED) {
         customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
      }
      repository.persist(customer);
   }
}
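The bookkeeping inside doReserve is easier to follow on a stripped-down model. The sketch below keeps only the two amounts and the two transitions (reserve for a NEW order, confirm for a CONFIRMED one). CustomerAccount is a hypothetical stand-in for the Customer entity, without persistence or messaging.

```java
// Simplified stand-in for the Customer entity; field names follow the article.
class CustomerAccount {

    int amountAvailable;
    int amountReserved;

    CustomerAccount(int amountAvailable) {
        this.amountAvailable = amountAvailable;
    }

    // NEW order: move the order amount from available to reserved.
    void reserve(int amount) {
        amountReserved += amount;
        amountAvailable -= amount;
    }

    // CONFIRMED order: the reserved amount is released because it has been spent.
    void confirm(int amount) {
        amountReserved -= amount;
    }

    public static void main(String[] args) {
        CustomerAccount account = new CustomerAccount(1000);
        account.reserve(100);
        System.out.println(account.amountAvailable + " available, " + account.amountReserved + " reserved");
        account.confirm(100);
        System.out.println(account.amountAvailable + " available, " + account.amountReserved + " reserved");
    }
}
```

Note that neither the sketch nor the original method checks whether the account has enough funds, so the available amount may become negative in a long-running demo.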

Here are configuration properties for integration between Kafka and Emitter. The same configuration properties should be created for both customer-saga and product-saga.

kafka.bootstrap.servers = ${KAFKA_BOOTSTRAP_SERVERS}

mp.messaging.outgoing.reserve-events.connector = smallrye-kafka
mp.messaging.outgoing.reserve-events.topic = reserve-events
mp.messaging.outgoing.reserve-events.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Finally, let’s take a look at the implementation of the Quarkus function inside the product-saga application. It also sends a response to the reserve-events topic using the Emitter object. It handles incoming orders and performs a reservation for the requested number of products.

@Slf4j
public class OrderReserveFunction {

   @Inject
   private ProductRepository repository;

   @Inject
   @Channel("reserve-events")
   Emitter<Order> orderEmitter;

   @Funq
   public void reserve(Order order) {
      log.info("Received order: {}", order);
      doReserve(order);
   }

   private void doReserve(Order order) {
      Product product = repository.findById(order.getProductId());
      log.info("Product: {}", product);
      if (order.getStatus() == OrderStatus.NEW) {
         product.setReservedItems(product.getReservedItems() + order.getProductsCount());
         product.setAvailableItems(product.getAvailableItems() - order.getProductsCount());
         order.setStatus(OrderStatus.IN_PROGRESS);
         orderEmitter.send(order);
      } else if (order.getStatus() == OrderStatus.CONFIRMED) {
         product.setReservedItems(product.getReservedItems() - order.getProductsCount());
      }
      repository.persist(product);
   }
}

Step 4. Deploy Quarkus application on Knative

Finally, we can deploy all our applications on Knative. To simplify that process we may use Quarkus Kubernetes support. It is able to automatically generate deployment manifests based on the source code and application properties. Quarkus also supports building images with Jib. So first, let’s add the following dependencies to Maven pom.xml.

 <dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-kubernetes</artifactId>
 </dependency>
 <dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-container-image-jib</artifactId>
 </dependency>

In the next step, we need to add some configuration settings to the application.properties file. To enable automatic deployment on Kubernetes, the property quarkus.kubernetes.deploy must be set to true. Then we should change the target platform to Knative. Thanks to that, Quarkus will generate a Knative Service instead of a standard Kubernetes Deployment. The last property, quarkus.container-image.group, is responsible for setting the name of the image owner group. For local development with Knative, we should set the dev.local value there.

quarkus.kubernetes.deploy = true
quarkus.kubernetes.deployment-target = knative
quarkus.container-image.group = dev.local

After setting all the values visible above we just need to execute Maven build to deploy the application.

$ mvn clean package

After running Maven build for all the applications let’s verify a list of Knative Services.

Once the order-saga application starts it begins sending orders continuously. It also receives order events sent by customer-saga and product-saga. Those events are processed by the Quarkus function. Here are the logs printed by order-saga.

Final Thoughts

As you can see, we can easily implement and deploy Quarkus applications on Knative. Quarkus provides several extensions that simplify integration with the Knative Eventing model and the Kafka broker. We can use Quarkus Funqy to implement the serverless FaaS approach or SmallRye Reactive Messaging to integrate with Apache Kafka. You can compare that Quarkus support with Spring Boot in my previous article: Knative Eventing with Kafka and Spring Cloud.

RabbitMQ Monitoring on Kubernetes
https://piotrminkowski.com/2020/09/29/rabbitmq-monitoring-on-kubernetes/
Tue, 29 Sep 2020 10:55:02 +0000

The post RabbitMQ Monitoring on Kubernetes appeared first on Piotr's TechBlog.

RabbitMQ monitoring can be a key point of your system management. Therefore we should use the right tools for that. To enable them on RabbitMQ we need to install some plugins. In this article, I will show you how to use Prometheus and Grafana to monitor the key metrics of RabbitMQ. Of course, we will build the applications that send and receive messages. We will use Kubernetes as a target platform for our system. In the last step, we are going to enable the tracing plugin. It helps us in collecting the list of incoming messages.

Source code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my repository sample-spring-amqp. Inside the k8s directory, you will find all the required deployment manifests. Both Spring Boot test applications are available inside the listener and producer directories. If you would like to test a RabbitMQ cluster, please refer to the article RabbitMQ in cluster.

Step 1 – Building a RabbitMQ image

In the first step, we are customizing the Docker image of RabbitMQ. To do that, we need to extend the base image with the tag 3-management and enable two plugins. The rabbitmq_prometheus plugin adds a Prometheus exporter of core RabbitMQ metrics. The second one, rabbitmq_tracing, allows us to log the payloads of incoming messages. That’s all we need to define in our Dockerfile, which is visible below.

FROM rabbitmq:3-management
RUN rabbitmq-plugins enable --offline rabbitmq_prometheus rabbitmq_tracing

Then you just need to build the image defined above. Let’s say its name is piomin/rabbitmq-monitoring. After building it, I’m pushing it to my remote Docker registry.

$ docker build -t piomin/rabbitmq-monitoring .
$ docker push piomin/rabbitmq-monitoring

Step 2 – Deploying RabbitMQ on Kubernetes

Now, we are going to deploy our custom image of RabbitMQ on Kubernetes. For the purpose of this article, we will run a standalone version of RabbitMQ. In that case, the only thing we need to do is to override some configuration properties in the rabbitmq.conf and enabled_plugins files. First, I’m enabling logging to the console at the debug level. Then I’m also enabling all the required plugins.

apiVersion: v1
kind: ConfigMap
metadata:
  name: rabbitmq
  labels:
    name: rabbitmq
data:
  rabbitmq.conf: |-
    loopback_users.guest = false
    log.console = true
    log.console.level = debug
    log.exchange = true
    log.exchange.level = debug
  enabled_plugins: |-
    [rabbitmq_management,rabbitmq_prometheus,rabbitmq_tracing].

Both rabbitmq.conf and enabled_plugins files should be placed inside the /etc/rabbitmq directory. Therefore, I’m mounting them inside the volume assigned to the RabbitMQ Deployment. Additionally, we are exposing three ports outside the container. The port 5672 is used in communication with applications through AMQP protocol. The Prometheus plugin exposes metrics on the dedicated port 15692. In order to access Management UI and HTTP endpoints, you should use the 15672 port.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: rabbitmq
  labels:
    app: rabbitmq
spec:
  replicas: 1
  selector:
    matchLabels:
      app: rabbitmq
  template:
    metadata:
      labels:
        app: rabbitmq
    spec:
      containers:
        - name: rabbitmq
          image: piomin/rabbitmq-monitoring:latest
          ports:
            - containerPort: 15672
              name: http
            - containerPort: 5672
              name: amqp
            - containerPort: 15692
              name: prometheus
          volumeMounts:
            - name: rabbitmq-config-map
              mountPath: /etc/rabbitmq/
      volumes:
        - name: rabbitmq-config-map
          configMap:
            name: rabbitmq

Step 3 – Building Spring Boot listener application

Our sample listener application uses the Spring Boot AMQP project for integration with RabbitMQ. Thanks to the Spring Boot Actuator module, it also exposes metrics, including RabbitMQ-specific values. It is important to expose them in a format readable by Prometheus.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
   <groupId>io.micrometer</groupId>
   <artifactId>micrometer-registry-prometheus</artifactId>
</dependency>

The listener application defines and creates two exchanges. The first of them, trx-events-topic, is used for multicast communication. On the other hand, trx-events-direct takes part in point-to-point communication. Both our sample applications exchange messages in JSON format. Therefore, we have to override the default Spring Boot AMQP message converter with Jackson2JsonMessageConverter.

@SpringBootApplication
public class ListenerApplication {

   public static void main(String[] args) {
      SpringApplication.run(ListenerApplication.class, args);
   }

   @Bean
   public TopicExchange topic() {
      return new TopicExchange("trx-events-topic");
   }

   @Bean
   public DirectExchange queue() {
      return new DirectExchange("trx-events-direct");
   }

   @Bean
   public MessageConverter jsonMessageConverter() {
      return new Jackson2JsonMessageConverter();
   }
}

The listener application receives messages from both the topic and the direct exchange. Each running instance of this application creates its own queues with random names and binds them to the exchanges. A direct exchange delivers a message only to the queues whose binding key exactly matches the message’s routing key, which makes it suitable for point-to-point communication. A topic exchange, on the other hand, delivers a message to every queue whose binding pattern matches.

@Component
@Slf4j
public class ListenerComponent {

   @RabbitListener(bindings = {
      @QueueBinding(
         exchange = @Exchange(type = ExchangeTypes.TOPIC, name = "trx-events-topic"),
         value = @Queue("${topic.queue.name}")
      )
   })
   public void onTopicMessage(SampleMessage message) {
      log.info("Message received: {}", message);
   }

   @RabbitListener(bindings = {
      @QueueBinding(
         exchange = @Exchange(type = ExchangeTypes.DIRECT, name = "trx-events-direct"),
         value = @Queue("${direct.queue.name}")
      )
   })
   public void onDirectMessage(SampleMessage message) {
      log.info("Message received: {}", message);
   }

}

The names of the queues assigned to the topic and direct exchanges are configured inside the application.yml file.

topic.queue.name: t-${random.uuid}
direct.queue.name: d-${random.uuid}

Step 4 – Building Spring Boot producer application

The producer application also uses Spring Boot AMQP for integration with RabbitMQ. It sends messages to the exchanges with RabbitTemplate. Like the listener application, it formats all messages as JSON.

@SpringBootApplication
@EnableScheduling
public class ProducerApplication {

   public static void main(String[] args) {
      SpringApplication.run(ProducerApplication.class, args);
   }

   @Bean
   public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
      final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
      rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
      return rabbitTemplate;
   }

   @Bean
   public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
      return new Jackson2JsonMessageConverter();
   }

}

The producer application starts sending messages just after startup. Each message contains the id, type, and message fields. Messages are sent to the topic exchange every 5 seconds, and to the direct exchange every 2 seconds.

@Component
@Slf4j
public class ProducerComponent {

   int index = 1;
   private RabbitTemplate rabbitTemplate;

   ProducerComponent(RabbitTemplate rabbitTemplate) {
      this.rabbitTemplate = rabbitTemplate;
   }

   @Scheduled(fixedRate = 5000)
   public void sendToTopic() {
      SampleMessage msg = new SampleMessage(index++, "abc", "topic");
      rabbitTemplate.convertAndSend("trx-events-topic", null, msg);
      log.info("Sending message: {}", msg);
   }

   @Scheduled(fixedRate = 2000)
   public void sendToDirect() {
      SampleMessage msg = new SampleMessage(index++, "abc", "direct");
      rabbitTemplate.convertAndSend("trx-events-direct", null, msg);
      log.info("Sending message: {}", msg);
   }
   
}

Step 5 – Deploying Prometheus and Grafana for RabbitMQ monitoring

We will use Prometheus for collecting metrics from RabbitMQ and from both our Spring Boot applications. Prometheus detects endpoints with metrics by the Kubernetes Service app label and the HTTP port name. Of course, you can define different search criteria for that. Because Spring Boot and RabbitMQ metrics are exposed under different endpoints, we need to define two jobs. In the source code below, you can see the ConfigMap that contains the Prometheus configuration file.

apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus
  labels:
    name: prometheus
data:
  prometheus.yml: |-
    scrape_configs:
      - job_name: 'springboot'
        metrics_path: /actuator/prometheus
        scrape_interval: 5s
        kubernetes_sd_configs:
        - role: endpoints
          namespaces:
            names:
              - default

        relabel_configs:
          - source_labels: [__meta_kubernetes_service_label_app]
            separator: ;
            regex: (producer|listener)
            replacement: $1
            action: keep
          - source_labels: [__meta_kubernetes_endpoint_port_name]
            separator: ;
            regex: http
            replacement: $1
            action: keep
          # ...
      - job_name: 'rabbitmq'
        metrics_path: /metrics
        scrape_interval: 5s
        kubernetes_sd_configs:
        - role: endpoints
          namespaces:
            names:
              - default

        relabel_configs:
          - source_labels: [__meta_kubernetes_service_label_app]
            separator: ;
            regex: rabbitmq
            replacement: $1
            action: keep
          - source_labels: [__meta_kubernetes_endpoint_port_name]
            separator: ;
            regex: prometheus
            replacement: $1
            action: keep
          # ...
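The keep action used in both jobs drops every discovered target whose label value does not match the regex, and Prometheus anchors relabel regexes at both ends, so the whole value has to match. The sketch below models that rule with java.util.regex. KeepRule is a hypothetical class written for this illustration. Prometheus itself uses RE2 syntax, which behaves the same way for simple patterns like the ones above.

```java
import java.util.regex.Pattern;

// Models the "keep" relabel action: a target survives only if the whole
// label value matches the regex.
class KeepRule {

    private final Pattern pattern;

    KeepRule(String regex) {
        this.pattern = Pattern.compile(regex);
    }

    // matches() requires a full match, which mirrors the anchored behavior.
    boolean keeps(String labelValue) {
        return pattern.matcher(labelValue).matches();
    }

    public static void main(String[] args) {
        KeepRule apps = new KeepRule("(producer|listener)");
        System.out.println(apps.keeps("listener"));         // app label of the listener Service
        System.out.println(apps.keeps("listener-service")); // a Service name, not the app label
        System.out.println(apps.keeps("rabbitmq"));         // handled by the second job
    }
}
```

This is why the springboot job relies on the app label of the Service (listener or producer) and not on the Service name.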

For the full version of the Prometheus deployment, please refer to the source code. Prometheus tries to discover metric endpoints by the Kubernetes Service label and port name. Let’s take a look at the Service for the listener application.

apiVersion: v1
kind: Service
metadata:
  name: listener-service
  labels:
    app: listener
spec:
  type: ClusterIP
  selector:
    app: listener
  ports:
  - port: 8080
    name: http

Similarly, we should create a Service for RabbitMQ.

apiVersion: v1
kind: Service
metadata:
  name: rabbitmq-service
  labels:
    app: rabbitmq
spec:
  type: NodePort
  selector:
    app: rabbitmq
  ports:
  - port: 15672
    name: http
  - port: 5672
    name: amqp
  - port: 15692
    name: prometheus

Step 6 – Deploying stack for RabbitMQ monitoring

We can finally proceed to the deployment on Kubernetes. In summary, we have five running applications. Three of them, RabbitMQ with the management console, Prometheus, and Grafana are a part of the RabbitMQ monitoring stack. We also have a single instance of the Spring Boot AMQP producer application, and two instances of the Spring Boot AMQP listener application. You can see the final list of pods in the picture below.

rabbitmq-monitoring-pods

If you deploy Spring Boot applications with skaffold dev --port-forward command, you can easily access them on the local port. Other applications can be accessed via Kubernetes Service NodePort.

Step 7 – RabbitMQ monitoring with Prometheus metrics

We can easily verify a list of metrics generated by Spring Boot applications by calling /actuator/prometheus endpoint. First, let’s take a look at the metrics returned by the listener application.

rabbitmq_not_acknowledged_published_total{name="rabbit",} 0.0
rabbitmq_unrouted_published_total{name="rabbit",} 0.0
rabbitmq_channels{name="rabbit",} 2.0
rabbitmq_consumed_total{name="rabbit",} 2432.0
rabbitmq_connections{name="rabbit",} 1.0
rabbitmq_acknowledged_total{name="rabbit",} 2432.0
spring_rabbitmq_listener_seconds_max{exception="none",listener_id="org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1",queue="d-ea28bd07-929d-4928-8d2c-5dceeec9950a",result="success",} 0.0025406
spring_rabbitmq_listener_seconds_max{exception="none",listener_id="org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0",queue="t-e8990fe4-7d8c-4a2c-96d7-fff4fe503265",result="success",} 0.0024175
spring_rabbitmq_listener_seconds_count{exception="none",listener_id="org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1",queue="d-ea28bd07-929d-4928-8d2c-5dceeec9950a",result="success",} 1712.0
spring_rabbitmq_listener_seconds_sum{exception="none",listener_id="org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1",queue="d-ea28bd07-929d-4928-8d2c-5dceeec9950a",result="success",} 0.992886413
spring_rabbitmq_listener_seconds_count{exception="none",listener_id="org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0",queue="t-e8990fe4-7d8c-4a2c-96d7-fff4fe503265",result="success",} 720.0
spring_rabbitmq_listener_seconds_sum{exception="none",listener_id="org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0",queue="t-e8990fe4-7d8c-4a2c-96d7-fff4fe503265",result="success",} 0.598468801
rabbitmq_acknowledged_published_total{name="rabbit",} 0.0
rabbitmq_failed_to_publish_total{name="rabbit",} 0.0
rabbitmq_rejected_total{name="rabbit",} 0.0
rabbitmq_published_total{name="rabbit",} 0.0
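The spring_rabbitmq_listener_seconds_sum and spring_rabbitmq_listener_seconds_count values can be combined into an average handling time per message. As a quick sketch, the code below divides the two numbers reported above for the direct queue, which gives about 0.58 ms per message. ListenerTiming is a hypothetical helper. In Prometheus you would normally compute the same ratio with a query over the two series.

```java
// Computes the average listener time from the _sum and _count samples.
class ListenerTiming {

    // Returns sum / count in seconds, or 0 when nothing has been processed yet.
    static double averageSeconds(double sumSeconds, double count) {
        return count == 0 ? 0.0 : sumSeconds / count;
    }

    public static void main(String[] args) {
        // Values copied from the listing above (queue bound to the direct exchange).
        double average = averageSeconds(0.992886413, 1712.0);
        System.out.printf("average: %.3f ms%n", average * 1000);
    }
}
```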

Similarly, we can verify the list of metrics from the producer application. In contrast to the listener application, it is generating rabbitmq_published_total instead of rabbitmq_consumed_total.

rabbitmq_acknowledged_published_total{name="rabbit",} 0.0
rabbitmq_unrouted_published_total{name="rabbit",} 0.0
rabbitmq_acknowledged_total{name="rabbit",} 0.0
rabbitmq_rejected_total{name="rabbit",} 0.0
rabbitmq_connections{name="rabbit",} 1.0
rabbitmq_not_acknowledged_published_total{name="rabbit",} 0.0
rabbitmq_consumed_total{name="rabbit",} 0.0
rabbitmq_failed_to_publish_total{name="rabbit",} 0.0
rabbitmq_published_total{name="rabbit",} 2553.0
rabbitmq_channels{name="rabbit",} 1.0

The list of metrics generated by the RabbitMQ Prometheus plugin is pretty impressive. I decided to use only some of them.

rabbitmq_channel_consumers 6
rabbitmq_channel_messages_published_total 2926
rabbitmq_channel_messages_delivered_total 2022
rabbitmq_channel_messages_acked_total 5922
rabbitmq_connections_opened_total 9
rabbitmq_connection_incoming_bytes_total 700878
rabbitmq_connection_outgoing_bytes_total 1388158
rabbitmq_connection_channels 5
rabbitmq_queue_messages 5917
rabbitmq_queue_consumers 6
rabbitmq_queues 10

We can visualize all the metrics on a Grafana dashboard. Grafana uses Prometheus as a data source. To recap, Prometheus collects data from the endpoints of the Spring Boot applications and from RabbitMQ.

rabbitmq-monitoring-grafana

Step 8 – RabbitMQ monitoring with Tracing plugin

The tracing plugin allows us to log all incoming and outgoing messages. We can configure it in the RabbitMQ management console. In order to do that we need to switch to the “Admin” tab. Then, we can enable the logging of all messages or just those incoming to the particular queue or exchange.

rabbitmq-monitoring-tracing

The logs are written to a file, which you can download. Each entry in the file contains detailed information about a message. You can verify the name of the node, exchange, or queue. Of course, it also contains the message payload, properties, and time of the event as shown below.

Conclusion

RabbitMQ monitoring tools allow you to verify general metrics of the node and detailed logs of every message. In addition, Spring Boot AMQP offers dedicated metrics for applications that interact with RabbitMQ. In this article, I described how to run the full monitoring stack on Kubernetes. Enjoy 🙂

Kubernetes Messaging with Java and KubeMQ https://piotrminkowski.com/2020/01/17/kubernetes-messaging-with-java-and-kubemq/ Fri, 17 Jan 2020 08:53:31 +0000

Have you ever tried to run a message broker on Kubernetes? KubeMQ is a relatively new solution and is not as popular as competing tools like RabbitMQ, Kafka, or ActiveMQ. However, it has one big advantage over them: it is a Kubernetes-native message broker, which may be deployed there using a single command without preparing any additional templates or manifests. This convinced me to take a closer look at KubeMQ.
KubeMQ is an enterprise-grade, scalable, highly available, and secure message broker and message queue, designed as a Kubernetes-native solution in a lightweight container. It is written in Go, so it is advertised as a very fast solution running inside a small Docker container of about 30MB. It may be easily integrated with popular third-party observability tools like Zipkin, Prometheus, or Datadog.
The comparison with competing tools like RabbitMQ or Redis available on the KubeMQ site (https://kubemq.io/product-overview/) looks pretty amazing (for KubeMQ, of course). It seems the authors wanted to merge some useful features of RabbitMQ and Kafka in a single product. In fact, KubeMQ provides many interesting mechanisms: delayed delivery, message peeking, and batch sending and receiving for queues, as well as consumer groups, load balancing, and offsetting support for pub/sub.
Looking at their Java SDK, I can see that it is a new product and there are still some things to do. However, all the features listed above seem to be very useful. Of course, I won’t be able to demonstrate all of them in this article, but I’m going to show you a simple Java application that uses a message queue with transactions and a pub/sub event store. Let’s begin our KubeMQ Java tutorial.

Example

The example application is written in Java 11, and uses Spring Boot. The source code is available as usual on GitHub. The repository address is https://github.com/piomin/sample-java-kubemq.git.

Before you start

Before starting with KubeMQ you need to have a running instance of Minikube. I have tested it on version 1.6.1.

$ minikube start --vm-driver=virtualbox

Running KubeMQ on Kubernetes

First, you need to install KubeMQ. On Windows, you just need to download the latest version of the CLI from https://github.com/kubemq-io/kubemqctl/releases/download/latest/kubemqctl.exe and copy it to a directory on your PATH. Before installing KubeMQ on your Minikube instance, you need to register on the website https://account.kubemq.io/login/register. You will receive a token required for the installation. Installation with the CLI is very easy. You just need to execute the command kubemqctl cluster create with the registration token as shown below.

kubemq-java-tutorial-create

By default, KubeMQ creates a cluster consisting of three instances (pods). It is deployed as a Kubernetes StatefulSet. The deployment is available inside the newly created namespace kubemq. We can easily check the list of running pods with the kubectl get pod command.

kubemq-java-tutorial-pods

The list of pods is not very important for us. We can easily scale the number of instances in the cluster up and down using the command kubemqctl cluster scale. KubeMQ is exposed in the cluster through several interfaces. The KubeMQ Java SDK uses the gRPC protocol for communication, so we use the service kubemq-cluster-grpc available on port 50000.

kubernetes-messaging-java-kubemq-svc

Since KubeMQ is a Kubernetes-native message broker, getting started with it on Minikube is very simple. After executing a single command, we may focus on development.

Example Architecture

We have an example application deployed on Kubernetes, which integrates with a KubeMQ queue and event store. The diagram below illustrates the architecture of the application. It exposes the REST endpoint POST /orders for creating new orders. Each order signifies a transfer between two in-memory accounts. The incoming order is sent to the queue orders (1). Then it is received by the listener (2), which is responsible for updating account balances using the AccountRepository bean (3). If the transaction succeeds, an event is sent to the pub/sub topic transactions. Incoming events may be listened to by many subscribers (4). In the example application we have two listeners: TransactionAmountListener and TransactionCountListener (5). They are responsible for adding extra money to the order’s target account based on different criteria. The first criterion is the amount of a given transaction, while the second is the number of processed transactions per account.

kubernetes-messaging-java-kubemq-arch
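Before moving to the real implementation, the flow from the diagram can be sketched with plain Java collections. This is only an in-memory stand-in under simplifying assumptions: no KubeMQ classes are used, redelivery of rejected orders is left out, and all names (OrderFlowSketch, processNext, and so on) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

/** In-memory stand-in for the flow in the diagram; no KubeMQ classes are used. */
final class OrderFlowSketch {

    static final class Order {
        final int from;
        final int to;
        final int amount;

        Order(int from, int to, int amount) {
            this.from = from;
            this.to = to;
            this.amount = amount;
        }
    }

    final Queue<Order> ordersQueue = new ArrayDeque<>();          // (1) queue of incoming orders
    final Map<Integer, Integer> balances = new HashMap<>();       // (3) account balances
    final List<Consumer<Order>> subscribers = new ArrayList<>();  // (4)/(5) pub/sub listeners

    void send(Order order) {
        ordersQueue.add(order);
    }

    /** (2) Takes one order from the queue; publishes an event only if the transfer succeeds. */
    boolean processNext() {
        Order order = ordersQueue.poll();
        if (order == null) {
            return false; // queue is empty
        }
        int sourceBalance = balances.getOrDefault(order.from, 0);
        if (sourceBalance < order.amount) {
            return false; // insufficient funds, so no event is published
        }
        balances.put(order.from, sourceBalance - order.amount);
        balances.merge(order.to, order.amount, Integer::sum);
        subscribers.forEach(s -> s.accept(order)); // fan out to every subscriber
        return true;
    }

    public static void main(String[] args) {
        OrderFlowSketch flow = new OrderFlowSketch();
        flow.balances.put(1, 2000);
        flow.balances.put(2, 2000);
        // Amount-based bonus: 10% of the transferred amount goes to the target account
        flow.subscribers.add(o -> flow.balances.merge(o.to, o.amount / 10, Integer::sum));
        flow.send(new Order(1, 2, 300));
        flow.processNext();
        System.out.println(flow.balances);
    }
}
```

The point of the sketch is the ordering: the balance update happens first, and subscribers are notified only for successful transfers.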

Using this example application, I’m going to show you the following features of KubeMQ and its SDK for Java:

  • Sending messages to a queue
  • Listening for incoming queue messages and handling transactions
  • Sending messages to pub/sub via Channel
  • Subscribing to pub/sub events and reading older events from a store
  • Integrating a standalone Spring Boot application with KubeMQ

Let’s proceed to the implementation.

Implementation with Spring Boot and KubeMQ SDK

We begin with the configuration. The address of the KubeMQ gRPC endpoint has been externalized in application.yml.

spring:
  application:
    name: sampleapp-kubemq
kubemq:
  address: kubemq-cluster-grpc:50000

In the @Configuration class we define all required KubeMQ resources as Spring beans. Each of them requires the KubeMQ cluster address. We need to declare a queue, a channel for sending events, and a subscriber for subscribing to the pub/sub events and events store.

@Configuration
@ConfigurationProperties("kubemq")
public class KubeMQConfiguration {

    private String address;

    @Bean
    public Queue queue() throws ServerAddressNotSuppliedException, SSLException {
        return new Queue("transactions", "orders", address);
    }

    @Bean
    public Subscriber subscriber() {
        return new Subscriber(address);
    }

    @Bean
    public Channel channel() {
        return new Channel("transactions", "orders", true, address);
    }

    String getAddress() {
        return address;
    }

    void setAddress(String address) {
        this.address = address;
    }

}

The first component in our architecture is a controller. It exposes an HTTP endpoint for placing an order. OrderController injects the Queue bean and uses it for sending messages to the KubeMQ queue. After receiving a response confirming that the message has been delivered, it returns the order with an id and status=ACCEPTED.

@RestController
@RequestMapping("/orders")
public class OrderController {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);

    private Queue queue;

    public OrderController(Queue queue) {
        this.queue = queue;
    }

    @PostMapping
    public Order sendOrder(@RequestBody Order order) {
        try {
            LOGGER.info("Sending: {}", order);
            final SendMessageResult result = queue.SendQueueMessage(new Message()
                    .setBody(Converter.ToByteArray(order)));
            order.setId(result.getMessageID());
            order.setStatus(OrderStatus.ACCEPTED);
            LOGGER.info("Sent: {}", order);
        } catch (ServerAddressNotSuppliedException | IOException e) {
            LOGGER.error("Error sending", e);
            order.setStatus(OrderStatus.ERROR);
        }
        return order;
    }

}
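The Converter.ToByteArray and Converter.FromByteArray calls suggest that orders travel as serialized Java objects (the listeners shown later catch ClassNotFoundException, which is typical for Java deserialization). Assuming that is the case, the payload class has to implement Serializable. The sketch below shows such a round trip using only the JDK. OrderPayload is a hypothetical stand-in for the Order class, and the helper methods are not the SDK’s implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for Order; only the fields needed for the demo. */
final class OrderPayload implements Serializable {

    private static final long serialVersionUID = 1L;

    final int accountIdFrom;
    final int accountIdTo;
    final int amount;

    OrderPayload(int accountIdFrom, int accountIdTo, int amount) {
        this.accountIdFrom = accountIdFrom;
        this.accountIdTo = accountIdTo;
        this.amount = amount;
    }

    /** Serializes the order into a message body using standard Java serialization. */
    static byte[] toBytes(OrderPayload order) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(order);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Restores the order from a message body produced by toBytes. */
    static OrderPayload fromBytes(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return (OrderPayload) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Payload class is missing on the receiver side", e);
        }
    }

    public static void main(String[] args) {
        OrderPayload copy = fromBytes(toBytes(new OrderPayload(1, 2, 300)));
        System.out.println(copy.accountIdFrom + " -> " + copy.accountIdTo + ": " + copy.amount);
    }
}
```

With this kind of encoding both sides need the same class on the classpath. That is not a problem here, because producer and consumers live in the same application.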

The message is processed asynchronously. Since the current KubeMQ Java SDK does not provide any message listener for asynchronous processing, we use synchronous methods inside an infinite loop. The loop is started inside a new thread handled by the Spring TaskExecutor. When a new message is received, we start a KubeMQ transaction. It may be acknowledged or rejected. A transaction is confirmed if the source account has sufficient funds to perform a transfer to the target account. If the transaction is confirmed, the listener sends an event with information about it to the KubeMQ transactions pub/sub using the Channel bean.

@Component
public class OrderListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

    private Queue queue;
    private Channel channel;
    private OrderProcessor orderProcessor;
    private TaskExecutor taskExecutor;

    public OrderListener(Queue queue, Channel channel, OrderProcessor orderProcessor, TaskExecutor taskExecutor) {
        this.queue = queue;
        this.channel = channel;
        this.orderProcessor = orderProcessor;
        this.taskExecutor = taskExecutor;
    }

    @PostConstruct
    public void listen() {
        taskExecutor.execute(() -> {
            while (true) {
                try {
                    Transaction transaction = queue.CreateTransaction();
                    TransactionMessagesResponse response = transaction.Receive(10, 10);
                    if (response.getMessage().getBody().length > 0) {
                        Order order = orderProcessor
                                .process((Order) Converter.FromByteArray(response.getMessage().getBody()));
                        LOGGER.info("Processed: {}", order);
                        if (order.getStatus().equals(OrderStatus.CONFIRMED)) {
                            transaction.AckMessage();
                            Event event = new Event();
                            event.setEventId(response.getMessage().getMessageID());
                            event.setBody(Converter.ToByteArray(order));
                            LOGGER.info("Sending event: id={}", event.getEventId());
                            channel.SendEvent(event);
                        } else {
                            transaction.RejectMessage();
                        }
                    } else {
                        LOGGER.info("No messages");
                    }
                    Thread.sleep(10000);
                } catch (Exception e) {
                    LOGGER.error("Error", e);
                }
            }
        });
    }

}
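The acknowledge/reject decision in the loop above can be illustrated without a broker. The following sketch models a queue where an acknowledged message is removed for good, while a rejected one becomes available again. It is not the KubeMQ API: the class AckRejectSketch is hypothetical, and putting the rejected message at the end of the queue is an assumption made for simplicity.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

/** In-memory sketch of receive -> ack/reject semantics; not the KubeMQ API. */
final class AckRejectSketch<T> {

    private final Deque<T> queue = new ArrayDeque<>();

    void send(T message) {
        queue.addLast(message);
    }

    int size() {
        return queue.size();
    }

    /**
     * Receives one message and lets the handler decide its fate.
     * Returns true if a message was acknowledged, false if the queue was
     * empty or the message was rejected and put back for redelivery.
     */
    boolean receiveOnce(Predicate<T> handler) {
        T message = queue.pollFirst();
        if (message == null) {
            return false;           // nothing to process
        }
        if (handler.test(message)) {
            return true;            // ack: the message is gone for good
        }
        queue.addLast(message);     // reject: the message becomes visible again
        return false;
    }

    public static void main(String[] args) {
        AckRejectSketch<Integer> orders = new AckRejectSketch<>();
        orders.send(300);
        orders.send(5000);
        int[] balance = {2000};
        // Confirm a transfer only if the source account can cover it
        Predicate<Integer> transfer = amount -> {
            if (amount > balance[0]) {
                return false;
            }
            balance[0] -= amount;
            return true;
        };
        System.out.println("first acked: " + orders.receiveOnce(transfer));
        System.out.println("second acked: " + orders.receiveOnce(transfer));
        System.out.println("still queued: " + orders.size());
    }
}
```

A rejected order stays in the queue, so a real consumer would normally limit the number of redeliveries or route such messages elsewhere.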

Account balances are managed by the AccountRepository bean, which the order processing relies on. It is a simple in-memory store, used only for demo purposes.

@Repository
public class AccountRepository {

    private List<Account> accounts = new ArrayList<>();

    public Account updateBalance(Integer id, int amount) throws InsufficientFundsException {
        Optional<Account> accOptional = accounts.stream().filter(a -> a.getId().equals(id)).findFirst();
        if (accOptional.isPresent()) {
            Account account = accOptional.get();
            // Validate before mutating, so that a rejected update leaves the balance untouched
            int newBalance = account.getBalance() + amount;
            if (newBalance < 0)
                throw new InsufficientFundsException();
            account.setBalance(newBalance);
            return account;
        }
        return null;
    }

    public Account add(Account account) {
        account.setId(accounts.size() + 1);
        accounts.add(account);
        return account;
    }

    public List<Account> getAccounts() {
        return accounts;
    }

    @PostConstruct
    public void init() {
        add(new Account(null, "123456", 2000));
        add(new Account(null, "123457", 2000));
        add(new Account(null, "123458", 2000));
    }
}
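The repository may be called from more than one thread, for example from the queue polling loop and from the event listener callbacks, and a plain ArrayList with a check-then-update sequence is not safe in that situation. As a possible alternative, here is a sketch that performs the check and the update atomically per account. The class ConcurrentBalances is hypothetical and is not part of the sample project.

```java
import java.util.Map;
import java.util.OptionalInt;
import java.util.concurrent.ConcurrentHashMap;

/** Thread-safe variant of the balance update; a sketch, not the article's repository. */
final class ConcurrentBalances {

    private final Map<Integer, Integer> balances = new ConcurrentHashMap<>();

    void open(int accountId, int initialBalance) {
        balances.put(accountId, initialBalance);
    }

    int balanceOf(int accountId) {
        return balances.getOrDefault(accountId, 0);
    }

    /**
     * Atomically applies the delta to an existing account.
     * Returns the new balance, or an empty value if the account does not exist
     * or the update would make the balance negative.
     */
    OptionalInt updateBalance(int accountId, int delta) {
        int[] result = {-1}; // -1 means "not applied"
        balances.computeIfPresent(accountId, (id, current) -> {
            int updated = current + delta;
            if (updated < 0) {
                return current;   // insufficient funds: keep the old value
            }
            result[0] = updated;
            return updated;
        });
        return result[0] < 0 ? OptionalInt.empty() : OptionalInt.of(result[0]);
    }

    public static void main(String[] args) {
        ConcurrentBalances accounts = new ConcurrentBalances();
        accounts.open(1, 2000);
        System.out.println(accounts.updateBalance(1, -300));
        System.out.println(accounts.updateBalance(1, -5000));
    }
}
```

ConcurrentHashMap runs the remapping function atomically for a given key, so two concurrent transfers from the same account cannot both pass the funds check.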

The last components in our architecture are the event listeners. Both of them subscribe to the same EventsStore channel, transactions. The TransactionAmountListener is the simpler one. It handles each event on its own and transfers a bonus, calculated as a percentage of the transaction amount, to the target account. That’s why we have defined it as a listener just for new events (EventsStoreType.StartNewOnly).

@Component
public class TransactionAmountListener implements StreamObserver<EventReceive> {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionAmountListener.class);

    private Subscriber subscriber;
    private AccountRepository accountRepository;

    public TransactionAmountListener(Subscriber subscriber, AccountRepository accountRepository) {
        this.subscriber = subscriber;
        this.accountRepository = accountRepository;
    }

    @Override
    public void onNext(EventReceive eventReceive) {
        try {
            Order order = (Order) Converter.FromByteArray(eventReceive.getBody());
            LOGGER.info("Amount event: {}", order);
            accountRepository.updateBalance(order.getAccountIdTo(), (int) (order.getAmount() * 0.1));
        } catch (IOException | ClassNotFoundException | InsufficientFundsException e) {
            LOGGER.error("Error", e);
        }
    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onCompleted() {

    }

    @PostConstruct
    public void init() {
        SubscribeRequest subscribeRequest = new SubscribeRequest();
        subscribeRequest.setChannel("transactions");
        subscribeRequest.setClientID("amount-listener");
        subscribeRequest.setSubscribeType(SubscribeType.EventsStore);
        subscribeRequest.setEventsStoreType(EventsStoreType.StartNewOnly);
        try {
            subscriber.SubscribeToEvents(subscribeRequest, this);
        } catch (ServerAddressNotSuppliedException | SSLException e) {
            e.printStackTrace();
        }
    }
}

The situation is different with TransactionCountListener. It should be able to retrieve the list of all events published on pub/sub after every startup of our application. That’s why we define StartFromFirst as the EventsStoreType for the subscription. Also, the client ID needs to be generated dynamically on application startup in order to retrieve all stored events. The listener sends a bonus to a target account once more than five transactions addressed to that account have been successfully processed by the application.

@Component
public class TransactionCountListener implements StreamObserver<EventReceive> {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionCountListener.class);
    private Map<Integer, Integer> transactionsCount = new HashMap<>();

    private Subscriber subscriber;
    private AccountRepository accountRepository;

    public TransactionCountListener(Subscriber subscriber, AccountRepository accountRepository) {
        this.subscriber = subscriber;
        this.accountRepository = accountRepository;
    }

    @Override
    public void onNext(EventReceive eventReceive) {
        try {
            Order order = (Order) Converter.FromByteArray(eventReceive.getBody());
            LOGGER.info("Count event: {}", order);
            Integer accountIdTo = order.getAccountIdTo();
            Integer noOfTransactions = transactionsCount.get(accountIdTo);
            if (noOfTransactions == null)
                transactionsCount.put(accountIdTo, 1);
            else {
                transactionsCount.put(accountIdTo, ++noOfTransactions);
                if (noOfTransactions > 5) {
                    accountRepository.updateBalance(order.getAccountIdTo(), (int) (order.getAmount() * 0.1));
                    LOGGER.info("Adding extra to: id={}", order.getAccountIdTo());
                }
            }
        } catch (IOException | ClassNotFoundException | InsufficientFundsException e) {
            LOGGER.error("Error", e);
        }
    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onCompleted() {

    }

    @PostConstruct
    public void init() {
        final SubscribeRequest subscribeRequest = new SubscribeRequest();
        subscribeRequest.setChannel("transactions");
        subscribeRequest.setClientID("count-listener-" + System.currentTimeMillis());
        subscribeRequest.setSubscribeType(SubscribeType.EventsStore);
        subscribeRequest.setEventsStoreType(EventsStoreType.StartFromFirst);
        try {
            subscriber.SubscribeToEvents(subscribeRequest, this);
        } catch (ServerAddressNotSuppliedException | SSLException e) {
            e.printStackTrace();
        }
    }

}
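The counting rule used by this listener is easy to test in isolation. Below is a small sketch of the same rule written with Map.merge; the class TransferCounter is hypothetical. It also shows why replaying the stored events from the beginning restores the counters after a restart.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch of the counting rule: a bonus applies once an account has received more than 5 transfers. */
final class TransferCounter {

    private static final int THRESHOLD = 5;

    private final Map<Integer, Integer> transfersPerAccount = new HashMap<>();

    /** Records one transfer and reports whether this transfer qualifies for a bonus. */
    boolean recordAndCheckBonus(int targetAccountId) {
        int count = transfersPerAccount.merge(targetAccountId, 1, Integer::sum);
        return count > THRESHOLD;
    }

    public static void main(String[] args) {
        TransferCounter counter = new TransferCounter();
        // Replaying the stored events after a restart rebuilds the same counts
        List<Integer> replayedTargets = List.of(2, 2, 2, 2, 2, 2, 3);
        for (int target : replayedTargets) {
            System.out.println("target=" + target + " bonus=" + counter.recordAndCheckBonus(target));
        }
    }
}
```

Only the sixth transfer to account 2 qualifies here, which matches the noOfTransactions > 5 condition in the listener.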

Running on Minikube

The easiest way to run our sample application on Minikube is with Skaffold and Jib. We don’t have to prepare any Dockerfile, only a single deployment manifest in k8s directory. Here’s our deployment.yaml file.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: sampleapp-kubemq
  namespace: kubemq
  labels:
    app: sampleapp-kubemq
spec:
  replicas: 1
  selector:
    matchLabels:
      app: sampleapp-kubemq
  template:
    metadata:
      labels:
        app: sampleapp-kubemq
    spec:
      containers:
        - name: sampleapp-kubemq
          image: piomin/sampleapp-kubemq
          ports:
            - containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
  name: sampleapp-kubemq
  namespace: kubemq
  labels:
    app: sampleapp-kubemq
spec:
  ports:
    - port: 8080
      protocol: TCP
  selector:
    app: sampleapp-kubemq
  type: NodePort

The source code is prepared to use Skaffold and Jib. It contains the skaffold.yaml file in the project root directory.

apiVersion: skaffold/v2alpha1
kind: Config
build:
  artifacts:
    - image: piomin/sampleapp-kubemq
      jib: {}
  tagPolicy:
    gitCommit: {}

We also need the jib-maven-plugin in our pom.xml.

<plugin>
	<groupId>com.google.cloud.tools</groupId>
	<artifactId>jib-maven-plugin</artifactId>
	<version>1.8.0</version>
</plugin>

Now, we only have to execute the following command.


$ skaffold dev

Once our application is deployed on Minikube, we may perform some test calls. Assuming that the Minikube node is available at 192.168.99.100, here’s an example test request and the response from the application.

$ curl -s http://192.168.99.100:30833/orders -d '{"type":"TRANSFER","accountIdFrom":1,"accountIdTo":2,"amount":300,"status":"NEW"}' -H 'Content-Type: application/json'
{"type":"TRANSFER","accountIdFrom":1,"accountIdTo":2,"date":null,"amount":300,"id":"10","status":"ACCEPTED"}
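The same request can also be built from Java with the HTTP client available since Java 11. This is just a sketch: the class OrderRequestSketch is hypothetical, and the address and NodePort are the ones assumed in the curl example above. The request is only constructed here; sending it requires the application to be running.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Builds the POST /orders request with the JDK HTTP client API (Java 11+). */
final class OrderRequestSketch {

    static HttpRequest buildRequest(String baseUrl, String json) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/orders"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        String json = "{\"type\":\"TRANSFER\",\"accountIdFrom\":1,\"accountIdTo\":2,"
                + "\"amount\":300,\"status\":\"NEW\"}";
        HttpRequest request = buildRequest("http://192.168.99.100:30833", json);
        System.out.println(request.method() + " " + request.uri());
        // To actually send it:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```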

We may check a list of queues created on KubeMQ using command kubemqctl queues list as shown below.

kubemq-java-tutorial-queues

After sending some other test requests and performing some restarts of the application pod we may take a look at the event_store list using the command kubemqctl events_store list as shown below. We can see that there are multiple clients with id count-listener* registered, but only the current one is active.

kubemq-java-tutorial-events

Let’s take a look at the application logs. They are automatically displayed on the screen after running the skaffold dev command. As you can see, each message sent to the queue is received by the listener, which performs the transfer between accounts and then sends an event to pub/sub. Finally, both event_store listeners receive the event.

logs-1

If you restart the application pod, TransactionCountListener receives all events available inside event_store and counts them for each target account id. If the total number of transactions for a single account exceeds 5, it sends extra funds to that account.

logs-2

If a transaction is rejected by OrderListener due to a lack of funds on the source account, the message is redelivered to the queue.

logs-3

Conclusion

In this article I showed you a sample application that integrates with KubeMQ to implement standard use cases based on queues and topics (pub/sub). Getting started with KubeMQ on Kubernetes and managing it is extremely easy with the KubeMQ CLI. It has many interesting features described in the well-prepared documentation available at https://docs.kubemq.io/. As a modern, cloud-native message broker KubeMQ is able to transfer billions of messages daily. However, we should bear in mind that it is a relatively new product, and its features are not yet as refined as those of the competition. For example, you can compare the KubeMQ dashboard (available after executing the command kubemqctl cluster dashboard) with the RabbitMQ Web Admin. Of course, everything takes a little time, and I will follow the progress of KubeMQ development.

Redis in Microservices Architecture https://piotrminkowski.com/2019/03/18/redis-in-microservices-architecture/ Mon, 18 Mar 2019 09:12:30 +0000

Redis can be widely used in microservices architecture. It is probably one of the few popular software solutions that may be leveraged by your application in so many different ways. Depending on the requirements, it can act as a primary database, cache, or message broker. Since it is also a key/value store, we can use it as a configuration server or discovery server in a microservices architecture. Although it is usually defined as an in-memory data structure store, we can also run it in persistent mode.
Today, I’m going to show you some examples of using Redis with microservices built on top of the Spring Boot and Spring Cloud frameworks. These applications will communicate with each other asynchronously using Redis Pub/Sub, use Redis as a cache or primary database, and finally use Redis as a configuration server. Here’s the picture that illustrates the described architecture.

redis-micro-2.png

Redis as Configuration Server

If you have already built microservices with Spring Cloud, you have probably come across Spring Cloud Config. It is responsible for providing distributed configuration patterns for microservices. Unfortunately, Spring Cloud Config does not support Redis as a backend repository for property sources. That’s why I decided to fork the Spring Cloud Config project and implement this feature. I hope my implementation will soon be included in the official Spring Cloud release, but for now you may use my forked repo to run it. It is available on my GitHub account at piomin/spring-cloud-config. How to use it? It is very simple. Let’s see.
The current SNAPSHOT version of Spring Boot is 2.2.0.BUILD-SNAPSHOT, the same as for Spring Cloud Config. While building Spring Cloud Config Server we need to include only those two dependencies as shown below.

<parent>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-parent</artifactId>
   <version>2.2.0.BUILD-SNAPSHOT</version>
</parent>
<artifactId>config-service</artifactId>
<groupId>pl.piomin.services</groupId>
<version>1.0-SNAPSHOT</version>

<dependencies>
   <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-config-server</artifactId>
      <version>2.2.0.BUILD-SNAPSHOT</version>
   </dependency>
</dependencies>

By default, Spring Cloud Config Server uses the Git repository backend. We need to activate the redis profile to force it to use Redis as a backend. If your Redis instance listens on an address other than localhost:6379, you need to override the auto-configured connection settings with spring.redis.* properties. Here’s our bootstrap.yml file.

spring:
  application:
    name: config-service
  profiles:
    active: redis
  redis:
    host: 192.168.99.100

The application main class should be annotated with @EnableConfigServer.

@SpringBootApplication
@EnableConfigServer
public class ConfigApplication {

   public static void main(String[] args) {
      new SpringApplicationBuilder(ConfigApplication.class).run(args);
   }

}

Before running the application we need to start the Redis instance. Here’s the command that runs it as a Docker container and exposes it on port 6379.

$ docker run -d --name redis -p 6379:6379 redis

The configuration for every application has to be available under the key ${spring.application.name} or ${spring.application.name}-${spring.profiles.active[n]}.
We have to create a hash with the keys corresponding to the names of configuration properties. Our sample application driver-management uses three configuration properties: server.port for setting the HTTP listening port, spring.redis.host for changing the default address of the Redis instance used as a message broker and database, and sample.topic.name for setting the name of the topic used for asynchronous communication between our microservices. Here’s the structure of the Redis hash created for driver-management, visualized with RDBTools.

redis-micro-3

That visualization is equivalent to running the Redis CLI command HGETALL, which returns all the fields and values in a hash.

>> HGETALL driver-management
{
  "server.port": "8100",
  "sample.topic.name": "trips",
  "spring.redis.host": "192.168.99.100"
}
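The key naming rule described above can be written down as a small helper. This is only a sketch of the naming convention, not the code of the Redis backend: the class ConfigKeySketch is hypothetical, and the order of the returned keys is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;

/** Lists the Redis keys that may hold configuration for an application. */
final class ConfigKeySketch {

    /** One key for the application name, plus one key per active profile. */
    static List<String> candidateKeys(String applicationName, List<String> activeProfiles) {
        List<String> keys = new ArrayList<>();
        keys.add(applicationName);
        for (String profile : activeProfiles) {
            keys.add(applicationName + "-" + profile);
        }
        return keys;
    }

    public static void main(String[] args) {
        // "dev" is an example profile name, not one used in the article
        System.out.println(candidateKeys("driver-management", List.of("dev")));
    }
}
```

For the driver-management application without any active profile, the only key is driver-management, which is the hash shown above.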

After setting keys and values in Redis and running Spring Cloud Config Server with the redis profile active, we need to enable the distributed configuration feature on the client side. To do that, we just need to add the spring-cloud-starter-config dependency to the pom.xml of every microservice.

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-config</artifactId>
</dependency>

We use the newest stable version of Spring Cloud.

<dependencyManagement>
   <dependencies>
      <dependency>
         <groupId>org.springframework.cloud</groupId>
         <artifactId>spring-cloud-dependencies</artifactId>
         <version>Greenwich.SR1</version>
         <type>pom</type>
         <scope>import</scope>
      </dependency>
   </dependencies>
</dependencyManagement>

The name of application is taken from property spring.application.name on startup, so we need to provide the following bootstrap.yml file.

spring:
  application:
    name: driver-management

Redis as Message Broker

Now we can proceed to the second use case of Redis in a microservices-based architecture: a message broker. We will implement a typical asynchronous system, which is shown in the picture below. The trip-management microservice sends a notification to Redis Pub/Sub after creating a new trip and after finishing the current trip. The notification is received by both driver-management and passenger-management, which are subscribed to the particular channel.

micro-redis-1.png
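It is worth remembering what Pub/Sub delivery means here: a message goes to every client subscribed at the moment of publishing, and it is not stored for anyone who subscribes later. The following in-memory sketch imitates that behavior with plain Java. The class PubSubSketch is hypothetical and does not use any Redis client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** In-memory imitation of fire-and-forget pub/sub delivery; no Redis involved. */
final class PubSubSketch {

    private final Map<String, List<Consumer<String>>> channels = new ConcurrentHashMap<>();

    void subscribe(String channel, Consumer<String> listener) {
        channels.computeIfAbsent(channel, c -> new CopyOnWriteArrayList<>()).add(listener);
    }

    /** Delivers to current subscribers only and returns how many received the message. */
    int publish(String channel, String message) {
        List<Consumer<String>> listeners = channels.getOrDefault(channel, List.of());
        listeners.forEach(l -> l.accept(message));
        return listeners.size();
    }

    public static void main(String[] args) {
        PubSubSketch bus = new PubSubSketch();
        List<String> received = new ArrayList<>();
        // Nobody is subscribed yet, so this notification is simply dropped
        System.out.println("receivers: " + bus.publish("trips", "trip 1 created"));
        bus.subscribe("trips", m -> received.add("driver-management got: " + m));
        bus.subscribe("trips", m -> received.add("passenger-management got: " + m));
        System.out.println("receivers: " + bus.publish("trips", "trip 2 created"));
        received.forEach(System.out::println);
    }
}
```

This is the main difference from the queue-based approach: if a subscriber is down when a trip notification is published, it never sees that notification.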

Our application is very simple. We just need to add the following dependencies in order to provide REST API and integrate with Redis Pub/Sub.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

We should register beans for the channel topic and the publisher. TripPublisher is responsible for sending messages to the target topic.

@Configuration
public class TripConfiguration {

   @Autowired
   RedisTemplate<?, ?> redisTemplate;

   @Bean
   TripPublisher redisPublisher() {
      return new TripPublisher(redisTemplate, topic());
   }

   @Bean
   ChannelTopic topic() {
      return new ChannelTopic("trips");
   }

}

TripPublisher uses RedisTemplate for sending messages to the topic. Before sending it converts every message from object to JSON string using Jackson2JsonRedisSerializer.

public class TripPublisher {

   private static final Logger LOGGER = LoggerFactory.getLogger(TripPublisher.class);

   RedisTemplate<?, ?> redisTemplate;
   ChannelTopic topic;

   public TripPublisher(RedisTemplate<?, ?> redisTemplate, ChannelTopic topic) {
      this.redisTemplate = redisTemplate;
      this.redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer(Trip.class));
      this.topic = topic;
   }

   public void publish(Trip trip) throws JsonProcessingException {
      LOGGER.info("Sending: {}", trip);
      redisTemplate.convertAndSend(topic.getTopic(), trip);
   }

}

We have already implemented the logic on the publisher side. Now, we can proceed to the implementation on the subscriber side. We have two microservices, driver-management and passenger-management, that listen for the notifications sent by the trip-management microservice. We need to define a RedisMessageListenerContainer bean and set a message listener implementation class.

@Configuration
public class DriverConfiguration {

   @Autowired
   RedisConnectionFactory redisConnectionFactory;
   // Inject the Spring-managed subscriber so that its @Autowired repository is populated
   @Autowired
   DriverSubscriber driverSubscriber;

   @Bean
   RedisMessageListenerContainer container() {
      RedisMessageListenerContainer container = new RedisMessageListenerContainer();
      container.addMessageListener(messageListener(), topic());
      container.setConnectionFactory(redisConnectionFactory);
      return container;
   }

   @Bean
   MessageListenerAdapter messageListener() {
      return new MessageListenerAdapter(driverSubscriber);
   }

   @Bean
   ChannelTopic topic() {
      return new ChannelTopic("trips");
   }

}

The class responsible for handling incoming notifications needs to implement the MessageListener interface. After receiving a message, DriverSubscriber deserializes it from JSON to an object and changes the driver status.

@Service
public class DriverSubscriber implements MessageListener {

   private final Logger LOGGER = LoggerFactory.getLogger(DriverSubscriber.class);

   @Autowired
   DriverRepository repository;
   ObjectMapper mapper = new ObjectMapper();

   @Override
   public void onMessage(Message message, byte[] bytes) {
      try {
         Trip trip = mapper.readValue(message.getBody(), Trip.class);
         LOGGER.info("Message received: {}", trip.toString());
         Optional<Driver> optDriver = repository.findById(trip.getDriverId());
         if (optDriver.isPresent()) {
            Driver driver = optDriver.get();
            if (trip.getStatus() == TripStatus.DONE)
               driver.setStatus(DriverStatus.WAITING);
            else
               driver.setStatus(DriverStatus.BUSY);
            repository.save(driver);
         }
      } catch (IOException e) {
         LOGGER.error("Error reading message", e);
      }
   }

}
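
To make the publish/subscribe flow above more tangible, here is a minimal in-memory sketch in plain Java, without Redis or Spring. The InMemoryTopic class and the simplified status enums are assumptions made only for illustration (the NEW and IN_PROGRESS values in particular are hypothetical). It mimics two things: every subscriber of a channel gets each notification, and the driver status follows the trip status.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a Redis channel: every subscriber gets every message.
class InMemoryTopic<T> {
   private final List<Consumer<T>> subscribers = new ArrayList<>();

   void subscribe(Consumer<T> subscriber) {
      subscribers.add(subscriber);
   }

   void publish(T message) {
      subscribers.forEach(s -> s.accept(message));
   }
}

class TripFanOutSketch {

   // Only DONE appears in the article; the other values are assumed.
   enum TripStatus { NEW, IN_PROGRESS, DONE }
   enum DriverStatus { WAITING, BUSY }

   // Same rule as in DriverSubscriber: a finished trip frees the driver.
   static DriverStatus driverStatusFor(TripStatus tripStatus) {
      return tripStatus == TripStatus.DONE ? DriverStatus.WAITING : DriverStatus.BUSY;
   }

   public static void main(String[] args) {
      InMemoryTopic<TripStatus> trips = new InMemoryTopic<>();
      List<String> log = new ArrayList<>();
      // Two independent subscribers, like driver-management and passenger-management.
      trips.subscribe(s -> log.add("driver-management: driver is " + driverStatusFor(s)));
      trips.subscribe(s -> log.add("passenger-management: trip is " + s));
      trips.publish(TripStatus.NEW);
      trips.publish(TripStatus.DONE);
      log.forEach(System.out::println);
   }
}
```

Each published status produces one log entry per subscriber, which is the behavior we rely on when both services listen on the trips channel.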

Redis as Primary Database

Although Redis is mainly used as an in-memory cache or key/value store, it may also act as a primary database for your application. In that case it is worth running Redis in persistent mode.

$ docker run -d --name redis -p 6379:6379 redis redis-server --appendonly yes

Entities are stored inside Redis using hash operations, with one hash per entity. Each entity needs to have a hash key and an id.

@RedisHash("driver")
public class Driver {

   @Id
   private Long id;
   private String name;
   @GeoIndexed
   private Point location;
   private DriverStatus status;

   // setters and getters ...
}
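
As a rough illustration of that mapping, the sketch below flattens a driver into the key and the field/value pairs of a Redis hash. It assumes the key follows the keyspace:id form (driver:1 here) and covers only simple properties. The RedisHashSketch class and its methods are hypothetical helpers, not part of Spring Data Redis, which also stores some extra metadata that is left out here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RedisHashSketch {

   // Assumed key layout: <keyspace>:<id>, for example driver:1
   static String hashKey(String keyspace, Object id) {
      return keyspace + ":" + id;
   }

   // Flattens simple properties into hash fields; all values are kept as strings.
   static Map<String, String> toHash(long id, String name, String status) {
      Map<String, String> fields = new LinkedHashMap<>();
      fields.put("id", String.valueOf(id));
      fields.put("name", name);
      fields.put("status", status);
      return fields;
   }

   public static void main(String[] args) {
      // The driver name is sample data made up for this sketch.
      System.out.println(hashKey("driver", 1L) + " -> " + toHash(1L, "John Smith", "WAITING"));
   }
}
```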

Fortunately, Spring Data Redis provides a well-known repositories pattern for Redis integration. To enable it we should annotate configuration or main class with @EnableRedisRepositories. When using the Spring repositories pattern we don’t have to build any queries to Redis by ourselves.

@Configuration
@EnableRedisRepositories
public class DriverConfiguration {
   // logic ...
}

With Spring Data repositories we only need to name methods following the Spring Data convention. For more details, you may refer to my previous article Introduction to Spring Data Redis. For our sample purposes the default methods implemented inside Spring Data are enough. Here’s the declaration of the repository interface in driver-management.

public interface DriverRepository extends CrudRepository<Driver, Long> {}

Conclusion

As I have mentioned in the preface, there are various use cases for Redis in microservices architecture. I have just presented how you can easily use it together with Spring Cloud and Spring Data to provide a configuration server, a message broker and a database. Redis is commonly considered a cache, but I hope that after reading this article you will change your mind about it. The sample applications source code is, as usual, available on GitHub: https://github.com/piomin/sample-redis-microservices.git.

RabbitMQ Cluster with Consul and Vault https://piotrminkowski.com/2018/12/27/rabbitmq-cluster-with-consul-and-vault/ Thu, 27 Dec 2018 23:20:07 +0000
Almost two years ago I wrote an article about RabbitMQ clustering, RabbitMQ in cluster. It was one of the first posts on my blog, and it’s really hard to believe it has been two years since I started this blog. Anyway, one of the questions about the topic described in that article inspired me to return to the subject one more time. The question pointed to a problem with the approach to setting up the cluster. This approach assumes that we manually attach new nodes to the cluster by executing the command rabbitmqctl join_cluster with the name of an existing cluster node as a parameter. If I remember correctly, it was the only available method of creating a cluster at that time. Today we have more choices, which illustrates the evolution of RabbitMQ during the last two years. A RabbitMQ cluster can be formed in a number of ways:

  • Manually with rabbitmqctl (as described in my article RabbitMQ in cluster)
  • Declaratively by listing cluster nodes in config file
  • Using DNS-based discovery
  • Using AWS (EC2) instance discovery via a dedicated plugin
  • Using Kubernetes discovery via a dedicated plugin
  • Using Consul discovery via a dedicated plugin
  • Using etcd-based discovery via a dedicated plugin

Today, I’m going to show you how to create a RabbitMQ cluster using service discovery based on HashiCorp’s Consul. Additionally, we will add Vault to our architecture in order to use its interesting feature called the secrets engine for managing credentials used for accessing RabbitMQ. We will set up this sample on the local machine using Docker images of RabbitMQ, Consul and Vault. Finally, we will test our solution using a simple Spring Boot application that sends messages to the cluster and listens for incoming ones. That application is available in the GitHub repository sample-haclustered-rabbitmq-service in the branch consul.

Architecture

We use Vault as a credentials manager when applications try to authenticate against RabbitMQ nodes or when a user tries to log in to the RabbitMQ web admin console. Each RabbitMQ node registers itself in Consul after startup and retrieves a list of nodes running inside the cluster. Vault is integrated with RabbitMQ using a dedicated secrets engine. Here’s the architecture of our sample solution.

rabbit-consul-logo (1)

1. Configure RabbitMQ Consul plugin

The integration between RabbitMQ and Consul is provided by the plugin rabbitmq-peer-discovery-consul. This plugin is not enabled by default in the official RabbitMQ Docker image. So, the first step is to build our own Docker image based on the official RabbitMQ image, which enables the required plugin. By default, the main RabbitMQ configuration file is available under path /etc/rabbitmq/rabbitmq.conf inside a Docker container. To override it we just use the COPY statement as shown below. The following Dockerfile definition takes RabbitMQ with the management web console as a base image and enables the rabbitmq_peer_discovery_consul plugin.


FROM rabbitmq:3.7.8-management
COPY rabbitmq.conf /etc/rabbitmq
RUN rabbitmq-plugins enable --offline rabbitmq_peer_discovery_consul

Now, let’s take a closer look at our plugin configuration settings. Because I run Docker on Windows, Consul is not available under the default localhost address, but on 192.168.99.100. So, first we need to set that IP address using property cluster_formation.consul.host. We also need to set Consul as the default peer discovery implementation by setting the name of the plugin for property cluster_formation.peer_discovery_backend. Finally, we have to set two additional properties, cluster_formation.consul.svc_addr_auto and cluster_formation.consul.svc_addr_use_nodename, to make it work in our local Docker environment. They are related to the address of the RabbitMQ node sent to Consul during the registration process. It is important to compute it properly, and not to send for example localhost. After setting property cluster_formation.consul.svc_addr_use_nodename to false, the node will register itself using the host name instead of the node name. We can set the name of the host for the container inside its running command. Here’s my full RabbitMQ configuration file used in the demo for this article.

loopback_users.guest = false
listeners.tcp.default = 5672
hipe_compile = false
management.listener.port = 15672
management.listener.ssl = false
cluster_formation.peer_discovery_backend = rabbit_peer_discovery_consul
cluster_formation.consul.host = 192.168.99.100
cluster_formation.consul.svc_addr_auto = true
cluster_formation.consul.svc_addr_use_nodename = false

After saving the configuration visible above in the file rabbitmq.conf we can proceed to building our custom Docker image with RabbitMQ. This image is available in my Docker repository under alias piomin/rabbitmq, but you can also build it by yourself from Dockerfile by executing the following command.

$ docker build -t piomin/rabbitmq:1.0 .
Sending build context to Docker daemon  3.072kB
Step 1 : FROM rabbitmq:3.7.8-management
 ---> d69a5113ceae
Step 2 : COPY rabbitmq.conf /etc/rabbitmq
 ---> aa306ef88085
Removing intermediate container fda0e21178f9
Step 3 : RUN rabbitmq-plugins enable --offline rabbitmq_peer_discovery_consul
 ---> Running in 0892a42bffef
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_peer_discovery_common
  rabbitmq_peer_discovery_consul
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@fda0e21178f9...
The following plugins have been enabled:
  rabbitmq_peer_discovery_common
  rabbitmq_peer_discovery_consul

set 5 plugins.
Offline change; changes will take effect at broker restart.
 ---> cfe73f9d9904
Removing intermediate container 0892a42bffef
Successfully built cfe73f9d9904

2. Running RabbitMQ cluster on Docker

In the previous step we have successfully created a Docker image of RabbitMQ configured to run in cluster mode using Consul discovery. Before running this image we need to start an instance of Consul. Here’s the command that starts the Docker container with Consul and exposes it on port 8500.

$ docker run -d --name consul -p 8500:8500 consul

We will also create a Docker network to enable communication between containers by hostname. It is required in this scenario, because each RabbitMQ container is registered using container hostname.

$ docker network create rabbitmq

Now, we can run our three clustered RabbitMQ containers. We will set a unique hostname for every single container (using -h option) and set the same Docker network everywhere. We also have to set the environment variable RABBITMQ_ERLANG_COOKIE.

$ docker run -d --name rabbit1 -h rabbit1 --network rabbitmq -p 30000:5672 -p 30010:15672 -e RABBITMQ_ERLANG_COOKIE='rabbitmq' piomin/rabbitmq:1.0
$ docker run -d --name rabbit2 -h rabbit2 --network rabbitmq -p 30001:5672 -p 30011:15672 -e RABBITMQ_ERLANG_COOKIE='rabbitmq' piomin/rabbitmq:1.0
$ docker run -d --name rabbit3 -h rabbit3 --network rabbitmq -p 30002:5672 -p 30012:15672 -e RABBITMQ_ERLANG_COOKIE='rabbitmq' piomin/rabbitmq:1.0
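
The three commands differ only in the node index, so they can be generated. The sketch below is a hypothetical helper (the ClusterCommands class is not part of the sample repository) that reproduces the same naming and port scheme: AMQP ports start at 30000 and management console ports at 30010.

```java
import java.util.ArrayList;
import java.util.List;

class ClusterCommands {

   // Builds the docker run command for node number i (1-based).
   static String runCommand(int i, String cookie, String image) {
      String name = "rabbit" + i;
      int amqpPort = 30000 + (i - 1);
      int managementPort = 30010 + (i - 1);
      return String.format(
         "docker run -d --name %s -h %s --network rabbitmq -p %d:5672 -p %d:15672 "
            + "-e RABBITMQ_ERLANG_COOKIE='%s' %s",
         name, name, amqpPort, managementPort, cookie, image);
   }

   // Builds the commands for a cluster with the given number of nodes.
   static List<String> runCommands(int nodes, String cookie, String image) {
      List<String> commands = new ArrayList<>();
      for (int i = 1; i <= nodes; i++) {
         commands.add(runCommand(i, cookie, image));
      }
      return commands;
   }

   public static void main(String[] args) {
      runCommands(3, "rabbitmq", "piomin/rabbitmq:1.0").forEach(System.out::println);
   }
}
```

All nodes share the same Erlang cookie value, which is why it is passed once and reused for every command.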

After running all three instances of RabbitMQ we can first take a look at the Consul web console. You should see there the new service called rabbitmq. This value is the default service name set by the RabbitMQ Consul plugin. We can override it inside rabbitmq.conf using the cluster_formation.consul.svc property.

rabbit-consul-1

We can check whether the cluster has been successfully started using the RabbitMQ web management console. Every node exposes it. I just had to override the default port 15672 to avoid port conflicts between the three running instances.

rabbit-consul-10

3. Integrating RabbitMQ with Vault

In the two previous steps we have successfully run the cluster of three RabbitMQ nodes based on Consul discovery. Now, we will add Vault to our sample system to dynamically generate user credentials. Let’s begin by running Vault on Docker. You can find detailed information about it in my previous article Secure Spring Cloud Microservices with Vault and Nomad. We will run Vault in development mode using the following command.

$ docker run --cap-add=IPC_LOCK -d --name vault -p 8200:8200 vault

You can copy the root token from container logs using the docker logs -f vault command. Then you have to log in to the Vault web console available under address http://192.168.99.100:8200 using this token and enable the RabbitMQ secrets engine as shown below.

rabbit-consul-2

And confirm.

rabbit-consul-3

You can easily run Vault commands using a terminal provided by the web admin console or do the same thing using the HTTP API. The first command visible below is used for writing connection details. We just need to pass the RabbitMQ address and admin user credentials. The provided configuration settings point to the first RabbitMQ node, but the changes are then replicated to the whole cluster.

$ vault write rabbitmq/config/connection connection_uri="http://192.168.99.100:30010" username="guest" password="guest"

The next step is to configure a role that maps a name in Vault to virtual host permissions.

$ vault write rabbitmq/roles/default vhosts='{"/":{"write": ".*", "read": ".*"}}'
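
The write and read values in the role are regular expressions matched against resource names such as queues and exchanges. The sketch below only illustrates that idea with java.util.regex. The PermissionSketch class is hypothetical, the narrower pattern is an example that does not appear in the configuration above, and search-style matching is an assumption, which is why the narrower pattern is anchored explicitly.

```java
import java.util.regex.Pattern;

class PermissionSketch {

   // Returns true when the resource name is covered by the permission pattern.
   static boolean allowed(String permissionPattern, String resourceName) {
      return Pattern.compile(permissionPattern).matcher(resourceName).find();
   }

   public static void main(String[] args) {
      // Pattern used by the role in this article: any resource name is covered.
      System.out.println(allowed(".*", "q.example"));
      // Hypothetical narrower rule: only resources whose names start with "ex."
      System.out.println(allowed("^ex\\..*$", "ex.example"));
      System.out.println(allowed("^ex\\..*$", "q.example"));
   }
}
```

With the .* pattern every generated user can read from and write to any resource in the virtual host, which is fine for a demo but usually too broad elsewhere.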

We can test our newly created configuration by running command vault read rabbitmq/creds/default as shown below.

rabbit-consul-4

4. Sample application

Our sample application is pretty simple. It consists of two modules. The first of them, sender, is responsible for sending messages to RabbitMQ, while the second, listener, receives incoming messages. Both of them are Spring Boot applications that integrate with RabbitMQ and Vault using the following dependencies.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-vault-config-rabbitmq</artifactId>
   <version>2.0.2.RELEASE</version>
</dependency>

We need to provide some configuration settings in bootstrap.yml file to integrate our application with Vault. First, we need to enable plugin for that integration by setting property spring.cloud.vault.rabbitmq.enabled to true. Of course, Vault address and root token are required. It is also important to set property spring.cloud.vault.rabbitmq.role with the name of Vault role configured in step 3. Spring Cloud Vault injects username and password generated by Vault to the application properties spring.rabbitmq.username and spring.rabbitmq.password, so the only thing we need to configure in bootstrap.yml file is the list of available cluster nodes.

spring:
  rabbitmq:
    addresses: 192.168.99.100:30000,192.168.99.100:30001,192.168.99.100:30002
  cloud:
    vault:
      uri: http://192.168.99.100:8200
      token: s.7DaENeiqLmsU5ZhEybBCRJhp
      rabbitmq:
        enabled: true
        role: default
        backend: rabbitmq

For test purposes you should enable highly available (mirrored) queues on RabbitMQ. For instructions on how to configure them using policies you can refer to my article RabbitMQ in cluster. The application works at the level of exchanges. The auto-configured connection factory is injected into the application and set for the RabbitTemplate bean.

@SpringBootApplication
public class Sender {
   
   private static final Logger LOGGER = LoggerFactory.getLogger("Sender");
   
   @Autowired
   RabbitTemplate template;

   public static void main(String[] args) {
      SpringApplication.run(Sender.class, args);
   }

   @PostConstruct
   public void send() {
      for (int i = 0; i < 1000; i++) {
         int id = new Random().nextInt(100000);
         template.convertAndSend(new Order(id, "TEST"+id, OrderType.values()[(id%2)]));
      }
      LOGGER.info("Sending completed.");
   }
    
   @Bean
   public RabbitTemplate template(ConnectionFactory connectionFactory) {
      RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
      rabbitTemplate.setExchange("ex.example");
      return rabbitTemplate;
   }
    
}

Our listener app is connected only to the third node of the cluster (spring.rabbitmq.addresses=192.168.99.100:30002). However, the test queue is mirrored between all clustered nodes, so the listener is able to receive messages sent by the sender app. You can easily test it using my sample applications.

@SpringBootApplication
@EnableRabbit
public class Listener {

   private static final Logger LOGGER = LoggerFactory.getLogger("Listener");

   private Long timestamp;

   public static void main(String[] args) {
      SpringApplication.run(Listener.class, args);
   }

   @RabbitListener(queues = "q.example")
   public void onMessage(Order order) {
      if (timestamp == null)
         timestamp = System.currentTimeMillis();
      LOGGER.info((System.currentTimeMillis() - timestamp) + " : " + order.toString());
   }

   @Bean
   public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setConnectionFactory(connectionFactory);
      factory.setConcurrentConsumers(10);
      factory.setMaxConcurrentConsumers(20);
      return factory;
   }
   
}

Building and testing message-driven microservices using Spring Cloud Stream https://piotrminkowski.com/2018/06/15/building-and-testing-message-driven-microservices-using-spring-cloud-stream/ Fri, 15 Jun 2018 08:46:04 +0000
In this article, you will learn how to improve automated testing of message-driven microservices with Spring Cloud Stream. Spring Boot and Spring Cloud give you a great opportunity to build microservices fast using different styles of communication. You can create synchronous REST microservices based on Spring Cloud Netflix libraries as shown in one of my previous articles Quick Guide to Microservices with Spring Boot 2.0, Eureka and Spring Cloud. You can create asynchronous, reactive microservices deployed on Netty with Spring WebFlux project and combine it successfully with some Spring Cloud libraries as shown in my article Reactive Microservices with Spring WebFlux and Spring Cloud. And finally, you may implement message-driven microservices based on the publish/subscribe model using Spring Cloud Stream and a message broker like Apache Kafka or RabbitMQ. The last of the listed approaches to building microservices is the main subject of this article. I’m going to show you how to effectively build, scale, run, and test messaging microservices based on RabbitMQ broker.

Architecture

For the purpose of demonstrating Spring Cloud Stream testing features we will design a sample system that uses the publish/subscribe model for inter-service communication. We have three microservices: order-service, product-service and account-service. Application order-service exposes an HTTP endpoint that is responsible for processing orders sent to our system. All the incoming orders are processed asynchronously: order-service prepares and sends messages to a RabbitMQ exchange and then responds to the calling client that the request has been accepted for processing. Applications account-service and product-service are listening for the order messages incoming to the exchange. Microservice account-service is responsible for checking if there are sufficient funds on a customer’s account for order realization and then withdrawing cash from this account. Microservice product-service checks if there is a sufficient amount of products in the store, and changes the number of available products after processing the order. Both account-service and product-service send an asynchronous response through a RabbitMQ exchange (this time it is one-to-one communication using a direct exchange) with the status of the operation. After receiving the response messages, order-service sets the appropriate status of the order and exposes it through the REST endpoint GET /order/{id} to the external client.

If you feel that the description of our sample system is a little incomprehensible, here’s the diagram with architecture for clarification.

stream-1

Enabling Spring Cloud Stream

The recommended way to include Spring Cloud Stream in the project is with a dependency management system. Spring Cloud Stream has its own release train, independent of the whole Spring Cloud framework. However, if we have declared spring-cloud-dependencies in the Finchley.RELEASE version inside the dependencyManagement section, we don’t have to declare anything else in pom.xml. If you prefer to use only the Spring Cloud Stream project, you should define the following section.

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-dependencies</artifactId>
      <version>Elmhurst.RELEASE</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

The next step is to add the spring-cloud-stream artifact to the project dependencies. I also recommend including at least the spring-cloud-starter-sleuth library, so that messages are sent with the same traceId as the source request incoming to order-service.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>

Spring Cloud Stream programming model

To enable connectivity to a message broker for your application, annotate the main class with @EnableBinding. The @EnableBinding annotation takes one or more interfaces as parameters. You may choose between three interfaces provided by Spring Cloud Stream:

  • Sink: This is used for marking a service that receives messages from the inbound channel.
  • Source: This is used for sending messages to the outbound channel.
  • Processor: This can be used in case you need both an inbound channel and an outbound channel, as it extends the Source and Sink interfaces. Because order-service sends messages, as well as receives them, its main class has been annotated with @EnableBinding(Processor.class).

Here’s the main class of order-service that enables Spring Cloud Stream binding.

@SpringBootApplication
@EnableBinding(Processor.class)
public class OrderApplication {
  public static void main(String[] args) {
    new SpringApplicationBuilder(OrderApplication.class).web(true).run(args);
  }
}
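
The relationship between the three interfaces can be sketched in plain Java. The types below are simplified stand-ins written for this explanation, not the real Spring Cloud Stream interfaces: a channel is reduced to a list of handlers, and SimpleProcessor merely shows that a processor is both a source and a sink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for a message channel: delivers each payload to all handlers.
class SimpleChannel {
   private final List<Consumer<String>> handlers = new ArrayList<>();

   void subscribe(Consumer<String> handler) {
      handlers.add(handler);
   }

   boolean send(String payload) {
      handlers.forEach(h -> h.accept(payload));
      return true;
   }
}

// Hypothetical counterparts of Source, Sink and Processor.
interface SimpleSource { SimpleChannel output(); }
interface SimpleSink { SimpleChannel input(); }
interface SimpleProcessor extends SimpleSource, SimpleSink { }

class BindingSketch implements SimpleProcessor {
   private final SimpleChannel input = new SimpleChannel();
   private final SimpleChannel output = new SimpleChannel();

   public SimpleChannel input() { return input; }
   public SimpleChannel output() { return output; }

   public static void main(String[] args) {
      BindingSketch processor = new BindingSketch();
      // Forward every incoming payload to the output channel with a status suffix.
      processor.input().subscribe(p -> processor.output().send(p + ":ACCEPTED"));
      processor.output().subscribe(System.out::println);
      processor.input().send("order-1");
   }
}
```

A service that only consumes messages would depend on the sink part alone, and a service that only publishes on the source part.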

Adding message broker

In Spring Cloud Stream nomenclature the implementation responsible for integration with the specific message broker is called binder. By default, Spring Cloud Stream provides binder implementations for Kafka and RabbitMQ. It is able to automatically detect and use a binder found on the classpath. Any middleware-specific settings can be overridden through external configuration properties in the form supported by Spring Boot, such as application arguments, environment variables, or just the application.yml file. To include support for RabbitMQ you should add the following dependency to the project.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

Now, our applications need to connect to one shared instance of the RabbitMQ broker. That’s why I run a Docker image with RabbitMQ exposed on the default port 5672. It also launches a web dashboard available under address http://192.168.99.100:15672.

$ docker run -d --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management

We need to override the default address of RabbitMQ for every Spring Boot application by setting property spring.rabbitmq.host to Docker machine IP 192.168.99.100.

spring:  
  rabbitmq:
    host: 192.168.99.100
    port: 5672

Implementing message-driven microservices

Spring Cloud Stream is built on top of the Spring Integration project. Spring Integration extends the Spring programming model to support the well-known Enterprise Integration Patterns (EIP). EIP defines a number of components that are typically used for orchestration in distributed systems. You have probably heard about patterns such as message channels, routers, aggregators, or endpoints. Let’s proceed to the implementation.

We begin with order-service, which is responsible for accepting orders, publishing them on shared topics, and then collecting asynchronous responses from downstream services. Here’s the @Service, which builds a message and publishes it to the remote topic using the Source bean.

@Service
public class OrderSender {
  @Autowired
  private Source source;
  
  public boolean send(Order order) {
    return this.source.output().send(MessageBuilder.withPayload(order).build());
  }
}

That @Service is called by the controller, which exposes the HTTP endpoints for submitting new orders and getting order with status by id.

@RestController
public class OrderController {

   private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);
   
   private ObjectMapper mapper = new ObjectMapper();
   
   @Autowired
   OrderRepository repository;
   @Autowired
   OrderSender sender;   
   
   @PostMapping
   public Order process(@RequestBody Order order) throws JsonProcessingException {
      Order o = repository.add(order);
      LOGGER.info("Order saved: {}", mapper.writeValueAsString(order));
      boolean isSent = sender.send(o);
      LOGGER.info("Order sent: {}", mapper.writeValueAsString(Collections.singletonMap("isSent", isSent)));
      return o;
   }
   
   @GetMapping("/{id}")
   public Order findById(@PathVariable("id") Long id) {
      return repository.findById(id);
   }
   
}

Now, let’s take a closer look at the consumer side. The message sent by OrderSender bean from order-service is received by account-service and product-service. To receive the message from topic exchange, we just have to annotate the method that takes the Order object as a parameter with @StreamListener. We also have to define a target channel for the listener – in that case it is Processor.INPUT.

@SpringBootApplication
@EnableBinding(Processor.class)
public class AccountApplication {
   
   private static final Logger LOGGER = LoggerFactory.getLogger(AccountApplication.class);
   
   // Used only for logging the received order as JSON
   private ObjectMapper mapper = new ObjectMapper();
   
   @Autowired
   AccountService service;
   
   public static void main(String[] args) {
      new SpringApplicationBuilder(AccountApplication.class).web(true).run(args);
   }
   
   @StreamListener(Processor.INPUT)
   public void receiveOrder(Order order) throws JsonProcessingException {
      LOGGER.info("Order received: {}", mapper.writeValueAsString(order));
      service.process(order);
   }
   
}

The received order is then processed by the AccountService bean. The order may be accepted or rejected by account-service depending on whether there are sufficient funds on the customer’s account to realize it. The response with the acceptance status is sent back to order-service via the output channel invoked by the OrderSender bean.

@Service
public class AccountService {

   private static final Logger LOGGER = LoggerFactory.getLogger(AccountService.class);
   
   private ObjectMapper mapper = new ObjectMapper();
   
   @Autowired
   AccountRepository accountRepository;
   @Autowired
   OrderSender orderSender;
   
   public void process(final Order order) throws JsonProcessingException {
      LOGGER.info("Order processed: {}", mapper.writeValueAsString(order));
      List<Account> accounts = accountRepository.findByCustomer(order.getCustomerId());
      Account account = accounts.get(0);
      LOGGER.info("Account found: {}", mapper.writeValueAsString(account));
      if (order.getPrice() <= account.getBalance()) {
         order.setStatus(OrderStatus.ACCEPTED);
         account.setBalance(account.getBalance() - order.getPrice());
      } else {
         order.setStatus(OrderStatus.REJECTED);
      }
      orderSender.send(order);
      LOGGER.info("Order response sent: {}", mapper.writeValueAsString(order));
   }
   
}
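
The acceptance rule is easy to verify in isolation. Here is a minimal sketch of it as a pure function, with the repository and the messaging left out. FundsCheckSketch, Decision and decide are illustrative names that do not come from the sample repository, and amounts are assumed to be integers.

```java
class FundsCheckSketch {

   enum OrderStatus { ACCEPTED, REJECTED }

   // Result of the check: the decision and the account balance after it.
   static final class Decision {
      final OrderStatus status;
      final int balanceAfter;

      Decision(OrderStatus status, int balanceAfter) {
         this.status = status;
         this.balanceAfter = balanceAfter;
      }
   }

   // Accepts the order and withdraws the price only when the balance covers it.
   static Decision decide(int price, int balance) {
      if (price <= balance) {
         return new Decision(OrderStatus.ACCEPTED, balance - price);
      }
      return new Decision(OrderStatus.REJECTED, balance);
   }

   public static void main(String[] args) {
      Decision accepted = decide(500, 1000);
      Decision rejected = decide(1500, 1000);
      System.out.println(accepted.status + ", balance after: " + accepted.balanceAfter);
      System.out.println(rejected.status + ", balance after: " + rejected.balanceAfter);
   }
}
```

A rejected order leaves the balance untouched, which is the same behavior as the else branch in AccountService.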

The last step is configuration. It is provided inside application.yml file. We have to properly define destinations for channels. While order-service is assigning orders-out destination to output channel, and orders-in destination to input channel, account-service and product-service do the opposite. It is logical, because messages sent by order-service via its output destination are received by consuming services via their input destinations. But it is still the same destination on a shared broker's exchange. Here are configuration settings of order-service.

spring: 
  cloud:  
    stream:
      bindings:
        output:
          destination: orders-out
        input:
          destination: orders-in
      rabbit:
        bindings:
          input:
            consumer:
              exchangeType: direct

Here's configuration provided for account-service and product-service.

spring:  
  cloud:  
    stream:
      bindings:
        output:
          destination: orders-in
        input:
          destination: orders-out
      rabbit:
        bindings:
          output:
            producer:
              exchangeType: direct
              routingKeyExpression: '"#"'

Finally, you can run our sample microservices. For now, we just need to run a single instance of each microservice. You can easily generate some test requests by running the JUnit test class OrderControllerTest provided in my source code repository inside module order-service. This case is simple. In the next section, we will study a more advanced sample with multiple running instances of consuming services.

Scaling up

To scale up our Spring Cloud Stream applications we just need to launch additional instances of each microservice. They will still listen for the incoming messages on the same topic exchange as the currently running instances. After adding one instance of account-service and product-service we may send a test order. The result of that test won't be satisfactory for us... Why? A single order is received by all the running instances of every microservice. This is exactly how topic exchanges work - the message sent to the topic is received by all consumers, which are listening on that topic. Fortunately, Spring Cloud Stream is able to solve that problem by providing a solution called consumer group. It is responsible for guaranteeing that only one of the instances is expected to handle a given message if they are placed in a competing consumer relationship. The transformation to consumer group mechanism when running multiple instances of the service has been visualized on the following figure.

stream-2

Configuration of a consumer group mechanism is not very difficult. We just have to set a group parameter with the name of the group for the given destination. Here's the current binding configuration for account-service. The orders-in destination is a queue created for direct communication with order-service, so only orders-out is grouped, using the spring.cloud.stream.bindings.<channelName>.group property (here the channel is input).

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: orders-in
        input:
          destination: orders-out
          group: account
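
The difference between plain publish/subscribe and consumer groups can be simulated without a broker. In the sketch below, which is an illustration and not how the binder is implemented, every group receives its own copy of a message, and inside a group the instances take turns (simple round-robin is an assumption; a real broker may distribute messages differently). All class, method and instance names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ConsumerGroupSketch {

   // group name -> instances competing for messages in that group
   private final Map<String, List<String>> groups = new LinkedHashMap<>();
   // group name -> number of messages already delivered to that group
   private final Map<String, Integer> delivered = new LinkedHashMap<>();

   void register(String group, String instance) {
      groups.computeIfAbsent(group, g -> new ArrayList<>()).add(instance);
      delivered.putIfAbsent(group, 0);
   }

   // Delivers one message: one copy per group, handled by one instance of that group.
   List<String> deliver(String message) {
      List<String> deliveries = new ArrayList<>();
      for (Map.Entry<String, List<String>> group : groups.entrySet()) {
         List<String> instances = group.getValue();
         int turn = delivered.get(group.getKey());
         deliveries.add(instances.get(turn % instances.size()) + " got " + message);
         delivered.put(group.getKey(), turn + 1);
      }
      return deliveries;
   }

   public static void main(String[] args) {
      ConsumerGroupSketch exchange = new ConsumerGroupSketch();
      exchange.register("account", "account-1");
      exchange.register("account", "account-2");
      exchange.register("product", "product-1");
      exchange.register("product", "product-2");
      System.out.println(exchange.deliver("order-1"));
      System.out.println(exchange.deliver("order-2"));
   }
}
```

Each order reaches exactly one account-service instance and exactly one product-service instance, instead of all four running instances.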

The consumer group mechanism is a concept taken from Apache Kafka, and implemented in Spring Cloud Stream also for the RabbitMQ broker, which does not natively support it. So, I think it is pretty interesting how it is configured on RabbitMQ. If you run two instances of the service without setting a group name on the destination, there are two bindings created for a single exchange (one binding per instance) as shown in the picture below. Because two applications are listening on that exchange, there are four bindings assigned to that exchange in total.

spring-cloud-stream-testing-3

If you set a group name for the selected destination, Spring Cloud Stream will create a single binding for all running instances of a given service. The name of the binding will be suffixed with the group name.

spring-cloud-stream-testing_11_06

Because we have included spring-cloud-starter-sleuth in the project dependencies, the same traceId header is sent with all the asynchronous messages exchanged while processing a single request to the order-service POST endpoint. Thanks to that, we can easily correlate all logs by this header using Elastic Stack (Kibana).

spring-cloud-stream-testing_11_05

Automated Testing with Spring Cloud Stream

You can easily test your microservice without connecting to a message broker. To do so, include spring-cloud-stream-test-support in your project dependencies. It contains the TestSupportBinder bean that lets you interact with the bound channels and inspect any messages sent and received by the application.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-test-support</artifactId>
  <scope>test</scope>
</dependency>

In the test class we need to declare a MessageCollector bean, which is responsible for receiving messages retained by TestSupportBinder. Here's my test class from account-service. Using the Processor bean, I send a test order to the input channel. Then MessageCollector receives the message that is sent back to order-service via the output channel. The testAccepted method creates an order that should be accepted by account-service, while testRejected sets an order price that is too high, which results in the order being rejected.

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class OrderReceiverTest {

   private static final Logger LOGGER = LoggerFactory.getLogger(OrderReceiverTest.class);
   
   @Autowired
   private Processor processor;
   @Autowired
   private MessageCollector messageCollector;

   @Test
   @SuppressWarnings("unchecked")
   public void testAccepted() {
      Order o = new Order();
      o.setId(1L);
      o.setAccountId(1L);
      o.setCustomerId(1L);
      o.setPrice(500);
      o.setProductIds(Collections.singletonList(2L));
      processor.input().send(MessageBuilder.withPayload(o).build());
      Message<Order> received = (Message<Order>) messageCollector.forChannel(processor.output()).poll();
      LOGGER.info("Order response received: {}", received.getPayload());
      assertNotNull(received.getPayload());
      assertEquals(OrderStatus.ACCEPTED, received.getPayload().getStatus());
   }
   
   @Test
   @SuppressWarnings("unchecked")
   public void testRejected() {
      Order o = new Order();
      o.setId(1L);
      o.setAccountId(1L);
      o.setCustomerId(1L);
      o.setPrice(100000);
      o.setProductIds(Collections.singletonList(2L));
      processor.input().send(MessageBuilder.withPayload(o).build());
      Message<Order> received = (Message<Order>) messageCollector.forChannel(processor.output()).poll();
      LOGGER.info("Order response received: {}", received.getPayload());
      assertNotNull(received.getPayload());
      assertEquals(OrderStatus.REJECTED, received.getPayload().getStatus());
   }

}
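
One detail worth keeping in mind: messageCollector.forChannel(...) returns a BlockingQueue, and a plain poll() returns null immediately when the queue is empty, so a missing response shows up as a NullPointerException on received.getPayload(). The following is a minimal sketch of a helper that waits for a short time and fails with a readable message instead. The PollDemo class and the pollOrFail method are hypothetical names introduced only for this illustration, and the code uses nothing beyond the standard java.util.concurrent API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical test helper; not part of Spring Cloud Stream. */
final class PollDemo {

    /** Waits up to timeoutMillis for an element and fails clearly instead of returning null. */
    static <T> T pollOrFail(BlockingQueue<T> queue, long timeoutMillis) {
        try {
            T element = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (element == null) {
                throw new IllegalStateException("No message received within " + timeoutMillis + " ms");
            }
            return element;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a message", e);
        }
    }
}
```

In the test class above, such a helper could wrap the queue returned for processor.output(), so that a test fails with "No message received" rather than with a null dereference when account-service does not reply.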

Conclusion

Message-driven microservices are a good choice whenever you don't need a synchronous response from your API. In this article, I have shown a sample use case of the publish/subscribe model in inter-service communication between your microservices. The source code is, as usual, available on GitHub (https://github.com/piomin/sample-message-driven-microservices.git). For other interesting examples of testing with the Spring Cloud Stream library, also with Apache Kafka, you can refer to Chapter 11 in my book Mastering Spring Cloud (https://www.packtpub.com/application-development/mastering-spring-cloud).

The post Building and testing message-driven microservices using Spring Cloud Stream appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2018/06/15/building-and-testing-message-driven-microservices-using-spring-cloud-stream/feed/ 7 6678