messaging Archives - Piotr's TechBlog https://piotrminkowski.com/tag/messaging/ Java, Spring, Kotlin, microservices, Kubernetes, containers Thu, 22 Jul 2021 07:17:35 +0000 en-US hourly 1 https://wordpress.org/?v=6.9.1 https://i0.wp.com/piotrminkowski.com/wp-content/uploads/2020/08/cropped-me-2-tr-x-1.png?fit=32%2C32&ssl=1 messaging Archives - Piotr's TechBlog https://piotrminkowski.com/tag/messaging/ 32 32 181738725 Spring Cloud Stream with Schema Registry and Kafka https://piotrminkowski.com/2021/07/22/spring-cloud-stream-with-schema-registry-and-kafka/ https://piotrminkowski.com/2021/07/22/spring-cloud-stream-with-schema-registry-and-kafka/#respond Thu, 22 Jul 2021 07:17:30 +0000 https://piotrminkowski.com/?p=9981 In this article, you will learn how to use Confluent Schema Registry with Spring Cloud Stream and Kafka in a microservices architecture. We will use Apache Avro to serialize and deserialize events exchanged between our applications. Spring Cloud Stream provides a handy mechanism for integration with Kafka and schema registry. Ok, but before we start, […]

The post Spring Cloud Stream with Schema Registry and Kafka appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to use Confluent Schema Registry with Spring Cloud Stream and Kafka in a microservices architecture. We will use Apache Avro to serialize and deserialize events exchanged between our applications. Spring Cloud Stream provides a handy mechanism for integration with Kafka and schema registry.

Ok, but before we start, let’s say some words about schema registry. What is this? And why we may use it in our event-driven architecture? Let’s imagine we change the message on the producer side, by adding or removing some fields. We sent that message to a Kafka topic, but we don’t have many subscribers is receiving such events. In a typical microservices architecture, we may have many producers and many subscribers. It is often necessary for all those microservices to agree on a contract that is based on a schema. If a schema is evolving, the existing microservices are still required to work. Here comes a schema registry server. It provides a RESTful interface for storing and retrieving schemas in different formats like JSON, Protobuf, or Avro. It also stores a versioned history of all schemas and provides schema compatibility checks.

We may choose between several available products. Spring Cloud has its own implementation of a schema registry server. Although it can be easily integrated with Spring Cloud Stream, we won’t use it. Currently, it doesn’t allow verifying compatibility between different versions. There is also an Apicurio registry. On the other hand, it is not possible to easily integrate it with Spring Cloud Stream. Therefore our choice fell on the Confluent schema registry.

Event-driven architecture with Spring Cloud and schema registry

We are going to run three applications. One of them is sending events to the Kafka topic, while two others are receiving them. The integration with Kafka is built on top of Spring Cloud Stream. The consumer Consumer-A is expecting events compatible with the v1 of schema, while the second subscriber is expecting events compatible with the v2 of schema. Before sending a message to Kafka the producer application tries to load schema definition from a remote server. If there is no result, it submits the data to the server, which replies with versioning information. The following diagram illustrates our architecture.

spring-cloud-schema-registry-arch

If a new schema is not compatible with the previous version, a schema registry rejects it. As a result, Spring Cloud Stream doesn’t allow to send a message to the Kafka topic. Otherwise, it serializes a message using Apache Avro. When a subscriber receives a message it first fetches schema from a remote registry. It gets a version of the schema from the header of a message. Finally, it deserializes it using the Avro format.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository and switch to the schema-registry branch. Then go to the event-driven directory. After that, you should just follow my instructions. Let’s begin.

Running Confluent Schema Registry on Kubernetes

It seems that the simplest way to run Confluent Schema Registry locally is on Kubernetes. Since we need to run at least Zookeeper and Kafka to be able to run schema registry we will use Helm for it. First, let’s add Confluent Helm repository.

$ helm repo add confluentinc https://packages.confluent.io/helm
$ helm repo update

Then we just need to install Confluent Platform using operator.

$ kubectl create ns confluent
$ helm upgrade --install confluent-operator confluentinc/confluent-for-kubernetes
$ kubectl apply -f https://raw.githubusercontent.com/confluentinc/confluent-kubernetes-examples/master/quickstart-deploy/confluent-platform.yaml

Finally, let’s display a list of running pods in the confluent namespace.

$ kubectl get pod -n confluent
NAME                                                  READY   STATUS    RESTARTS   AGE
kafka-confluent-cp-control-center-5ccb7479fd-hmpg6    1/1     Running   10         2d17h
kafka-confluent-cp-kafka-0                            2/2     Running   5          2d17h
kafka-confluent-cp-kafka-1                            2/2     Running   5          2d17h
kafka-confluent-cp-kafka-2                            2/2     Running   5          2d17h
kafka-confluent-cp-kafka-connect-797bd95655-kxnzm     2/2     Running   6          2d17h
kafka-confluent-cp-kafka-rest-69f49987bf-6nds7        2/2     Running   13         2d17h
kafka-confluent-cp-ksql-server-54675f9777-rbcb7       2/2     Running   9          2d17h
kafka-confluent-cp-schema-registry-7f6f6f9f8d-sh4b9   2/2     Running   11         2d17h
kafka-confluent-cp-zookeeper-0                        2/2     Running   4          2d17h
kafka-confluent-cp-zookeeper-1                        2/2     Running   4          2d17h
kafka-confluent-cp-zookeeper-2                        2/2     Running   4          2d17h

After that, we may display a list of Kubernetes services. Our application we will connect to the Kafka cluster through the kafka-confluent-cp-kafka service.

$ kubectl get svc -n confluent
NAME                                    TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)             AGE
kafka-confluent-cp-control-center       ClusterIP   10.100.47.14     <none>        9021/TCP            2d17h
kafka-confluent-cp-kafka                ClusterIP   10.102.129.194   <none>        9092/TCP,5556/TCP   2d17h
kafka-confluent-cp-kafka-connect        ClusterIP   10.103.223.169   <none>        8083/TCP,5556/TCP   2d17h
kafka-confluent-cp-kafka-headless       ClusterIP   None             <none>        9092/TCP            2d17h
kafka-confluent-cp-kafka-rest           ClusterIP   10.102.7.98      <none>        8082/TCP,5556/TCP   2d17h
kafka-confluent-cp-ksql-server          ClusterIP   10.108.116.196   <none>        8088/TCP,5556/TCP   2d17h
kafka-confluent-cp-schema-registry      ClusterIP   10.102.169.4     <none>        8081/TCP,5556/TCP   2d17h
kafka-confluent-cp-zookeeper            ClusterIP   10.99.33.73      <none>        2181/TCP,5556/TCP   2d17h
kafka-confluent-cp-zookeeper-headless   ClusterIP   None             <none>        2888/TCP,3888/TCP   2d17h

Integrate Spring Cloud Stream with Confluent Schema Registry

In order to enable integration with Confluent Schema Registry we first need to include the spring-cloud-schema-registry-client dependency to the Maven pom.xml.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-schema-registry-client</artifactId>
</dependency>

After that, we should enable RegistryClient through annotation. By default, the client uses a schema registry server provided by Spring Cloud. Therefore, we have registered the ConfluentSchemaRegistryClient bean as a default client implementation.

@SpringBootApplication
@EnableSchemaRegistryClient
class ProductionApplication {

   @Primary
   @Bean
   fun schemaRegistryClient(@Value("\${spring.cloud.schemaRegistryClient.endpoint}") endpoint: String?): SchemaRegistryClient {
      val client = ConfluentSchemaRegistryClient()
      client.setEndpoint(endpoint)
      return client
   }
}

Since we run our schema registry on Kubernetes, its address is different the default one. Let’s override it in application.properties.

spring.cloud.schemaRegistryClient.endpoint=http://kafka-confluent-cp-schema-registry:8081/

Because we are going to serialize messages using Apache Avro format, we need to change a default content type for all topics to application/*-avro. The message is sent with a contentType header by using the following scheme: application/[prefix].[subject].v[version]+avro, where prefix is configurable and subject is deduced from the payload type. The default prefix is vnd, and since the name of a message class is CallmeEvent the value of the header would be application/vnd.callmeevent.v1+avro for the v1 version of schema or application/vnd.callmeevent.v2+avro for the v2 version.

spring.cloud.stream.default.contentType=application/*+avro

Alternatively, we may set a content type just for a single destination. But more about it in the next sections.

Event class and Apache Avro serialization

We may choose between two types of approaches to the event class creation when working with Apache Avro. It is possible to generate Avro schema from a model class, or generate class from Avro schema using avro-maven-plugin. Assuming we use a second approach we first need to create Avro schema and place it in the source code as the .avsc file. Let’s say it is our Avro schema. It contains three fields id, message and eventType. The name of a generated class will be CallmeEvent and a package name will be the same as the namespace.

{
  "type":"record",
  "name":"CallmeEvent",
  "namespace":"pl.piomin.samples.eventdriven.producer.message.avro",
  "fields": [
    {
      "name":"id",
      "type":"int"
    },{
      "name":"message",
      "type":"string"
    },{
      "name":"eventType",
      "type": "string"
    }
  ]
}

After that, we need to the following plugin to the Maven pom.xml. We just need to configure the input directory with Avro schema files, and the output directory for the generated classes. Once you run a build, using for example mvn clean package command it will generate a required class.

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.10.2</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
        <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>

Just to simplify working with generated classes, let’s include the target/generated-sources/avro as a source directory.

<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>build-helper-maven-plugin</artifactId>
  <version>3.2.0</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>add-source</goal>
      </goals>
      <configuration>
        <sources>
          <source>${project.build.directory}/generated-sources/avro</source>
        </sources>
      </configuration>
    </execution>
  </executions>
</plugin>

However, the simplest approach, especially in development, is to generate Avro schema automatically from the source code. With this approach, we first need to create CallmeEvent class.

class CallmeEvent(val id: Int,
                  val message: String,
                  val eventType: String)

Then, we just need to enable dynamic Avro schema generation. Once you do it, Spring Cloud Stream automatically generates and sends schema to the schema registry before sending a message to a Kafka topic.

spring.cloud.schema.avro.dynamicSchemaGenerationEnabled=true

Integrate Spring Cloud Stream with Kafka

Spring Cloud Stream offers a broker agnostic programming model for sending and receiving messages. If you are looking for a quick introduction to that model and event-driven microservices read my article Introduction to event-driven microservices with Spring Cloud Stream. We use the same scenario as described in this article. However, we will add schema registry support and replace RabbitMQ with Kafka. In order to change the broker, we just need to replace a binder implementation as shown below.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

Here’s the main class of the producer-service application. It uses the Supplier bean to generate events continuously after startup.

@SpringBootApplication
@EnableSchemaRegistryClient
class ProductionApplication {

   var id: Int = 0

   @Value("\${callme.supplier.enabled}")
   val supplierEnabled: Boolean = false

   @Bean
   fun callmeEventSupplier(): Supplier<Message<CallmeEvent>?> = Supplier { createEvent() }

   @Primary
   @Bean
   fun schemaRegistryClient(@Value("\${spring.cloud.schemaRegistryClient.endpoint}") endpoint: String?): SchemaRegistryClient {
      val client = ConfluentSchemaRegistryClient()
      client.setEndpoint(endpoint)
      return client
   }

   private fun createEvent(): Message<CallmeEvent>? {
      return if (supplierEnabled)
         MessageBuilder.withPayload(CallmeEvent(++id, "I'm callme event!", "ping"))
                     .setHeader("to_process", true)
                     .build()
      else
         null
   }
}

Here’s a Spring Cloud Stream configuration for producer-service and Supplier bean. It configures partitioning based on the value of the id field.

spring.cloud.stream.bindings.callmeEventSupplier-out-0.contentType=application/*+avro
spring.cloud.stream.bindings.callmeEventSupplier-out-0.destination=callme-events
spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionKeyExpression=payload.id
spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionCount=2

Both consumers are receiving messages from the callme-events topic. The same as for producer-service we need to enable RegistryClient support.

@SpringBootApplication
@EnableSchemaRegistryClient
class ConsumerAApplication {

   val logger: Logger = LoggerFactory.getLogger(ConsumerAApplication::class.java)

   @Bean
   fun callmeEventConsumer(): Consumer<CallmeEvent> = Consumer { 
      logger.info("Received: {}", it) 
   }
}

We also need to configure deserialization with Avro and partitioning on the consumer side.

spring.cloud.stream.default.contentType=application/*+avro
spring.cloud.stream.bindings.callmeEventSupplier-in-0.contentType=application/*+avro
spring.cloud.stream.bindings.callmeEventConsumer-in-0.destination=callme-events
spring.cloud.stream.bindings.callmeEventConsumer-in-0.group=a
spring.cloud.stream.bindings.callmeEventConsumer-in-0.consumer.partitioned=true
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=${INSTANCE_INDEX}

Deploy applications on Kubernetes

Firstly, let’s deploy our Spring Cloud Stream applications on Kubernetes. Here’s a Deployment manifest for producer-service.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
      - name: producer
        image: piomin/producer-service
        ports:
        - containerPort: 8080

We also have similar manifests for consumer applications. We need to set the INSTANCE_INDEX environment variable, which is then responsible for partitioning configuration.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-a
spec:
  selector:
    matchLabels:
      app: consumer-a
  template:
    metadata:
      labels:
        app: consumer-a
    spec:
      containers:
      - name: consumer-a
        image: piomin/consumer-a-service
        env:
          - name: INSTANCE_INDEX
            value: "0"
        ports:
        - containerPort: 8080

The Deployment manifest for the consumer-b application is visible below.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-b
spec:
  selector:
    matchLabels:
      app: consumer-b
  template:
    metadata:
      labels:
        app: consumer-b
    spec:
      containers:
      - name: consumer-b
        image: piomin/consumer-b-service
        env:
          - name: INSTANCE_INDEX
            value: "1"
        ports:
        - containerPort: 8080

All those applications may be deployed on Kubernetes with Skaffold. Each application directory contains a Skaffold configuration file skaffold.yaml, so you just need to execute the following command to run them on Kubernetes.

$ skaffold run

Testing integration between Spring Cloud Stream and schema registry

In order to register the v1 version of the schema, we should run the producer-service application with the following event class.

class CallmeEvent(val id: Int,
                  val message: String)

Then, we should restart it with the new version of the CallmeEvent class as shown below.

class CallmeEvent(val id: Int,
                  val message: String,
                  val eventType: String)

Now, we can verify a list of schemas registered on the server. First, let’s enable port forwarding for the Confluent Schema Registry service.

$ kubectl port-forward svc/kafka-confluent-cp-schema-registry 8081:8081 -n confluent

Thanks to that, we may access schema registry REST API on the local port. Let’s display a list of registered subjects. As you see there is a single subject called callmeevent.

$ curl http://localhost:8081/subjects
["callmeevent"]

In the next step, we may get a list of versions registered under the callmeevent subject. As we expect, there are two versions available in the schema registry.

$ curl http://localhost:8081/subjects/callmeevent/versions
[1,2]

We can display a full schema definition by calling the following endpoint using schema id.

$ curl http://localhost:8081/schemas/ids/1
{"schema":"{\"type\":\"record\",\"name\":\"CallmeEvent\",\"namespace\":\"pl.piomin.samples.eventdriven.producer.message\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"message\",\"type\":\"string\"}]}"}

Finally, we are going to change our schema once again. Until then, a new version of a schema was compatible with the previous one. Now, we create a schema, which is incompatible with the previous version. In particular, we change the eventType field into eventTp. That change is provided on the producer side.

class CallmeEvent(val id: Int,
                  val message: String,
                  val eventTp: String)

After restarting producer-service Spring Cloud Stream tries to register a new version of the schema. Let’s just take a look at application logs. As you see, a new schema has been rejected by the Confluent Schema Registry. Here’s a fragment of producer-service logs after a schema change.

The post Spring Cloud Stream with Schema Registry and Kafka appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2021/07/22/spring-cloud-stream-with-schema-registry-and-kafka/feed/ 0 9981
Kubernetes Messaging with Java and KubeMQ https://piotrminkowski.com/2020/01/17/kubernetes-messaging-with-java-and-kubemq/ https://piotrminkowski.com/2020/01/17/kubernetes-messaging-with-java-and-kubemq/#comments Fri, 17 Jan 2020 08:53:31 +0000 http://piotrminkowski.com/?p=7632 Have you ever tried to run any message broker on Kubernetes? KubeMQ is a relatively new solution and is not as popular as competitive tools like RabbitMQ, Kafka, or ActiveMQ. However, it has one big advantage over them – it is Kubernetes native message broker, which may be deployed there using a single command without […]

The post Kubernetes Messaging with Java and KubeMQ appeared first on Piotr's TechBlog.

]]>
Have you ever tried to run any message broker on Kubernetes? KubeMQ is a relatively new solution and is not as popular as competitive tools like RabbitMQ, Kafka, or ActiveMQ. However, it has one big advantage over them – it is Kubernetes native message broker, which may be deployed there using a single command without preparing any additional templates or manifests. This convinced me to take a closer look at KubeMQ.
KubeMQ is an enterprise-grade, scalable, highly available, and secure message broker and message queue, designed as Kubernetes native solution in a lightweight container. It is written in Go. Therefore it is being advertised as a very fast solution running inside a small Docker container, which has about 30MB. It may be easily integrated with some popular third-party tools for observability like Zipkin, Prometheus, or Datadog.
When I’m reading comparison with competitive tools like RabbitMQ or Redis available on KubeMQ site (https://kubemq.io/product-overview/) it looks pretty amazing (for KubeMQ of course). It seems the authors wanted to merge some useful features of RabbitMQ and Kafka in a single product. In fact, KubeMQ provides many interesting mechanisms like delayed delivery, message peeking, message batch sending and receiving for queues, and consumer groups, load balancing and offsetting support for pub/sub.
Ok, when I’m looking at their SDK Java I see that it’s a new product, and there are still some things to do. However, all the features listed above seem to be very useful. Of course, I won’t be able to demonstrate all of them in this article, but I’m going to show you a simple Java application that uses message queue with transactions, and a pub/sub event store. Let’s begin our KubeMQ Java tutorial.

Example

The example application is written in Java 11, and uses Spring Boot. The source code is available as usual on GitHub. The repository address is https://github.com/piomin/sample-java-kubemq.git.

Before start

Before starting with KubeMQ you need to have a running instance of Minikube. I have tested it on version 1.6.1.

$ minikube start --vm-driver=virtualbox

Running KubeMQ on Kubernetes

First, you need to install KubeMQ. For Windows, you just need to download the latest version of CLI available on address https://github.com/kubemq-io/kubemqctl/releases/download/latest/kubemqctl.exe and copy it to the directory under PATH. Before installing KubeMQ on your Minikube instance we need to register on the web site https://account.kubemq.io/login/register. You will receive a token required for the installation. Installation is very easy with CLI. You just need to execute command kubemqctl cluster create with the registration token as shown below.

kubemq-java-tutorial-create

By default, KubeMQ creates a cluster consisting of three instances (pods). It is deployed as Kubernetes StatefulSet. The deployment is available inside the newly created namespace – kubemq. We can easily check the list of running pods with kubectl get pod command.

kubemq-java-tutorial-pods

The list of pods is not very important for us. We can easily scale up and scale down the number of instances in the cluster using command kubemqctl cluster scale. KubeMQ is exposed in the cluster under different interfaces. KubeMQ Java SDK is using GRPC protocol for communication, so we use service kubemq-cluster-grpc available under port 50000.

kubernetes-messaging-java-kubemq-svc

Since KubeMQ is a native Kubernetes message broker starting with it on Minikube is very simple. After executing a single command, we may now focus on development.

Example Architecture

We have an example application deployed on Kubernetes, which integrates with KubeMQ queue and event store. The diagram visible below illustrates an architecture of the application. It exposes REST endpoint POST /orders for creating new orders. Each order signifies a transfer between two in-memory accounts. The incoming order is sent to the queue orders (1). Then it is received by the listener (2), which is responsible for updating account balances using AccountRepository bean (3). If the transaction is finished, the event is sent to the pub/sub topic transactions. Incoming events may be listened to by many subscribers (4). In the example application we have two listeners: TransactionAmountListener and TransactionCountListener (5). They are responsible for adding extra money to the target order’s account based on the different criteria. The first criteria is an amount of a given transaction, while the second is the number of processed transactions per account.

kubernetes-messaging-java-kubemq-arch

On the described example application I’m going to show you the following features of KubeMQ and its SDK for Java:

  • Sending messages to a queue
  • Listening for incoming queue messages and handling transactions
  • Sending messages to pub/sub via Channel
  • Subscribing to pub/sub events and reading older events from a store
  • Using Spring Boot for integration with KubeMQ for standalone Java application

Let’s proceed to the implementation.

Implementation with Spring Boot and KubeMQ SDK

We are beginning with configuration. The URL to KubeMQ GRPC has been externalized in the application.yml.

spring:
  application:
    name: sampleapp-kubemq
kubemq:
  address: kubemq-cluster-grpc:50000

In the @Configuration class we are defining all required KubeMQ resources as Spring beans. Each of them requires a KubeMQ cluster address. We need to declare a queue, a channel for sending events and a subscriber for subscribing to the pub/sub events and events store.

@Configuration
@ConfigurationProperties("kubemq")
public class KubeMQConfiguration {

    private String address;

    @Bean
    public Queue queue() throws ServerAddressNotSuppliedException, SSLException {
        return new Queue("transactions", "orders", address);
    }

    @Bean
    public Subscriber subscriber() {
        return new Subscriber(address);
    }

    @Bean
    public Channel channel() {
        return new Channel("transactions", "orders", true, address);
    }

    String getAddress() {
        return address;
    }

    void setAddress(String address) {
        this.address = address;
    }

}

The first component in our architecture is a controller. It exposes HTTP endpoint for placing an order. OrderController injects Queue bean and uses it for sending messages to the KubeMQ queue. After receiving a response that message has been delivered it returns an order with id and status=ACCEPTED.

@RestController
@RequestMapping("/orders")
public class OrderController {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);

    private Queue queue;

    public OrderController(Queue queue) {
        this.queue = queue;
    }

    @PostMapping
    public Order sendOrder(@RequestBody Order order) {
        try {
            LOGGER.info("Sending: {}", order);
            final SendMessageResult result = queue.SendQueueMessage(new Message()
                    .setBody(Converter.ToByteArray(order)));
            order.setId(result.getMessageID());
            order.setStatus(OrderStatus.ACCEPTED);
            LOGGER.info("Sent: {}", order);
        } catch (ServerAddressNotSuppliedException | IOException e) {
            LOGGER.error("Error sending", e);
            order.setStatus(OrderStatus.ERROR);
        }
        return order;
    }

}

The message is processed asynchronously. Since the current KubeMQ Java SDK does not provide any message listener for asynchronous processing, we use synchronous methods inside the infinitive loop. The loop is started inside a new thread handled using Spring TaskExecutor. When a new message is received, we are starting a KubeMQ transaction. It may be acknowledged or rejected. A transaction is confirmed if the source account has sufficient funds to perform a transfer to a target account. If a transaction is confirmed it sends an event to KubeMQ transactions pub/sub with information about it using Channel bean.

@Component
public class OrderListener {

	private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

	private Queue queue;
	private Channel channel;
	private OrderProcessor orderProcessor;
	private TaskExecutor taskExecutor;

	public OrderListener(Queue queue, Channel channel, OrderProcessor orderProcessor, TaskExecutor taskExecutor) {
		this.queue = queue;
		this.channel = channel;
		this.orderProcessor = orderProcessor;
		this.taskExecutor = taskExecutor;
	}

	@PostConstruct
	public void listen() {
		taskExecutor.execute(() -> {
			while (true) {
			    try {
                    Transaction transaction = queue.CreateTransaction();
                    TransactionMessagesResponse response = transaction.Receive(10, 10);
                    if (response.getMessage().getBody().length > 0) {
                        Order order = orderProcessor
                                .process((Order) Converter.FromByteArray(response.getMessage().getBody()));
                        LOGGER.info("Processed: {}", order);
                        if (order.getStatus().equals(OrderStatus.CONFIRMED)) {
                            transaction.AckMessage();
                            Event event = new Event();
                            event.setEventId(response.getMessage().getMessageID());
                            event.setBody(Converter.ToByteArray(order));
							LOGGER.info("Sending event: id={}", event.getEventId());
                            channel.SendEvent(event);
                        } else {
                            transaction.RejectMessage();
                        }
                    } else {
                        LOGGER.info("No messages");
                    }
                    Thread.sleep(10000);
                } catch (Exception e) {
					LOGGER.error("Error", e);
                }
			}
		});

	}

}

OrderListener class is using AccountRepository bean for account balance management. It is a simple in-memory store just for a demo purpose.

@Repository
public class AccountRepository {

    private List<Account> accounts = new ArrayList<>();

    public Account updateBalance(Integer id, int amount) throws InsufficientFundsException {
        Optional<Account> accOptional = accounts.stream().filter(a -> a.getId().equals(id)).findFirst();
        if (accOptional.isPresent()) {
            Account account = accOptional.get();
            account.setBalance(account.getBalance() + amount);
            if (account.getBalance() < 0)
                throw new InsufficientFundsException();
            int index = accounts.indexOf(account);
            accounts.set(index, account);
            return account;
        }
        return null;
    }

    public Account add(Account account) {
        account.setId(accounts.size() + 1);
        accounts.add(account);
        return account;
    }

    public List<Account> getAccounts() {
        return accounts;
    }

    @PostConstruct
    public void init() {
        add(new Account(null, "123456", 2000));
        add(new Account(null, "123457", 2000));
        add(new Account(null, "123458", 2000));
    }
}

And the last components in our architecture – event listeners. Both of them are subscribing to the same EventsStore transactions. The TransactionAmountListener is the simpler one. It is processing only a single event in order transfer percentage bonus counter from transaction amount to a target account. That’s why we have defined it as a listener just for new events (EventsStoreType.StartNewOnly).

@Component
public class TransactionAmountListener implements StreamObserver<EventReceive> {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionAmountListener.class);

    private Subscriber subscriber;
    private AccountRepository accountRepository;

    public TransactionAmountListener(Subscriber subscriber, AccountRepository accountRepository) {
        this.subscriber = subscriber;
        this.accountRepository = accountRepository;
    }

    @Override
    public void onNext(EventReceive eventReceive) {
        try {
            Order order = (Order) Converter.FromByteArray(eventReceive.getBody());
            LOGGER.info("Amount event: {}", order);
            accountRepository.updateBalance(order.getAccountIdTo(), (int) (order.getAmount() * 0.1));
        } catch (IOException | ClassNotFoundException | InsufficientFundsException e) {
            LOGGER.error("Error", e);
        }
    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onCompleted() {

    }

    @PostConstruct
    public void init() {
        SubscribeRequest subscribeRequest = new SubscribeRequest();
        subscribeRequest.setChannel("transactions");
        subscribeRequest.setClientID("amount-listener");
        subscribeRequest.setSubscribeType(SubscribeType.EventsStore);
        subscribeRequest.setEventsStoreType(EventsStoreType.StartNewOnly);
        try {
            subscriber.SubscribeToEvents(subscribeRequest, this);
        } catch (ServerAddressNotSuppliedException | SSLException e) {
            e.printStackTrace();
        }
    }
}

The other situation is with TransactionCountListener. It should be able to retrieve a list of all events published on pub/sub after every startup of our application. That’s why we are defining StartFromFirst as EventStoreType for Subscriber. Also a clientId needs to be dynamically generated on apply startup in order to retrieve all stored events. The listener send bonus to a target account after the fifth transaction addressed to that account succesfully processed by the application.

@Component
public class TransactionCountListener implements StreamObserver<EventReceive> {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionCountListener.class);
    private Map<Integer, Integer> transactionsCount = new HashMap<>();

    private Subscriber subscriber;
    private AccountRepository accountRepository;

    public TransactionCountListener(Subscriber subscriber, AccountRepository accountRepository) {
        this.subscriber = subscriber;
        this.accountRepository = accountRepository;
    }

    @Override
    public void onNext(EventReceive eventReceive) {
        try {
            Order order = (Order) Converter.FromByteArray(eventReceive.getBody());
            LOGGER.info("Count event: {}", order);
            Integer accountIdTo = order.getAccountIdTo();
            Integer noOfTransactions = transactionsCount.get(accountIdTo);
            if (noOfTransactions == null)
                transactionsCount.put(accountIdTo, 1);
            else {
                transactionsCount.put(accountIdTo, ++noOfTransactions);
                if (noOfTransactions > 5) {
                    accountRepository.updateBalance(order.getAccountIdTo(), (int) (order.getAmount() * 0.1));
                    LOGGER.info("Adding extra to: id={}", order.getAccountIdTo());
                }
            }
        } catch (IOException | ClassNotFoundException | InsufficientFundsException e) {
            LOGGER.error("Error", e);
        }
    }

    @Override
    public void onError(Throwable throwable) {

    }

    @Override
    public void onCompleted() {

    }

    @PostConstruct
    public void init() {
        final SubscribeRequest subscribeRequest = new SubscribeRequest();
        subscribeRequest.setChannel("transactions");
        subscribeRequest.setClientID("count-listener-" + System.currentTimeMillis());
        subscribeRequest.setSubscribeType(SubscribeType.EventsStore);
        subscribeRequest.setEventsStoreType(EventsStoreType.StartFromFirst);
        try {
            subscriber.SubscribeToEvents(subscribeRequest, this);
        } catch (ServerAddressNotSuppliedException | SSLException e) {
            e.printStackTrace();
        }
    }

}

Running on Minikube

The easiest way to run our sample application on Minikube is with Skaffold and Jib. We don’t have to prepare any Dockerfile, only a single deployment manifest in k8s directory. Here’s our deployment.yaml file.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: sampleapp-kubemq
  namespace: kubemq
  labels:
    app: sampleapp-kubemq
spec:
  replicas: 1
  selector:
    matchLabels:
      app: sampleapp-kubemq
  template:
    metadata:
      labels:
        app: sampleapp-kubemq
    spec:
      containers:
        - name: sampleapp-kubemq
          image: piomin/sampleapp-kubemq
          ports:
            - containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
  name: sampleapp-kubemq
  namespace: kubemq
  labels:
    app: sampleapp-kubemq
spec:
  ports:
    - port: 8080
      protocol: TCP
  selector:
    app: sampleapp-kubemq
  type: NodePort

The source code is prepared to use Skaffold and Jib. It contains the skaffold.yaml file in the project root directory.

apiVersion: skaffold/v2alpha1
kind: Config
build:
  artifacts:
    - image: piomin/sampleapp-kubemq
      jib: {}
  tagPolicy:
    gitCommit: {}

We also need to have a jib-maven-plugin Maven plugin in our pom.xml.

<plugin>
	<groupId>com.google.cloud.tools</groupId>
	<artifactId>jib-maven-plugin</artifactId>
	<version>1.8.0</version>
</plugin>

Now, we only have to execute the following command.


$ skaffold dev

Since our application is deployed on Minikube, we may perform some test calls. Assuming that Minikube node is available under address 192.168.99.100, here’s the example of test request and response from application.

$ curl -s http://192.168.99.100:30833/orders -d '{"type":"TRANSFER","accountIdFrom":1,"accountIdTo":2,"amount":300,"status":"NEW"}' -H 'Content-Type: application/json'
{"type":"TRANSFER","accountIdFrom":1,"accountIdTo":2,"date":null,"amount":300,"id":"10","status":"ACCEPTED"}

We may check a list of queues created on KubeMQ using command kubemqctl queues list as shown below.

kubemq-java-tutorial-queues

After sending some other test requests and performing some restarts of the application pod we may take a look at the event_store list using command kubemqctl events_store list as shown below. We may see that there are multiple clients with id count-listener* registered, but only the current is active.

kubemq-java-tutorial-events

Let’s take a look on application logs. They are automatically displayed on the screen after running the skaffold dev command. As you see each message sent to the queue is received by the listener, which performs transfer between accounts and then sends events to pub/sub. Finally both event_store listeners receive the event.

logs-1

If you restart the pod with the application TransactionCountListener receives all events available inside event_store and counts them for each target account id. If a total number of transactions for a single account extends 5 it sends extra funds to that account.

logs-2

If a transaction is rejected by OrderListener due to lack of funds on source account the message is re-delivered to the queue.

logs-3

Conclusion

In this article I show you a sample application that integrates with KubeMQ to realize standard use cases based on queues and topics (pub/sub). Starting with KubeMQ on Kubernetes and management is extremely easy with KubeMQ CLI. It has many interesting features described in quite well prepared documentation available on site https://docs.kubemq.io/. As a modern, cloud-native message broker KubeMQ is able to transfer billions of messages daily. However, we should bear in mind, it is a relatively new product, and features are not completely refined as in competition. For example, you can compare KubeMQ dashboard (available after executing command kubemqctl cluster dashboard) with RabbitMQ Web Admin. Of course, everything takes a little time, and I will follow the progress in KubeMQ development.

The post Kubernetes Messaging with Java and KubeMQ appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2020/01/17/kubernetes-messaging-with-java-and-kubemq/feed/ 2 7632
Kafka In Microservices With Micronaut https://piotrminkowski.com/2019/08/06/kafka-in-microservices-with-micronaut/ https://piotrminkowski.com/2019/08/06/kafka-in-microservices-with-micronaut/#respond Tue, 06 Aug 2019 07:14:19 +0000 https://piotrminkowski.wordpress.com/?p=7207 Today we are going to build an example of microservices that communicates with each other asynchronously through Apache Kafka topics. We use the Micronaut Framework, which provides a dedicated library for integration with Kafka. Let’s take a brief look at the architecture of our sample system. We have 4 microservices: order-service, trip-service, driver-service, and passenger-service. […]

The post Kafka In Microservices With Micronaut appeared first on Piotr's TechBlog.

]]>
Today we are going to build an example of microservices that communicates with each other asynchronously through Apache Kafka topics. We use the Micronaut Framework, which provides a dedicated library for integration with Kafka. Let’s take a brief look at the architecture of our sample system. We have 4 microservices: order-service, trip-service, driver-service, and passenger-service. The implementation of these applications is very simple. All of them have in-memory storage and connect to the same Kafka instance.

A primary goal of our system is to arrange a trip for customers. The order-service application also acts as a gateway. It is receiving requests from customers, saving history, and sending events to orders topic. All the other microservices are listening on this topic and processing orders sent by order-service. Each microservice has its own dedicated topic, where it sends events with information about changes. Such events are received by some other microservices. The architecture is presented in the picture below.

micronaut-kafka-1.png

Before reading this article it is worth familiarizing yourself with Micronaut Framework. You may read one of my previous articles describing a process of building microservices communicating via REST API: Quick Guide to Microservices with Micronaut Framework

1. Running Kafka

To run Apache Kafka on the local machine we may use its Docker image. It seems that the most up-to-date image is shared by https://hub.docker.com/u/wurstmeister. Before starting Kafka containers we have to start the ZooKeeper server, which is used by Kafka. If you run Docker on Windows the default address of its virtual machine is 192.168.99.100. It also has to be set as an environment for a Kafka container.
Both Zookeeper and Kafka containers will be started in the same network kafka. Zookeeper is available under the name zookeeper, and is exposed on port 2181. Kafka container requires that address under env variable KAFKA_ZOOKEEPER_CONNECT.

$ docker network create kafka
$ docker run -d --name zookeeper --network kafka -p 2181:2181 wurstmeister/zookeeper
$ docker run -d --name kafka -p 9092:9092 --network kafka --env KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 wurstmeister/kafka

2. Including Micronaut Kafka

Micronaut example applications built with Kafka can be started with or without the presence of an HTTP server. To enable Micronaut Kafka you need to include the micronaut-kafka library to your dependencies. In case you would like to expose HTTP API you should also include micronaut-http-server-netty:

<dependency>
   <groupId>io.micronaut.configuration</groupId>
   <artifactId>micronaut-kafka</artifactId>
</dependency>
<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-http-server-netty</artifactId>
</dependency>

3. Building microservice order-service

The application order-service as the only one starts embedded HTTP server and exposes REST API. That’s why we may enable built-in Micronaut health checks for Kafka. To do that we should first include micronaut-management dependency:

<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-management</artifactId>
</dependency>

For convenience, we will enable all management endpoints and disable HTTP authentication for them by defining the following configuration inside application.yml:

endpoints:
  all:
    enabled: true
    sensitive: false

Now, a health check is available under address http://localhost:8080/health. Our sample application will also expose a simple REST API for adding new orders and listing all previously created orders. Here’s the Micronaut controller implementation responsible for exposing those endpoints:

@Controller("orders")
public class OrderController {

    @Inject
    OrderInMemoryRepository repository;
    @Inject
    OrderClient client;

    @Post
    public Order add(@Body Order order) {
        order = repository.add(order);
        client.send(order);
        return order;
    }

    @Get
    public Set<Order> findAll() {
        return repository.findAll();
    }

}

Each microservice uses an in-memory repository implementation. Here’s repository implementation inside order-service:

@Singleton
public class OrderInMemoryRepository {

    private Set<Order> orders = new HashSet<>();

    public Order add(Order order) {
        order.setId((long) (orders.size() + 1));
        orders.add(order);
        return order;
    }

    public void update(Order order) {
        orders.remove(order);
        orders.add(order);
    }

    public Optional<Order> findByTripIdAndType(Long tripId, OrderType type) {
        return orders.stream().filter(order -> order.getTripId().equals(tripId) && order.getType() == type).findAny();
    }

    public Optional<Order> findNewestByUserIdAndType(Long userId, OrderType type) {
        return orders.stream().filter(order -> order.getUserId().equals(userId) && order.getType() == type)
                .max(Comparator.comparing(Order::getId));
    }

    public Set<Order> findAll() {
        return orders;
    }

}

In-memory repository stores Order object instances. Order object is also sent to Kafka topic named orders. Here’s an implementation of Order class:

public class Order {

    private Long id;
    private LocalDateTime createdAt;
    private OrderType type;
    private Long userId;
    private Long tripId;
    private float currentLocationX;
    private float currentLocationY;
    private OrderStatus status;
   
    // ... GETTERS AND SETTERS
}

4. Example of asynchronous communication with Kafka and Micronaut

Now, let’s consider one of the use cases possible to realize by our sample system – adding a new trip. In the first step (1) we are adding a new order of type OrderType.NEW_TRIP. After that order-service creates an order and send it to the orders topic. The order is received by three microservices: driver-service, passenger-service and order-service (2). A new order is processed by all these applications. The passenger-service application checks if there are sufficient funds on the passenger account. If not it cancels the trip, otherwise it does not do anything. The driver-service is looking for the nearest available driver, while trip-service creates and stores new trips. Both driver-service and trip-service sends events to their topics (drivers, trips) with information about changes (3) Every event can be accessed by other microservices, for example trip-service listen for event from driver-service in order to assign a new driver to the trip (4). The following picture illustrates the communication between our microservices when adding a new trip.

micronaut-kafka-3.png

Now, let’s proceed to the implementation details.

Step 1: Sending order

First we need to create a Kafka client responsible for sending messages to a topic. To achieve that we should create an interface annotated with @KafkaClient and declare one or more methods for sending messages. Every method should have a target topic name set through @Topic annotation. For method parameters we may use three annotations @KafkaKey, @Body or @Header. @KafkaKey is used for partitioning, which is required by our sample applications. In the client implementation visible below we just use @Body annotation.

@KafkaClient
public interface OrderClient {

    @Topic("orders")
    void send(@Body Order order);

}

Step 2: Receiving order

Once an order has been sent by the client it is received by all other microservices listening on the orders topic. Here’s a listener implementation in the driver-service. A listener class should be annotated with @KafkaListener. We may declare groupId as an annotation field to prevent from receiving the same message by more than one instance of a single application. Then we are declaring a method for processing incoming messages. The same as a client method it should be annotated with @Topic, to set the name of a target topic. Because we are listening for Order objects it should be annotated with @Body – the same as the corresponding client method.

@KafkaListener(groupId = "driver")
public class OrderListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

    private DriverService service;

    public OrderListener(DriverService service) {
        this.service = service;
    }

    @Topic("orders")
    public void receive(@Body Order order) {
        LOGGER.info("Received: {}", order);
        switch (order.getType()) {
            case NEW_TRIP -> service.processNewTripOrder(order);
        }
    }

}

Step 3: Sending to other Kafka topic

Now, let’s take a look on the processNewTripOrder method inside driver-service. DriverService injects two different Kafka client beans: OrderClient and DriverClient. When processing a new order it tries to find the available driver, which is the closest to the customer who sent the order. After finding him it changes the status to UNAVAILABLE and sends the message with Driver object to the drivers topic.

@Singleton
public class DriverService {

    private static final Logger LOGGER = LoggerFactory.getLogger(DriverService.class);

    private DriverClient client;
    private OrderClient orderClient;
    private DriverInMemoryRepository repository;

    public DriverService(DriverClient client, OrderClient orderClient, DriverInMemoryRepository repository) {
        this.client = client;
        this.orderClient = orderClient;
        this.repository = repository;
    }

    public void processNewTripOrder(Order order) {
        LOGGER.info("Processing: {}", order);
        Optional<Driver> driver = repository.findNearestDriver(order.getCurrentLocationX(), order.getCurrentLocationY());
        driver.ifPresent(driverLocal -> {
            driverLocal.setStatus(DriverStatus.UNAVAILABLE);
            repository.updateDriver(driverLocal);
            client.send(driverLocal, String.valueOf(order.getId()));
            LOGGER.info("Message sent: {}", driverLocal);
        });
    }
   
    // ...
}

Here’s an implementation of Kafka client inside driver-service used for sending messages to the drivers topic. Because we need to link the instance of Driver with order we annotate orderId parameter with @Header. There is no sense to include it to Driver class just to assign it to the right trip on the listener side.

@KafkaClient
public interface DriverClient {

    @Topic("drivers")
    void send(@Body Driver driver, @Header("Order-Id") String orderId);

}

Step 4: Inter-service communication example with Micronaut Kafka

The message sent by DriverClient is received by @Listener declared inside trip-service. It listens for messages incoming to the trips topic. The signature of receiving method is pretty similar to the client sending method as shown below:

@KafkaListener(groupId = "trip")
public class DriverListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

    private TripService service;

    public DriverListener(TripService service) {
        this.service = service;
    }

    @Topic("drivers")
    public void receive(@Body Driver driver, @Header("Order-Id") String orderId) {
        LOGGER.info("Received: driver->{}, header->{}", driver, orderId);
        service.processNewDriver(driver);
    }

}

A new driver with given id is being assigned to the trip searched by orderId. That’s a final step of our communication process when adding a new trip.

@Singleton
public class TripService {

    private static final Logger LOGGER = LoggerFactory.getLogger(TripService.class);

    private TripInMemoryRepository repository;
    private TripClient client;

    public TripService(TripInMemoryRepository repository, TripClient client) {
        this.repository = repository;
        this.client = client;
    }


    public void processNewDriver(Driver driver, String orderId) {
        LOGGER.info("Processing: {}", driver);
        Optional<Trip> trip = repository.findByOrderId(Long.valueOf(orderId));
        trip.ifPresent(tripLocal -> {
            tripLocal.setDriverId(driver.getId());
            repository.update(tripLocal);
        });
    }
   
   // ... OTHER METHODS

}

5. Tracing

We may easily enable distributed tracing with Micronaut Kafka. First, we need to enable and configure Micronaut Tracing. To do that you should first add some dependencies:

<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-tracing</artifactId>
</dependency>
<dependency>
    <groupId>io.zipkin.brave</groupId>
    <artifactId>brave-instrumentation-http</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.zipkin.reporter2</groupId>
    <artifactId>zipkin-reporter</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.opentracing.brave</groupId>
    <artifactId>brave-opentracing</artifactId>
</dependency>
<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-kafka-client</artifactId>
    <version>0.0.16</version>
    <scope>runtime</scope>
</dependency>

We also need to configure some application settings inside application.yml including an address of our tracing tool. In that case, it is Zipkin.

tracing:
  zipkin:
    enabled: true
    http:
      url: http://192.168.99.100:9411
    sampler:
      probability: 1

Before starting our application we have to run Zipkin container:

$ docker run -d --name zipkin -p 9411:9411 openzipkin/zipkin

Conclusion

In this article you were guided through the process of building microservice architecture using asynchronous communication via Apache Kafka. I have shown you ea example with the most important features of the Micronaut Kafka library that allows you to easily declare producer and consumer of Kafka topics, enable health checks, and distributed tracing for your microservices. I have described an implementation of a single scenario for our system, that covers adding a new trip at the customer’s request. In order to see the full implementation of the sample system described in this article please check out the source code available on GitHub: https://github.com/piomin/sample-kafka-micronaut-microservices.git.

The post Kafka In Microservices With Micronaut appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2019/08/06/kafka-in-microservices-with-micronaut/feed/ 0 7207
Building and testing message-driven microservices using Spring Cloud Stream https://piotrminkowski.com/2018/06/15/building-and-testing-message-driven-microservices-using-spring-cloud-stream/ https://piotrminkowski.com/2018/06/15/building-and-testing-message-driven-microservices-using-spring-cloud-stream/#comments Fri, 15 Jun 2018 08:46:04 +0000 https://piotrminkowski.wordpress.com/?p=6678 In this article, you will learn how to improve automated testing of message-driven microservices with Spring Cloud Stream. Spring Boot and Spring Cloud give you a great opportunity to build microservices fast using different styles of communication. You can create synchronous REST microservices based on Spring Cloud Netflix libraries as shown in one of my […]

The post Building and testing message-driven microservices using Spring Cloud Stream appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to improve automated testing of message-driven microservices with Spring Cloud Stream. Spring Boot and Spring Cloud give you a great opportunity to build microservices fast using different styles of communication. You can create synchronous REST microservices based on Spring Cloud Netflix libraries as shown in one of my previous articles Quick Guide to Microservices with Spring Boot 2.0, Eureka and Spring Cloud. You can create asynchronous, reactive microservices deployed on Netty with Spring WebFlux project and combine it successfully with some Spring Cloud libraries as shown in my article Reactive Microservices with Spring WebFlux and Spring Cloud. And finally, you may implement message-driven microservices based on the publish/subscribe model using Spring Cloud Stream and a message broker like Apache Kafka or RabbitMQ. The last of the listed approaches to building microservices is the main subject of this article. I’m going to show you how to effectively build, scale, run, and test messaging microservices based on RabbitMQ broker.

Architecture

For the purpose of demonstrating Spring Cloud Stream testing features we will design a sample system that uses publish/subscribe model for inter-service communication. We have three microservices: order-service, product-service and account-service. Application order-service exposes HTTP endpoint that is responsible for processing orders sent to our system. All the incoming orders are processed asynchronously – order-service prepare and send messages to RabbitMQ exchange and then respond to the calling client that the request has been accepted for processing. Applications account-service and product-service are listening for the order messages incoming to the exchange. Microservice account-service is responsible for checking if there are sufficient funds on a customer’s account for order realization and then withdrawing cash from this account. Microservice product-service checks if there is a sufficient amount of products in the store, and changes the number of available products after processing the order. Both account-service and product-service send asynchronous response through RabbitMQ exchange (this time it is one-to-one communication using direct exchange) with a status of operation. Microservice order-service after receiving response messages sets the appropriate status of the order and exposes it through REST endpoint GET /order/{id} to the external client.

If you feel that the description of our sample system is a little incomprehensible, here’s the diagram with architecture for clarification.

stream-1

Enabling Spring Cloud Stream

The recommended way to include Spring Cloud Stream in the project is with a dependency management system. Spring Cloud Stream has an independent release train management in relation to the whole Spring Cloud framework. However, if we have declared spring-cloud-dependencies in the Elmhurst.RELEASE version inside the dependencyManagement
section, we wouldn’t have to declare anything else in pom.xml. If you prefer to use only the Spring Cloud Stream project, you should define the following section.

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-dependencies</artifactId>
      <version>Elmhurst.RELEASE</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

The next step is to add spring-cloud-stream artifact to the project dependencies. I also recommend you include at least the spring-cloud-sleuth library to provide sending messaging with the same traceId as the source request incoming to order-service.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-sleuth</artifactId>
</dependency>

Spring Cloud Stream programming model

To enable connectivity to a message broker for your application, annotate the main class with @EnableBinding. The @EnableBinding annotation takes one or more interfaces as parameters. You may choose between three interfaces provided by Spring Cloud Stream:

  • Sink: This is used for marking a service that receives messages from the inbound channel.
  • Source: This is used for sending messages to the outbound channel.
  • Processor: This can be used in case you need both an inbound channel and an outbound channel, as it extends the Source and Sink interfaces. Because order-service sends messages, as well as receives them, its main class has been annotated with @EnableBinding(Processor.class).

Here’s the main class of order-service that enables Spring Cloud Stream binding.

@SpringBootApplication
@EnableBinding(Processor.class)
public class OrderApplication {
  public static void main(String[] args) {
    new SpringApplicationBuilder(OrderApplication.class).web(true).run(args);
  }
}

Adding message broker

In Spring Cloud Stream nomenclature the implementation responsible for integration with the specific message broker is called binder. By default, Spring Cloud Stream provides binder implementations for Kafka and RabbitMQ. It is able to automatically detect and use a binder found on the classpath. Any middleware-specific settings can be overridden through external configuration properties in the form supported by Spring Boot, such as application arguments, environment variables, or just the application.yml file. To include support for RabbitMQ you should add the following dependency to the project.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

Now, our applications need to connect with one, shared instance of RabbitMQ broker. That’s why I run Docker image with RabbitMQ exposed outside on default 5672 port. It also launches a web dashboard available under address http://192.168.99.100:15672.

$ docker run -d --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management

We need to override the default address of RabbitMQ for every Spring Boot application by setting property spring.rabbitmq.host to Docker machine IP 192.168.99.100.

spring:  
  rabbitmq:
    host: 192.168.99.100
    port: 5672

Implementing message-driven microservices

Spring Cloud Stream is built on top of the Spring Integration project. Spring Integration extends the Spring programming model to support the well-known Enterprise Integration Patterns (EIP). EIP defines a number of components that are typically used for orchestration in distributed systems. You have probably heard about patterns such as message channels, routers, aggregators, or endpoints. Let’s proceed to the implementation.
We begin from order-service, which is responsible for accepting orders, publishing them on shared topics, and then collecting asynchronous responses from downstream services. Here’s the @Service, which builds a message and publishes it to the remote topic using Source bean.

@Service
public class OrderSender {
  @Autowired
  private Source source;
  
  public boolean send(Order order) {
    return this.source.output().send(MessageBuilder.withPayload(order).build());
  }
}

That @Service is called by the controller, which exposes the HTTP endpoints for submitting new orders and getting order with status by id.

@RestController
public class OrderController {

   private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);
   
   private ObjectMapper mapper = new ObjectMapper();
   
   @Autowired
   OrderRepository repository;
   @Autowired
   OrderSender sender;   
   
   @PostMapping
   public Order process(@RequestBody Order order) throws JsonProcessingException {
      Order o = repository.add(order);
      LOGGER.info("Order saved: {}", mapper.writeValueAsString(order));
      boolean isSent = sender.send(o);
      LOGGER.info("Order sent: {}", mapper.writeValueAsString(Collections.singletonMap("isSent", isSent)));
      return o;
   }
   
   @GetMapping("/{id}")
   public Order findById(@PathVariable("id") Long id) {
      return repository.findById(id);
   }
   
}

Now, let’s take a closer look at the consumer side. The message sent by OrderSender bean from order-service is received by account-service and product-service. To receive the message from topic exchange, we just have to annotate the method that takes the Order object as a parameter with @StreamListener. We also have to define a target channel for the listener – in that case it is Processor.INPUT.

@SpringBootApplication
@EnableBinding(Processor.class)
public class OrderApplication {
   
   private static final Logger LOGGER = LoggerFactory.getLogger(OrderApplication.class);
   
   @Autowired
   OrderService service;
   
   public static void main(String[] args) {
      new SpringApplicationBuilder(OrderApplication.class).web(true).run(args);
   }
   
   @StreamListener(Processor.INPUT)
   public void receiveOrder(Order order) throws JsonProcessingException {
      LOGGER.info("Order received: {}", mapper.writeValueAsString(order));
      service.process(order);
   }
   
}

Received order is then processed by AccountService bean. Order may be accepted or rejected by account-service dependending on sufficient funds on customer’s account for order’s realization. The response with acceptance status is sent back to order-service via output channel invoked by the OrderSender bean.

@Service
public class AccountService {

   private static final Logger LOGGER = LoggerFactory.getLogger(AccountService.class);
   
   private ObjectMapper mapper = new ObjectMapper();
   
   @Autowired
   AccountRepository accountRepository;
   @Autowired
   OrderSender orderSender;
   
   public void process(final Order order) throws JsonProcessingException {
      LOGGER.info("Order processed: {}", mapper.writeValueAsString(order));
      List accounts =  accountRepository.findByCustomer(order.getCustomerId());
      Account account = accounts.get(0);
      LOGGER.info("Account found: {}", mapper.writeValueAsString(account));
      if (order.getPrice() <= account.getBalance()) {
         order.setStatus(OrderStatus.ACCEPTED);
         account.setBalance(account.getBalance() - order.getPrice());
      } else {
         order.setStatus(OrderStatus.REJECTED);
      }
      orderSender.send(order);
      LOGGER.info("Order response sent: {}", mapper.writeValueAsString(order));
   }
   
}

The last step is configuration. It is provided inside application.yml file. We have to properly define destinations for channels. While order-service is assigning orders-out destination to output channel, and orders-in destination to input channel, account-service and product-service do the opposite. It is logical, because messages sent by order-service via its output destination are received by consuming services via their input destinations. But it is still the same destination on a shared broker's exchange. Here are configuration settings of order-service.

spring: 
  cloud:  
    stream:
      bindings:
        output:
          destination: orders-out
        input:
          destination: orders-in
      rabbit:
        bindings:
          input:
            consumer:
              exchangeType: direct

Here's configuration provided for account-service and product-service.

spring:  
  cloud:  
    stream:
      bindings:
        output:
          destination: orders-in
        input:
          destination: orders-out
      rabbit:
        bindings:
          output:
            producer:
              exchangeType: direct
              routingKeyExpression: '"#"'

Finally, you can run our sample microservice. For now, we just need to run a single instance of each microservice. You can easily generate some test requests by running JUnit test class OrderControllerTest provided in my source code repository inside module order-service. This case is simple. In the next, we will study more advanced samples with multiple running instances of consuming services.

Scaling up

To scale up our Spring Cloud Stream applications we just need to launch additional instances of each microservice. They will still listen for the incoming messages on the same topic exchange as the currently running instances. After adding one instance of account-service and product-service we may send a test order. The result of that test won't be satisfactory for us... Why? A single order is received by all the running instances of every microservice. This is exactly how topic exchanges work - the message sent to the topic is received by all consumers, which are listening on that topic. Fortunately, Spring Cloud Stream is able to solve that problem by providing a solution called consumer group. It is responsible for guaranteeing that only one of the instances is expected to handle a given message if they are placed in a competing consumer relationship. The transformation to consumer group mechanism when running multiple instances of the service has been visualized on the following figure.

stream-2

Configuration of a consumer group mechanism is not very difficult. We just have to set a group parameter with the name of the group for the given destination. Here's the current binding configuration for account-service. The orders-in destination is a queue created for direct communication with order-service, so only orders-out is grouped using spring.cloud.stream.bindings..group property.

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: orders-in
        input:
          destination: orders-out
          group: account

Consumer group mechanisms is a concept taken from Apache Kafka, and implemented in Spring Cloud Stream also for RabbitMQ broker, which does not natively support it. So, I think it is pretty interesting how it is configured on RabbitMQ. If you run two instances of the service without setting group name on destination there are two bindings created for a single exchange (one binding per one instance) as shown in the picture below. Because two applications are listening on that exchange, there are four bindings assigned to that exchange in total.

spring-cloud-stream-testing-3

If you set a group name for the selected destination Spring Cloud Stream will create a single binding for all running instances of a given service. The name of the binding will be suffixed with the group name.

spring-cloud-stream-testing_11_06

Because, we have included spring-cloud-starter-sleuth to the project dependencies the same traceId header is sent between all the asynchronous requests exchanged during realization of single request incoming to the order-service POST endpoint. Thanks to that we can easily correlate all logs using this header using Elastic Stack (Kibana).

spring-cloud-stream-testing_11_05

Automated Testing with Spring Cloud Stream

You can easily test your microservice without connecting to a message broker. To achieve it you need to include spring-cloud-stream-test-support to your project dependencies. It contains the TestSupportBinder bean that lets you interact with the bound channels and inspect any messages sent and received by the application.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-test-support</artifactId>
  <scope>test</scope>
</dependency>

In the test class we need to declare MessageCollector bean, which is responsible for receiving messages retained by TestSupportBinder. Here's my test class from account-service. Using Processor bean I send test order to input channel. Then MessageCollector receives a message that is sent back to order-service via the output channel. Test method testAccepted creates order that should be accepted by account-service, while testRejected method sets too high an order price that results in rejecting the order.

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class OrderReceiverTest {

   private static final Logger LOGGER = LoggerFactory.getLogger(OrderReceiverTest.class);
   
   @Autowired
   private Processor processor;
   @Autowired
   private MessageCollector messageCollector;

   @Test
   @SuppressWarnings("unchecked")
   public void testAccepted() {
      Order o = new Order();
      o.setId(1L);
      o.setAccountId(1L);
      o.setCustomerId(1L);
      o.setPrice(500);
      o.setProductIds(Collections.singletonList(2L));
      processor.input().send(MessageBuilder.withPayload(o).build());
      Message received = (Message) messageCollector.forChannel(processor.output()).poll();
      LOGGER.info("Order response received: {}", received.getPayload());
      assertNotNull(received.getPayload());
      assertEquals(OrderStatus.ACCEPTED, received.getPayload().getStatus());
   }
   
   @Test
   @SuppressWarnings("unchecked")
   public void testRejected() {
      Order o = new Order();
      o.setId(1L);
      o.setAccountId(1L);
      o.setCustomerId(1L);
      o.setPrice(100000);
      o.setProductIds(Collections.singletonList(2L));
      processor.input().send(MessageBuilder.withPayload(o).build());
      Message received = (Message) messageCollector.forChannel(processor.output()).poll();
      LOGGER.info("Order response received: {}", received.getPayload());
      assertNotNull(received.getPayload());
      assertEquals(OrderStatus.REJECTED, received.getPayload().getStatus());
   }

}

Conclusion

Message-driven microservices are a good choice whenever you don't need an asynchronous response from your API. In this article, I have shown a sample use case of the publish/subscribe model in inter-service communication between your microservices. The source code is as usual available on GitHub (https://github.com/piomin/sample-message-driven-microservices.git). For other interesting examples of testing with Spring Cloud Stream library, also with Apache Kafka, you can refer to Chapter 11 in my book Mastering Spring Cloud (https://www.packtpub.com/application-development/mastering-spring-cloud).

The post Building and testing message-driven microservices using Spring Cloud Stream appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2018/06/15/building-and-testing-message-driven-microservices-using-spring-cloud-stream/feed/ 7 6678
Partitioning with Apache Kafka and Vertx https://piotrminkowski.com/2018/01/30/partitioning-with-apache-kafka-and-vert-x/ https://piotrminkowski.com/2018/01/30/partitioning-with-apache-kafka-and-vert-x/#respond Tue, 30 Jan 2018 10:46:29 +0000 https://piotrminkowski.wordpress.com/?p=6318 In this article, you will learn how to implement partitioning with Apache Kafka and Vertx toolkit. Apache Kafka is a distributed streaming platform. It also may act as a messaging system in your architecture. Traditional message brokers provide two models of communication: queuing and publish-subscribe (topics). Queues are used for point-to-point messaging, while topics allow […]

The post Partitioning with Apache Kafka and Vertx appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to implement partitioning with Apache Kafka and Vertx toolkit. Apache Kafka is a distributed streaming platform. It also may act as a messaging system in your architecture. Traditional message brokers provide two models of communication: queuing and publish-subscribe (topics). Queues are used for point-to-point messaging, while topics allow you to broadcast data to multiple target consumers. Kafka does not provide a queuing mechanism directly. However, it introduces the consumer group concept, which generalizes both queuing and publish-subscribe models. The consumer group mechanism guarantees that a single message would be processed by only one consumer that belongs to the given group. It is especially useful when you have more than one instance of your service, which listens for messages incoming to the topic. That feature makes your consumers behave as queuing clients within the same group.

Eclipse Vert.x is a lightweight and fast toolkit for building reactive applications on the JVM. I have already introduced that solution in some of my previous posts, for example Asynchronous Microservices with Vert.x. Vert.x does not force you to implement a reactive application. You may create a standard service, which processes the HTTP requests asynchronously in accordance with Asynchronous I/O concept.

The purpose of this article

The main purpose of this article is to show you the main features of Apache Kafka (partitioning), that may be useful when creating applications consuming messages. The Java client’s library choice is not a key point here. However, in my opinion, Vertx is an asynchronous, high-performance framework that perfectly matches Apache Kafka. It provides Vert.x Kafka client, which allows you to read and send messages from/to a Kafka cluster. Before we proceed to the sample, let’s first dive into the core abstraction of Kafka.

Kafka topic

I’m assuming you know what the topic is and what is its main role. Every message incoming to the topic goes to every subscriber. What is the main difference between Kafka and standard topics provided by other message brokers? Kafka’s topic is partitioned. Each partition is an ordered, immutable sequence of records. Every record can be uniquely identified within the partition by a sequential id number called the offset. The Kafka cluster retains all published records according to the configured retention period.

Consumers may subscribe to the whole topic or only to the selected partition. It can also control the offset from where it starts processing data. For example, it is able to reset offset in order to reprocess data from the past or just or skip ahead to the most recent record to consume only messages currently sent to the topic. Here’s the figure that illustrates a single partition structure with producers and consumers listening for the incoming data.

apache-kafka-partitioning-1

Sample architecture

Let me say some words about the sample system architecture. Its source code is available on GitHub (https://github.com/piomin/sample-vertx-kafka-messaging.git). In accordance with the principle that one picture speaks more than a thousand words, the diagram illustrating the architecture of our system is visible below. We have one topic created on Kafka platform, that consists of two partitions. There is one client application that exposes REST API allowing to send orders into the system and then forwarding them into the topic. The target partition is calculated based on the type of order. We may create orders with types SINGLE and MULTIPLE. There are also some applications that consume data from topics. First of them single-order-processor reads data from partition 0, the second multiple-order-processor from partition 1, and the last all-order-processor does not choose any partition.

apache-kafka-partitioning-2

Running Apache Kafka

To run Apache Kafka on the local machine we may use its Docker image. The image shared by Spotify also starts the ZooKeeper server, which is used by Kafka. If you run Docker on Windows the default address of its virtual machine is 192.168.99.100.

$ docker run -d --name kafka -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=192.168.99.100 --env ADVERTISED_PORT=9092 spotify/kafka

However, that option assumes the topics would be automatically created during application startup. I’ve got some problems with it while creating a multi-partitions topic. There is also another image ches/kafka, which requires starting ZooKeeper separately, but provides Kafka client interface.

$ docker run -d --name zookeeper -p 2181:2181 zookeeper
$ docker run -d --name kafka -p 9092:9092 -p 7203:7203 --network kafka-net --env KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 --env ZOOKEEPER_IP=192.168.99.100 ches/kafka

Finally, we can run ches/kafka container in client mode and then create topic orders-out with two partitions.


docker run --rm --network kafka-net ches/kafka kafka-topics.sh --create --topic orders-out --replication-factor 1 --partitions 2 --zookeeper 192.168.99.100:2181
Created topic "orders-out".

Building producer application with Vertx

First, we need to include Maven dependencies to enable Vert.x framework for the application. If the application exposes RESTful HTTP API you should include vertx-web. Library vertx-kafka-client has to be included to all the sample modules.

To start Vert.x as Java application we have to create verticle by extending AbstractVerticle. Then the verticle needs to be deployed in the main method using Vertx object. For more details about Vert.x and verticles concept you may refer to one of my previous article mentioned in the preface.

public class OrderVerticle extends AbstractVerticle {
   public static void main(String[] args) {
      Vertx vertx = Vertx.vertx();
      vertx.deployVerticle(new OrderVerticle());
   }
}

The next step is to define producer using KafkaProducer interface. We have to provide connection settings and serializer implementation class. You can choose between various built-in serializer implemementations. The most suitable for me was JsonObjectSerializer, which requires JsonObject as an input parameter.

Properties config = new Properties();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonObjectSerializer.class);
config.put(ProducerConfig.ACKS_CONFIG, "1");
KafkaProducer producer = KafkaProducer.create(vertx, config);

The producer is invoked inside the POST method route definition. It returns an asynchronous response with a status after sending a message to the topic. The message is created using KafkaProducerRecord interface. It takes the topic’s name, request object, and partition number as the parameters. As you may see in the fragment of code below, partition number is calculated on the basis order type (o.getType().ordinal()).

Router router = Router.router(vertx);
router.route("/order/*").handler(ResponseContentTypeHandler.create());
router.route(HttpMethod.POST, "/order").handler(BodyHandler.create());
router.post("/order").produces("application/json").handler(rc -> {
   Order o = Json.decodeValue(rc.getBodyAsString(), Order.class);
   KafkaProducerRecord record = KafkaProducerRecord.create("orders", null, rc.getBodyAsJson(), o.getType().ordinal());
   producer.write(record, done -> {
      if (done.succeeded()) {
         RecordMetadata recordMetadata = done.result();
         LOGGER.info("Record sent: msg={}, destination={}, partition={}, offset={}", record.value(), recordMetadata.getTopic(), recordMetadata.getPartition(), recordMetadata.getOffset());
         o.setId(recordMetadata.getOffset());
         o.setStatus(OrderStatus.PROCESSING);
      } else {
         Throwable t = done.cause();
         LOGGER.error("Error sent to topic: {}", t.getMessage());
         o.setStatus(OrderStatus.REJECTED);
      }
      rc.response().end(Json.encodePrettily(o));
   });
});
vertx.createHttpServer().requestHandler(router::accept).listen(8090);

Partitioning on consumer applications with Vertx

The consumer configuration is very similar to that for producer. We also have to set connection settings and class using for deserialization. There is one interesting setting, which has been defined for the consumer in the fragment of code visible below. It is auto.offset.reset (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG). It sets the initial offset in Kafka for the customer during initialization. If you would like to read all records from the beginning of stream use value earliest. If you would like to processes only the newest records (received after application startup) set that property to latest. Because in our case Kafka acts as a message broker, it is set to latest.

Properties config = new Properties();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer consumer = KafkaConsumer.create(vertx, config);

As you probably remember we have three different application that subscribes to the topic. The first of them, implemented under the module all-order-processor consumes all the events incoming to the topic. This implementation is relatively the simplest. We only need to invoke subscribe method and pass the name of the topic as a parameter. Then every incoming message is processed by handler method.

consumer.subscribe("orders-out", ar -> {
   if (ar.succeeded()) {
      LOGGER.info("Subscribed");
   } else {
      LOGGER.error("Could not subscribe: err={}", ar.cause().getMessage());
   }
});

consumer.handler(record -> {
   LOGGER.info("Processing: key={}, value={}, partition={}, offset={}", record.key(), record.value(), 
record.partition(), record.offset());
   Order order = Json.decodeValue(record.value(), Order.class);
   order.setStatus(OrderStatus.DONE);
   LOGGER.info("Order processed: id={}, price={}", order.getId(), order.getPrice());
});

The implementation of consuming method for the other applications is a little more complicated. Besides defining target topic, every consumer can ask for a specific partition. The application multiple-order-processor subscribes to partition 1, while multiple-order-processor to partition 0.

TopicPartition tp = new TopicPartition().setPartition(1).setTopic("orders-out");
consumer.assign(tp, ar -> {
   if (ar.succeeded()) {
      LOGGER.info("Subscribed");
      consumer.assignment(done1 -> {
         if (done1.succeeded()) {
            for (TopicPartition topicPartition : done1.result()) {
               LOGGER.info("Partition: topic={}, number={}", topicPartition.getTopic(), topicPartition.getPartition());
            }
         } else {
            LOGGER.error("Could not assign partition: err={}", done1.cause().getMessage());
         }
      });
   } else {
      LOGGER.error("Could not subscribe: err={}", ar.cause().getMessage());
   }
});

The implementation of handle method inside multiple-order-processor is pretty interesting. If it receives an order with a non-empty field relatedOrderId it tries to find it in the historical records stored in the topic. It may be achieved by calling seek method on KafkaConsumer.

consumer.handler(record -> {
   LOGGER.info("Processing: key={}, value={}, partition={}, offset={}", record.key(), record.value(), record.partition(), record.offset());
   Order order = Json.decodeValue(record.value(), Order.class);
   if (ordersWaiting.containsKey(record.offset())) {
      LOGGER.info("Related order found: id={}, price={}", order.getId(), order.getPrice());
      LOGGER.info("Current price: price={}", order.getPrice() + ordersWaiting.get(record.offset()).getPrice());
      consumer.seekToEnd(tp);
   }

   if (order.getRelatedOrderId() != null && !ordersWaiting.containsKey(order.getRelatedOrderId())) {
      ordersWaiting.put(order.getRelatedOrderId(), order);
      consumer.seek(tp, order.getRelatedOrderId());
   }
});

Apache Kafka partitioning testing

Now it is time to launch our applications. You may run the main classes from your IDE or build the whole project using mvn clean install command and then run it with java -jar. Also run two instances of all-order-processor in order to check out how a consumer groups mechanism works in practice.

Let’s send some test requests to the order-service in the following sequence.

$ curl -H "Content-Type: application/json" -X POST -d '{"type":"SINGLE","status":"NEW","price":200}' http://localhost:8090/order
{"id":0,"type":"SINGLE","status":"PROCESSING","price":200}
$ curl -H "Content-Type: application/json" -X POST -d '{"type":"SINGLE","status":"NEW","price":300}' http://localhost:8090/order
{"id":1,"type":"SINGLE","status":"PROCESSING","price":300}
$ curl -H "Content-Type: application/json" -X POST -d '{"type":"MULTIPLE","status":"NEW","price":400}' http://localhost:8090/order
{"id":0,"type":"MULTIPLE","status":"PROCESSING","price":400}
$ curl -H "Content-Type: application/json" -X POST -d '{"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId" :0}' http://localhost:8090/order
{"id":1,"type":"MULTIPLE","status":"PROCESSING","price":500}

Here’s log from producer application.

2018-01-30 11:08:48 [INFO ]  Record sent: msg={"type":"SINGLE","status":"NEW","price":200}, destination=orders-out, partition=0, offset=0
2018-01-30 11:08:57 [INFO ]  Record sent: msg={"type":"SINGLE","status":"NEW","price":300}, destination=orders-out, partition=0, offset=1
2018-01-30 11:09:08 [INFO ]  Record sent: msg={"type":"MULTIPLE","status":"NEW","price":400}, destination=orders-out, partition=1, offset=0
2018-01-30 11:09:27 [INFO ]  Record sent: msg={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, destination=orders-out, partition=1, offset=1

Here’s log from single-order-processor. It has processed only messages from partition 0.

2018-01-30 11:08:48 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":200}, partition=0, offset=0
2018-01-30 11:08:57 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":300}, partition=0, offset=1

Here’s log from multiple-order-processor. It has processed only messages from partition 1.

2018-01-30 11:09:08 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":400}, partition=1, offset=0
2018-01-30 11:09:27 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, partition=1, offset=1

Here’s log from first instance of all-order-processor.

2018-01-30 11:08:48 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":200}, partition=0, offset=0
2018-01-30 11:08:57 [INFO ]  Processing: key=null, value={"type":"SINGLE","status":"NEW","price":300}, partition=0, offset=1

Here’s log from second instance of all-order-processor. It may be a little bit surprising for you. But, if you run two instances of consumer, which listens for the whole topic each instance would process message from the single partition.

2018-01-30 11:09:08 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":400}, partition=1, offset=0
2018-01-30 11:09:27 [INFO ]  Processing: key=null, value={"type":"MULTIPLE","status":"NEW","price":500,"relatedOrderId":0}, partition=1, offset=1

Summary

In this article I was trying to give you a little bit of messaging with Apache Kafka. Such concepts like consumer groups or partitioning are something that makes Apache Kafka different from traditional messaging solutions. It is a widely adopted product, which can act as storage, messaging system, or stream processor. Together with the popular JVM-based toolkit Vertx, it may be a really powerful, fast, and lightweight solution for your applications that exchange messages. The key concepts introduced by Kafka has been adopted by Spring Cloud Stream, which makes them an opinionated choice for creating messaging microservices.

The post Partitioning with Apache Kafka and Vertx appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2018/01/30/partitioning-with-apache-kafka-and-vert-x/feed/ 0 6318