AMQP Archives - Piotr's TechBlog (https://piotrminkowski.com/tag/amqp/) - Java, Spring, Kotlin, microservices, Kubernetes, containers

ActiveMQ Artemis with Spring Boot on Kubernetes
https://piotrminkowski.com/2022/07/26/activemq-artemis-with-spring-boot-on-kubernetes/
Tue, 26 Jul 2022

The post ActiveMQ Artemis with Spring Boot on Kubernetes appeared first on Piotr's TechBlog.

This article will teach you how to run ActiveMQ on Kubernetes and integrate it with your app through Spring Boot. We will deploy a clustered ActiveMQ broker using a dedicated operator. Then we are going to build and run two Spring Boot apps. The first of them is running in multiple instances and receiving messages from the queue, while the second is sending messages to that queue. In order to test the ActiveMQ cluster, we will use Kind. The consumer app connects to the cluster using several different modes. We will discuss those modes in detail.

You can find a lot of articles about other message brokers like RabbitMQ or Kafka on my blog. If you would like to read about RabbitMQ on Kubernetes, please refer to that article. In order to find out more about Kafka and Spring Boot integration, you can read the article about Kafka Streams and Spring Cloud Stream available here. Previously I didn't write much about ActiveMQ, but it is also a very popular message broker. For example, it supports the latest version of the AMQP protocol, while RabbitMQ is based on its own extension of AMQP 0.9.1.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then go to the messaging directory. There you will find three Spring Boot apps: simple-producer, simple-consumer and simple-counter. After that, just follow my instructions. Let's begin.

Integrate Spring Boot with ActiveMQ

Let's begin with the integration between our Spring Boot apps and the ActiveMQ Artemis broker. In fact, ActiveMQ Artemis is the base of a commercial product provided by Red Hat called AMQ Broker. Red Hat actively develops a Spring Boot starter for ActiveMQ and an operator for running it on Kubernetes. In order to use the starter, you need to include the Red Hat Maven repository in your pom.xml file:

<repository>
  <id>red-hat-ga</id>
  <url>https://maven.repository.redhat.com/ga</url>
</repository>

After that, you can include a starter in your Maven pom.xml:

<dependency>
  <groupId>org.amqphub.spring</groupId>
  <artifactId>amqp-10-jms-spring-boot-starter</artifactId>
  <version>2.5.6</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>log4j-over-slf4j</artifactId>
    </exclusion>
  </exclusions>
</dependency>

Then, we just need to enable JMS for our app with the @EnableJms annotation:

@SpringBootApplication
@EnableJms
public class SimpleConsumer {

   public static void main(String[] args) {
      SpringApplication.run(SimpleConsumer.class, args);
   }

}

Our application is very simple. It just receives and prints an incoming message. The method for receiving messages should be annotated with @JmsListener. The destination field contains the name of a target queue.

@Service
public class Listener {

   private static final Logger LOG = LoggerFactory
      .getLogger(Listener.class);

   @JmsListener(destination = "test-1")
   public void processMsg(SimpleMessage message) {
      LOG.info("============= Received: " + message);
   }

}

Here’s the class that represents our message:

public class SimpleMessage implements Serializable {

   private Long id;
   private String source;
   private String content;

   public SimpleMessage() {
   }

   public SimpleMessage(Long id, String source, String content) {
      this.id = id;
      this.source = source;
      this.content = content;
   }

   // ... GETTERS AND SETTERS

   @Override
   public String toString() {
      return "SimpleMessage{" +
              "id=" + id +
              ", source='" + source + '\'' +
              ", content='" + content + '\'' +
              '}';
   }
}

Finally, we need to set the connection settings. With the AMQP Spring Boot starter it is very simple. We just need to set the amqphub.amqp10jms.remoteUrl property. For now, we will rely on an environment variable set at the level of the Kubernetes Deployment.

amqphub.amqp10jms.remoteUrl = ${ARTEMIS_URL}

The producer application is pretty similar. Instead of the annotation for receiving messages, we use Spring JmsTemplate for producing and sending messages to the target queue. The method for sending messages is exposed as an HTTP POST /producer/send endpoint.

@RestController
@RequestMapping("/producer")
public class ProducerController {

   private static long id = 1;
   private final JmsTemplate jmsTemplate;
   @Value("${DESTINATION}")
   private String destination;

   public ProducerController(JmsTemplate jmsTemplate) {
      this.jmsTemplate = jmsTemplate;
   }

   @PostMapping("/send")
   public SimpleMessage send(@RequestBody SimpleMessage message) {
      if (message.getId() == null) {
          message.setId(id++);
      }
      jmsTemplate.convertAndSend(destination, message);
      return message;
   }
}
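One detail worth noting: the static `id++` counter in the controller above is not thread-safe, so two concurrent requests could receive the same id. A minimal thread-safe variant (a sketch, not part of the original repository) would use AtomicLong:

```java
import java.util.concurrent.atomic.AtomicLong;

public class IdGenerator {

    // Thread-safe replacement for the controller's static `id++` field
    private final AtomicLong id = new AtomicLong(1);

    long nextId() {
        return id.getAndIncrement();
    }

    public static void main(String[] args) {
        IdGenerator gen = new IdGenerator();
        System.out.println(gen.nextId()); // 1
        System.out.println(gen.nextId()); // 2
    }
}
```

For a demo app this hardly matters, but it is the idiomatic fix if the producer ever runs under real concurrent load.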

Create a Kind cluster with Nginx Ingress

Our example apps are ready. Before deploying them, we need to prepare a local Kubernetes cluster. We will deploy there an ActiveMQ cluster consisting of three brokers. Therefore, our Kubernetes cluster will also consist of three worker nodes. We will also run three instances of the consumer app on Kubernetes. They connect to the ActiveMQ brokers over the AMQP protocol. There is also a single instance of the producer app that sends messages on demand. Here's the diagram of our architecture.

(Figure: architecture of the ActiveMQ cluster and the Spring Boot apps on Kubernetes)

In order to run a multi-node Kubernetes cluster locally, we will use Kind. We will test not only communication over the AMQP protocol but also expose the ActiveMQ management console over HTTP. Because ActiveMQ uses headless Services for exposing the web console, we have to create and configure an Ingress on Kind to access it. Let's begin.

In the first step, we are going to create a Kind cluster. It consists of a control plane and three workers. The configuration has to be prepared correctly to run the Nginx Ingress Controller. We should add the ingress-ready label to a single worker node and expose ports 80 and 443. Here’s the final version of a Kind config file:

kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
  - role: control-plane
  - role: worker
    kubeadmConfigPatches:
    - |
      kind: JoinConfiguration
      nodeRegistration:
        kubeletExtraArgs:
          node-labels: "ingress-ready=true"
    extraPortMappings:
    - containerPort: 80
      hostPort: 80
      protocol: TCP
    - containerPort: 443
      hostPort: 443
      protocol: TCP  
  - role: worker
  - role: worker

Now, let’s create a Kind cluster by executing the following command:

$ kind create cluster --config kind-config.yaml

If your cluster has been created successfully, kind prints a confirmation message.

After that, let’s install the Nginx Ingress Controller. It is just a single command:

$ kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/main/deploy/static/provider/kind/deploy.yaml

Let’s verify the installation:

$ kubectl get pod -n ingress-nginx
NAME                                        READY   STATUS      RESTARTS  AGE
ingress-nginx-admission-create-wbbzh        0/1     Completed   0         1m
ingress-nginx-admission-patch-ws2mv         0/1     Completed   0         1m
ingress-nginx-controller-86b6d5756c-rkbmz   1/1     Running     0         1m

Install ActiveMQ Artemis on Kubernetes

Finally, we may proceed to the ActiveMQ Artemis installation. Firstly, let’s install the required CRDs. You may find all the YAML manifests inside the operator repository on GitHub.

$ git clone https://github.com/artemiscloud/activemq-artemis-operator.git
$ cd activemq-artemis-operator

The manifests with CRDs are located in the deploy/crds directory:

$ kubectl create -f ./deploy/crds

After that, we can install the operator:

$ kubectl create -f ./deploy/service_account.yaml
$ kubectl create -f ./deploy/role.yaml
$ kubectl create -f ./deploy/role_binding.yaml
$ kubectl create -f ./deploy/election_role.yaml
$ kubectl create -f ./deploy/election_role_binding.yaml
$ kubectl create -f ./deploy/operator_config.yaml
$ kubectl create -f ./deploy/operator.yaml

In order to create a cluster, we have to create an ActiveMQArtemis object. It defines the number of brokers in the cluster (1). We should also define an acceptor to expose the AMQP port on every single broker pod (2). Of course, we will also expose the management console (3).

apiVersion: broker.amq.io/v1beta1
kind: ActiveMQArtemis
metadata:
  name: ex-aao
spec:
  deploymentPlan:
    size: 3 # (1)
    image: placeholder
    messageMigration: true
    resources:
      limits:
        cpu: "500m"
        memory: "1024Mi"
      requests:
        cpu: "250m"
        memory: "512Mi"
  acceptors: # (2)
    - name: amqp
      protocols: amqp
      port: 5672
      connectionsAllowed: 5
  console: # (3)
    expose: true

Once the ActiveMQArtemis object is created, the operator starts the deployment process. It creates a StatefulSet object:

$ kubectl get statefulset
NAME        READY   AGE
ex-aao-ss   3/3     1m

It starts all three broker pods sequentially:

$ kubectl get pod -l application=ex-aao-app
NAME          READY   STATUS    RESTARTS    AGE
ex-aao-ss-0   1/1     Running   0           5m
ex-aao-ss-1   1/1     Running   0           3m
ex-aao-ss-2   1/1     Running   0           1m

Let’s display a list of Services created by the operator. There is a single Service per broker for exposing the AMQP port (ex-aao-amqp-*) and web console (ex-aao-wsconsj-*):

(Figure: list of Services created by the operator)

The operator automatically creates an Ingress object for each web console Service. We will modify them by adding different hosts: the one.activemq.com domain for the first broker, two.activemq.com for the second broker, and three.activemq.com for the third.

$ kubectl get ing
NAME                      CLASS    HOSTS                ADDRESS     PORTS   AGE
ex-aao-wconsj-0-svc-ing   <none>   one.activemq.com     localhost   80      1h
ex-aao-wconsj-1-svc-ing   <none>   two.activemq.com     localhost   80      1h
ex-aao-wconsj-2-svc-ing   <none>   three.activemq.com   localhost   80      1h

After creating the Ingresses, we have to add the following line to /etc/hosts:

127.0.0.1    one.activemq.com two.activemq.com three.activemq.com

Now we can access the management console, e.g. for the third broker under the URL http://three.activemq.com/console.

(Figure: ActiveMQ management console)

Once the broker is ready, we may define a test queue named test-1:

apiVersion: broker.amq.io/v1beta1
kind: ActiveMQArtemisAddress
metadata:
  name: address-1
spec:
  addressName: address-1
  queueName: test-1
  routingType: anycast

Run the Spring Boot app on Kubernetes and connect to ActiveMQ

Now, let's deploy the consumer app. In the Deployment manifest, we have to set the ActiveMQ cluster connection URL. But wait… how do we connect to it? There are three brokers exposed using three separate Kubernetes Services. Fortunately, the AMQP Spring Boot starter supports that case. We may set the addresses of all three brokers inside the failover section. Let's try it and see what happens.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: simple-consumer
spec:
  replicas: 3
  selector:
    matchLabels:
      app: simple-consumer
  template:
    metadata:
      labels:
        app: simple-consumer
    spec:
      containers:
      - name: simple-consumer
        image: piomin/simple-consumer
        env:
          - name: ARTEMIS_URL
            value: failover:(amqp://ex-aao-amqp-0-svc:5672,amqp://ex-aao-amqp-1-svc:5672,amqp://ex-aao-amqp-2-svc:5672)
        resources:
          limits:
            memory: 256Mi
            cpu: 500m
          requests:
            memory: 128Mi
            cpu: 250m

The application is prepared to be deployed with Skaffold. If you run the skaffold dev command, you will deploy all three instances of the consumer app and see their logs. What's the result? All the instances connect to the first URL from the list.

Fortunately, there is a failover option that helps distribute client connections more evenly across multiple remote peers. With the failover.randomize option enabled, the URIs are shuffled randomly before the client attempts to connect to one of them. Let's replace the ARTEMIS_URL env in the Deployment manifest with the following value:

failover:(amqp://ex-aao-amqp-0-svc:5672,amqp://ex-aao-amqp-1-svc:5672,amqp://ex-aao-amqp-2-svc:5672)?failover.randomize=true
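The failover URI is just a comma-separated list of broker URIs wrapped in failover:(…), with options appended as query parameters. A small sketch of assembling it programmatically (the service names are the ones the operator generated in this example):

```java
import java.util.List;
import java.util.stream.Collectors;

public class FailoverUrl {

    // Builds a failover URI from a list of broker host:port addresses.
    static String failoverUrl(List<String> brokers, boolean randomize) {
        String joined = brokers.stream()
                .map(b -> "amqp://" + b)
                .collect(Collectors.joining(","));
        return "failover:(" + joined + ")"
                + (randomize ? "?failover.randomize=true" : "");
    }

    public static void main(String[] args) {
        List<String> brokers = List.of(
                "ex-aao-amqp-0-svc:5672",
                "ex-aao-amqp-1-svc:5672",
                "ex-aao-amqp-2-svc:5672");
        System.out.println(failoverUrl(brokers, true));
        // -> failover:(amqp://ex-aao-amqp-0-svc:5672,amqp://ex-aao-amqp-1-svc:5672,amqp://ex-aao-amqp-2-svc:5672)?failover.randomize=true
    }
}
```

This can be handy if the broker list comes from configuration rather than being hard-coded in the Deployment manifest.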

The distribution between broker instances looks slightly better. Of course, the result is random, so you may get different results.

Another way to distribute the connections is through a dedicated Kubernetes Service. We don't have to rely on the Services created automatically by the operator. We can create our own Service that load balances across all available broker pods.

kind: Service
apiVersion: v1
metadata:
  name: ex-aao-amqp-lb
spec:
  ports:
    - name: amqp
      protocol: TCP
      port: 5672
  type: ClusterIP
  selector:
    application: ex-aao-app

Now, we can drop the failover section on the client side and fully rely on Kubernetes mechanisms. Note that AMQP connections are long-lived, so the Service balances clients at connection time, not per message.

spec:
  containers:
  - name: simple-consumer
    image: piomin/simple-consumer
    env:
      - name: ARTEMIS_URL
        value: amqp://ex-aao-amqp-lb:5672

This time we won't see anything interesting in the application logs, because all the instances connect to the same URL. We can verify the distribution using e.g. the management web console, which lists the consumers connected to each broker. The first and second broker instances show exactly the same picture: all the consumer app instances have been distributed evenly across the available brokers in the cluster.

Now, we are going to deploy the producer app. We use the same Kubernetes Service for connecting to the ActiveMQ cluster.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: simple-producer
spec:
  replicas: 3
  selector:
    matchLabels:
      app: simple-producer
  template:
    metadata:
      labels:
        app: simple-producer
    spec:
      containers:
        - name: simple-producer
          image: piomin/simple-producer
          env:
            - name: ARTEMIS_URL
              value: amqp://ex-aao-amqp-lb:5672
            - name: DESTINATION
              value: test-1
          ports:
            - containerPort: 8080

Because we have to call its HTTP endpoint, let's create a Service for the producer app:

apiVersion: v1
kind: Service
metadata:
  name: simple-producer
spec:
  type: ClusterIP
  selector:
    app: simple-producer
  ports:
  - port: 8080

Let’s deploy the producer app using Skaffold with port-forwarding enabled:

$ skaffold dev --port-forward

You can verify the list of Deployments with the kubectl get deploy command.

In order to send a test message just execute the following command:

$ curl http://localhost:8080/producer/send \
  -d "{\"source\":\"test\",\"content\":\"Hello\"}" \
  -H "Content-Type:application/json"
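The same request can also be issued from Java with the JDK's built-in HTTP client; the host and port assume the Skaffold port-forward set up above:

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class SendMessage {

    // Builds the same POST request as the curl command above.
    static HttpRequest buildRequest(String baseUrl) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/producer/send"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(
                        "{\"source\":\"test\",\"content\":\"Hello\"}"))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRequest("http://localhost:8080");
        System.out.println(request.method() + " " + request.uri());
        // Actually sending it requires the running cluster:
        // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    }
}
```

The endpoint echoes the message back, with the id field filled in by the producer if the request body did not contain one.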

Advanced configuration

If you need more advanced traffic distribution between brokers inside the cluster, you can achieve it in several ways. For example, we can dynamically override a configuration property at runtime. Here's a very simple example. After starting, the application calls an external service over HTTP, which returns the next instance number.

@Configuration
public class AmqpConfig {

    @PostConstruct
    public void init() {
        // Ask the counter service for the next broker index (0, 1 or 2)
        RestTemplate t = new RestTemplateBuilder().build();
        int x = t.getForObject("http://simple-counter:8080/counter", Integer.class);
        // Point the AMQP starter at the Service of the selected broker
        System.setProperty("amqphub.amqp10jms.remoteUrl",
                "amqp://ex-aao-amqp-" + x + "-svc:5672");
    }

}

Here's the implementation of the counter app. It just increments a number and returns it modulo the number of broker instances. Of course, we could create a more advanced implementation, e.g. one that connects the app to the broker instance running on the same Kubernetes node as the app pod.

@SpringBootApplication
@RestController
@RequestMapping("/counter")
public class CounterApp {

   private static int c = 0;

   public static void main(String[] args) {
      SpringApplication.run(CounterApp.class, args);
   }

   @Value("${DIVIDER:0}")
   int divider;

   @GetMapping
   public Integer count() {
      if (divider > 0)
         return c++ % divider;
      else
         return c++;
   }
}
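The effect is a simple round-robin: with DIVIDER=3, consecutive calls return 0, 1, 2, 0, 1, 2, …, so each new app instance is pointed at the next broker Service. A standalone sketch of that logic:

```java
public class RoundRobinCounter {

    private int c = 0;
    private final int divider;

    RoundRobinCounter(int divider) {
        this.divider = divider;
    }

    // Same logic as the counter app: increment and wrap around.
    int next() {
        return divider > 0 ? c++ % divider : c++;
    }

    public static void main(String[] args) {
        RoundRobinCounter counter = new RoundRobinCounter(3);
        for (int i = 0; i < 6; i++) {
            System.out.print(counter.next() + " ");
        }
        // prints: 0 1 2 0 1 2
    }
}
```

With DIVIDER unset (0), the counter just keeps incrementing, which is the fallback branch in the app above.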

Final Thoughts

ActiveMQ is an interesting alternative to RabbitMQ as a message broker. In this article, you learned how to run, manage and integrate ActiveMQ with Spring Boot on Kubernetes. It can be managed declaratively on Kubernetes thanks to the ActiveMQ Artemis Operator. You can also easily integrate it with Spring Boot using a dedicated starter, which provides various configuration options and is actively developed by Red Hat and the community.

RabbitMQ in cluster
https://piotrminkowski.com/2017/02/28/rabbitmq-in-cluster/
Tue, 28 Feb 2017

RabbitMQ has grown into one of the most popular message brokers. It is written in Erlang and implements the Advanced Message Queuing Protocol (AMQP). It is easy to use and configure, even for mechanisms such as clustering or high availability. In this post, I'm going to show you how to run several instances of RabbitMQ in Docker containers, in a cluster with highly available (HA) queues. Based on a sample Java application, we'll see how to send and receive messages from the RabbitMQ cluster and check how this message broker handles a large number of incoming messages. The sample Spring Boot application is available on GitHub. Here is a picture illustrating the architecture of our solution.

(Figure: RabbitMQ cluster architecture)

We use the official Docker image of RabbitMQ. Here are the commands for running three RabbitMQ nodes. The first node is the master of the cluster; the two other nodes will join it. We use the rabbitmq:management image to enable the UI administration console on each node. Every node has the default connection and UI management ports exposed. The important thing is to link the rabbit2 and rabbit3 containers to rabbit1, which is necessary for joining the cluster mastered by rabbit1.

$ docker run -d --hostname rabbit1 --name rabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcluster' -p 30000:5672 -p 30001:15672 rabbitmq:management
 
$ docker run -d --hostname rabbit2 --name rabbit2 --link rabbit1:rabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcluster' -p 30002:5672 -p 30003:15672 rabbitmq:management
 
$ docker run -d --hostname rabbit3 --name rabbit3 --link rabbit1:rabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcluster' -p 30004:5672 -p 30005:15672 rabbitmq:management
 

Ok, now there are three RabbitMQ instances running. We can go to the UI management console of each instance, available as a Docker container, for example http://192.168.99.100:30001 for rabbit1. Initially, each instance runs in its own independent cluster, as we can see in the pictures below. We would like to make all instances work in the same cluster, rabbit@rabbit1.

(Figures: each node initially reports its own single-node cluster)

Here's the set of commands run on the rabbit2 instance to join the rabbit@rabbit1 cluster. The same set should be run on the rabbit3 node. First, we have to connect to the Docker container and start a bash session. Before running the join_cluster command, we have to stop the broker.

$ docker exec -i -t rabbit2 bash
root@rabbit2:/# rabbitmqctl stop_app
Stopping node rabbit@rabbit2 ...
root@rabbit2:/# rabbitmqctl join_cluster rabbit@rabbit1
Clustering node rabbit@rabbit2 with rabbit@rabbit1 ...
root@rabbit2:/# rabbitmqctl start_app
Starting node rabbit@rabbit2 ...
 

If everything was successful, we should see the cluster name rabbit@rabbit1 in the upper right corner of the rabbit2 management console. You should also see the list of running nodes in the Nodes section. You can also check the cluster status by running the rabbitmqctl cluster_status command on any node, which should display the list of all cluster nodes.

(Figure: cluster status after all nodes have joined)

After starting all the nodes, go to the UI management console on one of them. Now we are going to configure high availability for a selected queue. It is not important which node you choose, because they are all in one cluster. In the Queues tab, create a queue named q.example. Then go to the Admin tab, select the Policies section, and create a new policy. In the picture below you can see the policy I have created. I selected ha-mode=all, which means that the queue is mirrored across all nodes in the cluster, and when a new node is added to the cluster, the queue is mirrored to that node as well. There are also exactly and nodes modes available; you can find more about RabbitMQ high availability here. In the pattern field enter your queue name, and in the apply to field select Queues. If everything succeeded, you should see the ha-all feature in the queue's row.

(Figure: the HA policy created in the management console)
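The same policy can also be created from the command line instead of the UI. Assuming the queue name above, the rabbitmqctl equivalent for classic mirrored queues (RabbitMQ 3.x) would look roughly like this:

```shell
# Mirror queues matching q.example across all nodes in the cluster
rabbitmqctl set_policy ha-all "^q\.example$" '{"ha-mode":"all"}' --apply-to queues
```

Run it on any cluster node; policies, like queues, are cluster-wide.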

One of the greatest advantages of RabbitMQ is its monitoring. You can see many statistics like memory and disk usage, I/O statistics, detailed message rates, graphs, etc. Some of them are shown below.

(Figures: monitoring statistics in the RabbitMQ management console)

RabbitMQ has great support in the Spring framework. There are many projects that use RabbitMQ by default, for example Spring Cloud Stream and Spring Cloud Sleuth. I'm going to show you a sample Spring Boot application that sends messages to the RabbitMQ cluster and receives them from the HA queue. The application source code is available on GitHub. Here's the main class of the application. We enable the RabbitMQ listener by declaring @EnableRabbit on the class and @RabbitListener on the receiving method. We also have to declare the listened queue, the broker connection factory, and the listener container factory to allow listener concurrency. Inside CachingConnectionFactory we set all three addresses of the RabbitMQ cluster instances: 192.168.99.100:30000, 192.168.99.100:30002, 192.168.99.100:30004.

@SpringBootApplication
@EnableRabbit
public class Listener {

   private static Logger logger = Logger.getLogger("Listener");

   public static void main(String[] args) {
      SpringApplication.run(Listener.class, args);
   }

   @RabbitListener(queues = "q.example")
   public void onMessage(Order order) {
      logger.info(order.toString());
   }

   @Bean
   public ConnectionFactory connectionFactory() {
      CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
      connectionFactory.setUsername("guest");
      connectionFactory.setPassword("guest");
      connectionFactory.setAddresses("192.168.99.100:30000,192.168.99.100:30002,192.168.99.100:30004");
      connectionFactory.setChannelCacheSize(10);
      return connectionFactory;
   }

   @Bean
   public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setConnectionFactory(connectionFactory());
      factory.setConcurrentConsumers(10);
      factory.setMaxConcurrentConsumers(20);
      return factory;
   }

   @Bean
   public Queue queue() {
      return new Queue("q.example");
   }

}
 

Conclusion

Clustering and high availability configuration with RabbitMQ is pretty simple. I like RabbitMQ for its support of cluster monitoring through the UI management console. In my opinion, it is user-friendly and intuitive. In the sample application, I sent 100k messages into the sample queue. Using 20 concurrent consumers, they were processed in 65 seconds (~80/s per consumer thread), and memory usage at its peak was about 400MB on each node. Of course, our application just receives an object message and logs it to the console.
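Those throughput numbers are easy to sanity-check: 100,000 messages in 65 seconds is about 1,538 msg/s overall, which split across 20 consumer threads gives roughly 77 msg/s per thread, matching the ~80/s quoted above:

```java
public class Throughput {

    public static void main(String[] args) {
        int messages = 100_000;
        int seconds = 65;
        int consumers = 20;

        double total = (double) messages / seconds; // overall rate
        double perConsumer = total / consumers;     // rate per consumer thread

        System.out.printf("total: %.0f msg/s, per consumer: %.0f msg/s%n",
                total, perConsumer);
        // prints: total: 1538 msg/s, per consumer: 77 msg/s
    }
}
```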
