Apache Kafka Archives - Piotr's TechBlog https://piotrminkowski.com/tag/apache-kafka/ Java, Spring, Kotlin, microservices, Kubernetes, containers Mon, 11 Mar 2024 13:10:02 +0000 en-US hourly 1 https://wordpress.org/?v=6.9.1 https://i0.wp.com/piotrminkowski.com/wp-content/uploads/2020/08/cropped-me-2-tr-x-1.png?fit=32%2C32&ssl=1 Apache Kafka Archives - Piotr's TechBlog https://piotrminkowski.com/tag/apache-kafka/ 32 32 181738725 Kafka Offset with Spring Boot https://piotrminkowski.com/2024/03/11/kafka-offset-with-spring-boot/ https://piotrminkowski.com/2024/03/11/kafka-offset-with-spring-boot/#comments Mon, 11 Mar 2024 13:09:59 +0000 https://piotrminkowski.com/?p=15064 In this article, you will learn how to manage Kafka consumer offset with Spring Boot and Spring Kafka. An inspiration for preparing this article was the feedback I received after publishing the post describing concurrency with Kafka and Spring Boot. You were asking me questions related not only to concurrency but also to the consumer […]

The post Kafka Offset with Spring Boot appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to manage Kafka consumer offset with Spring Boot and Spring Kafka. An inspiration for preparing this article was the feedback I received after publishing the post describing concurrency with Kafka and Spring Boot. You were asking me questions related not only to concurrency but also to the consumer offset committing process. In the previous article, I focused mostly on showing that the way how the app handles Kafka messages may impact the overall performance of our system. I didn’t consider things like message duplicates or losing messages on the consumer side. Today, we are going to discuss exactly those topics.

If you are interested in Kafka and Spring Boot you can find several articles about it on my blog. Except for the already mentioned post about concurrency, you can read e.g. about Kafka transactions here. On the other hand, to read about microservices with Kafka and Spring Boot refer to the following article.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that, you need to clone my GitHub repository. After that, you should just follow my instructions. Let’s begin.

Introduction

Before we start, we need to clarify some important things related to committing offsets with Spring Kafka. First of all, by default, Spring Kafka sets the consumer enable.auto.commit property to false. It means that the framework, not Kafka, is responsible for committing an offset. Of course, we can change the default behavior by setting that property to true. By the way, it was the default approach before Spring Kafka 2.3. 

Once we stay with Kafka auto committing disabled, we can leverage 7 different commit strategies provided by Spring Kafka. Today, we won’t analyze all of them, but just the most significant. The default strategy is BATCH. In order to set the different strategy, we need to override the AckMode e.g. by setting a value of the property spring.kafka.listener.ack-mode in Spring Boot application properties. However, firstly, let’s focus on the BATCH mode.

Sample Spring Boot Kafka Apps

In order to test the offset committing with Spring Kafka, we will create two simple apps: producer and consumer. Producer sends a defined number of messages to the topic, while the consumer receives and processes them. Here’s the producer @RestController implementation. It allows us to send a defined number of messages to the transactions topic on demand:

@RestController
public class TransactionsController {

   private static final Logger LOG = LoggerFactory
            .getLogger(TransactionsController.class);

   long id = 1;
   long groupId = 1;
   KafkaTemplate<Long, Order> kafkaTemplate;

   @PostMapping("/transactions")
   public void generateAndSendMessages(@RequestBody InputParameters inputParameters) {
      for (long i = 0; i < inputParameters.getNumberOfMessages(); i++) {
         Order o = new Order(id++, i+1, i+2, 1000, "NEW", groupId);
         CompletableFuture<SendResult<Long, Order>> result =
                 kafkaTemplate.send("transactions", o.getId(), o);
         result.whenComplete((sr, ex) ->
                 LOG.info("Sent({}): {}", sr.getProducerRecord().key(), sr.getProducerRecord().value()));
      }
      groupId++;
   }

}

Here are the producer app configuration properties. We need to set the address of a Kafka broker, and serializer classes for a key (Long) and a value (JSON format).

spring:
  application.name: producer
  kafka:
    bootstrap-servers: ${KAFKA_URL}
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

We can trigger the process of sending messages, by calling the POST /transactions endpoint as shown below:

$ curl -X 'POST' 'http://localhost:8080/transactions' \
  -H 'Content-Type: application/json' \
  -d '{"numberOfMessages":10}'

Here’s the consumer app listener bean implementation. As you see, it is very simple. It just receives and prints the messages. We are sleeping the thread for 10 seconds, just to be able to easily check the offset on the Kafka topic during the test.

@Service
public class Listener {

   private static final Logger LOG = LoggerFactory
          .getLogger(NoTransactionsListener.class);

   @KafkaListener(
          id = "transactions",
          topics = "transactions",
          groupId = "a"
   )
   public void listen(@Payload Order order,
                      @Header(KafkaHeaders.OFFSET) Long offset,
                      @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) throws InterruptedException {
      LOG.info("[partition={},offset={}] Starting: {}", partition, offset, order);
      Thread.sleep(10000L);
      LOG.info("[partition={},offset={}] Finished: {}", partition, offset, order);
   }

}

In order to see what exactly happens in the consumer app, we need to increase the default logging level for Spring Kafka to DEBUG. There are also some other properties related to the serialization and deserialization of messages in the application properties. Here’s the whole application.yml file for the consumer app:

spring:
  application.name: consumer
  output.ansi.enabled: ALWAYS
  kafka:
    bootstrap-servers: ${KAFKA_URL:localhost}:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"

logging.level:
  org.springframework.kafka: debug

Understanding How Spring Kafka Commits the Offset

Then, let’s start both our apps by executing the Maven commands visible below. I assume that you already have the Kafka broker running. Once the consumer is connected to the transactions topic, we can send 10 messages by calling the already-mentioned POST /transactions endpoint. After that, we will switch the consumer app logs. We can see all significant information related to the offset committing.

# build the whole project
$ mvn clean package

# run the consumer app
$ cd consumer
$ mvn spring-boot:run

# run the producer app
$ cd producer
$ mvn spring-boot:run

So, here are the logs from our test. I highlighted the most important parts. Of course, your results may differ slightly, but the rules are the same. First of all, the consumer receives a batch of messages. In that case, there are 2 messages, but for example, it consumes 7 in one step in the next partition. Without detailed logs, you won’t even be aware of how it behaves, since the message listener processes a message after a message. However, the offset commit action is performed after processing all the consumed messages. That’s because we have the AckMode set to BATCH.

spring-kafka-offset-batch

Of course, it doesn’t have any impact on the app… as long as it is running. In case if not a graceful restart or crash occurs in the time between starting processing of batch messages and offset commit action it may cause some problems. Don’t get me wrong – it’s a standard situation that results in message duplicates on the consumer side. So, now our app consumes 7 messages in a batch. Let’s stop it during batch processing as shown below. By the way, with a graceful shutdown, Spring Kafka waits until the last message in the batch is processed. Therefore, I simulated an immediate stop with the SIGKILL option for the testing purposes.

spring-kafka-offset-batch-kill

The consumer offset has not been committed. We can verify it by checking out the current value of the consumer offset on the 1 partition. You can compare that value with the values highlighted in the logs above.

Then, let’s start our consumer app once again. The app starts reading messages from the latest committed offset for each partition. Consequently, it processes several messages already processed previously, before killing the consumer instance. As you see, the consumer app is processing orders with the 3, 5, 6, 8, and 10 id once again. We need to take such situations into account during the business logic implementation. After processing the last message in batch, Spring Kafka commits the offset.

spring-kafka-offset-batch-duplicates

Finally, everything works fine. There is no customer lag on any partition.

Using the RECORD Mode to Commit Offset

In the next step, we will compare a similar situation with the AckMode set to RECORD. According to the Spring Kafka docs the RECORD mode “commits the offset when the listener returns after processing the record”. In order to enable the RECORD mode, we need to set the following in the application.yml file:

spring.kafka.listener.ack-mode: RECORD

Then, we have to restart the consumer app. After that, we can trigger the process of sending messages once again, by calling the POST /transactions endpoint exposed by the producer app:

$ curl -X 'POST' 'http://localhost:8080/transactions' \
  -H 'Content-Type: application/json' \
  -d '{\"numberOfMessages\":10}'

Let’s switch to the logs. As you see, each time after processing a single record by the @KafkaListener method Spring Kafka commits the offset. I guess that some of you assumed that this was the default behavior (not the BATCH mode) 🙂 That approach decreases the potential number of duplicate messages after the restart, but on the other hand, impacts the overall performance of the consumer.

spring-kafka-offset-record

The latest committed customer offset visible in the logs is 8. So, if we switch to the GUI client we can verify that the current offset there has the same value.

Graceful Shutdown

Although our app commits the offset each time after processing a single record, in the graceful shutdown Spring Boot waits until the whole batch is processed. As you see, I initiated the shutdown procedure at 15:12:41, but the container performed a shutdown after a 30-second timeout. That’s because I included 10 seconds of thread sleep in the processing method. It results in the total time of processing the batch of messages higher than 30 seconds.

spring-kafka-offset-batch-shutdown

However, we can change that behavior. We need to set the spring.kafka.listener.immediate-stop property to true. That property decides whether the container stops after the current record is processed or after all the records from the previous poll are processed.

spring.kafka.listener.immediate-stop: true

After restarting the consumer app we need to take a look at the logs once again. The Spring container starts a shutdown procedure just after committing the offset after processing the last record.

Spring Kafka Offset and Concurrency

Processing Messages with the Custom Thread Pool

Finally, the last scenario in our article. Let’s consider the case when we are using the custom thread to handle messages received by the @KafkaListener method. In order to do that, we can define the ExecutorService object. Once the listenAsync() method receives the message it delegates processing to the Processor bean by calling its process() method using the ExecutorService object.

@Service
public class Listener {

   private static final Logger LOG = LoggerFactory
          .getLogger(Listener.class);

   ExecutorService executorService = Executors.newFixedThreadPool(30);

   @Autowired
   private Processor processor;

   @KafkaListener(
          id = "transactions-async",
          topics = "transactions-async",
          groupId = "a"
   )
   public void listenAsync(@Payload Order order,
                     @Header(KafkaHeaders.OFFSET) Long offset,
                     @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
      LOG.info("[partition={},offset={}] Starting Async: {}", partition, offset, order);
      executorService.submit(() -> processor.process(order));
   }
}

In the Processor bean, we are sleeping the thread for 10 seconds for testing purposes. The process() method doesn’t do anything important, it just prints the log at the start and before finishing.

@Service
public class Processor {

   private static final Logger LOG = LoggerFactory
          .getLogger(Listener.class);

   public void process(Order order) {
      LOG.info("Processing: {}", order.getId());
      try {
         Thread.sleep(10000L);
      } catch (InterruptedException e) {
         throw new RuntimeException(e);
      }
      LOG.info("Finished: {}", order.getId());
   }

}

Let’s analyze what will happen after sending some message to such a consumer. This time we are using the transaction-async topic. By default, Spring Kafka commits the offset after processing the whole batch of 4 received messages. However, it happens almost immediately after receiving the messages, because we are delegating the further processing to another thread. The asynchronous method finishes processing after 10 seconds. If your app crashes during those 10 seconds, it will result in losing messages. They won’t be processed by the new instance of the app, because the offset has already been committed before message processing has been finished.

Enable Manual Offset Commit

Once again, it is a normal situation, that we can lose messages with the Kafka consumer. However, we can handle such cases slightly differently. Instead of relying on the container-managed offset commitment, we can switch to the manual mode. First of all, let’s add the following property to the Spring Boot application.yml file:

spring.kafka.listener.ack-mode: MANUAL_IMMEDIATE

Then we need to leverage the Acknowledgment interface to take a control over the offset commitment process inside the listener. As you see, we have to include such an interface to the @KafkaListener method parameters. After that, we can pass it to the process() method running in the different thread.

@Service
public class Listener {

   private static final Logger LOG = LoggerFactory
          .getLogger(Listener.class);

   ExecutorService executorService = Executors.newFixedThreadPool(30);

   @Autowired
   private Processor processor;

   @KafkaListener(
          id = "transactions-async",           
          topics = "transactions-async",
          groupId = "a"
   )
   public void listenAsync(@Payload Order order,
                     Acknowledgment acknowledgment,
                     @Header(KafkaHeaders.OFFSET) Long offset,
                     @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
      LOG.info("[partition={},offset={}] Starting Async: {}", partition, offset, order);
      executorService.submit(() -> processor.process(order, acknowledgment));
   }
}

With the acknowledge() provided by the Acknowledgment interface we can manually commit the offset in the selected location in the code. Here, we are making a commit at the of the whole method.

@Service
public class Processor {

   private static final Logger LOG = LoggerFactory
          .getLogger(Listener.class);

   public void process(Order order, Acknowledgment acknowledgment) {
      LOG.info("Processing: {}", order.getId());
      try {
         Thread.sleep(10000L);
      } catch (InterruptedException e) {
         throw new RuntimeException(e);
      }
      LOG.info("Finished: {}", order.getId());
      acknowledgment.acknowledge();
   }

}

Let’s switch to the consumer app logs once again. As you the offset commit happens almost immediately after processing the message. By the way, the MANUAL (instead of MANUAL_IMMEDIATE) AckMode will wait with the commit until the whole batch records will be processed. Another thing worth mentioning here is a possibility of out-of-order commit. It is disabled by default for the Spring Boot app. In order to enable it we need to set the spring.kafka.listener.async-acks property to true. If you want to test such a scenario by yourself you can increase the number of messages sent by the producer with the numberOfMessages field e.g. to 100. Then verify the consumer lag with or without the asyncAcks property.

Finally, let’s verify the current committed offset for all the partitions using the GUI client.

Final Thoughts

Kafka consumer offset is a very interesting topic. If you want to understand Kafka, you first need to understand how consumers commit the offset on partitions. In this article, I focused on showing you how to switch between different acknowledgment modes with Spring Kafka and how impacts on your app.

The post Kafka Offset with Spring Boot appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2024/03/11/kafka-offset-with-spring-boot/feed/ 4 15064
Apache Kafka on Kubernetes with Strimzi https://piotrminkowski.com/2023/11/06/apache-kafka-on-kubernetes-with-strimzi/ https://piotrminkowski.com/2023/11/06/apache-kafka-on-kubernetes-with-strimzi/#comments Mon, 06 Nov 2023 08:49:30 +0000 https://piotrminkowski.com/?p=14613 In this article, you will learn how to install and manage Apache Kafka on Kubernetes with Strimzi. The Strimzi operator lets us declaratively define and configure Kafka clusters, and several other components like Kafka Connect, Mirror Maker, or Cruise Control. Of course, it’s not the only way to install Kafka on Kubernetes. As an alternative, […]

The post Apache Kafka on Kubernetes with Strimzi appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to install and manage Apache Kafka on Kubernetes with Strimzi. The Strimzi operator lets us declaratively define and configure Kafka clusters, and several other components like Kafka Connect, Mirror Maker, or Cruise Control. Of course, it’s not the only way to install Kafka on Kubernetes. As an alternative, we can use the Bitnami Helm chart available here. In comparison to that approach, Strimzi simplifies the creation of additional components. We will analyze it on the example of the Cruise Control tool.

You can find many other articles about Apache Kafka on my blog. For example, to read about concurrency with Spring Kafka please refer to the following post. There is also an article about Kafka transactions available here.

Prerequisites

In order to proceed with the exercise, you need to have a Kubernetes cluster. This cluster should have at least three worker nodes since I’m going to show you the approach with Kafka brokers spread across several nodes. We can easily simulate multiple Kubernetes nodes locally with Kind. You need to install the kind CLI tool and start Docker on your laptop. Here’s the Kind configuration manifest containing a definition of a single control plane and 4 worker nodes:

kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker
- role: worker

Then, we need to create the Kubernetes cluster based on the manifest visible above with the following kind command:

$ kind create cluster --name c1 --config cluster.yaml

The name of our Kind cluster is c1. It corresponds to the kind-c1 Kubernetes context, which is automatically set as default after creating the cluster. After that, we can display a list of Kubernetes nodes using the following kubectl command:

$ kubectl get node
NAME               STATUS   ROLES           AGE  VERSION
c1-control-plane   Ready    control-plane   1m   v1.27.3
c1-worker          Ready    <none>          1m   v1.27.3
c1-worker2         Ready    <none>          1m   v1.27.3
c1-worker3         Ready    <none>          1m   v1.27.3
c1-worker4         Ready    <none>          1m   v1.27.3

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. After that, go to the kafka directory. There are two Spring Boot apps inside the producer and consumer directories. The required Kubernetes manifests are available inside the k8s directory. You can apply them with kubectl or using the Skaffold CLI tool. The repository is already configured to work with Skaffold and Kind. To proceed with the exercise just follow my instructions in the next sections.

Architecture

Let’s analyze our main goals in this exercise. Of course, we want to run a Kafka cluster on Kubernetes as simple as possible. There are several requirements for the cluster:

  1. It should automatically expose broker metrics in the Prometheus format. Then we will use Prometheus mechanisms to get the metrics and store them for visualization.
  2. It should consist of at least 3 brokers. Each broker has to run on a different Kubernetes worker node.
  3. Our Kafka needs to work in the Zookeeper-less mode. Therefore, we need to enable the KRaft protocol between the brokers.
  4. Once we scale up the Kafka cluster, we must automatically rebalance it to reassign partition replicas to the new broker. In order to do that, we will use the Cruise Control support in Strimzi.

Here’s the diagram that visualizes the described architecture. We will also run two simple Spring Boot apps on Kubernetes that connect the Kafka cluster and use it to send/receive messages.

kafka-on-kubernetes-arch

1. Install Monitoring Stack on Kubernetes

In the first step, we will install the monitoring on our Kubernetes cluster. We are going to use the kube-prometheus-stack Helm chart for that. It provides preconfigured instances of Prometheus and Grafana. It also comes with several CRD objects that allow us to easily customize monitoring mechanisms according to our needs. Let’s add the following Helm repository:

$ helm repo add prometheus-community \
    https://prometheus-community.github.io/helm-charts

Then, we can install the chart in the monitoring namespace. We can leave the default configuration.

$ helm install kube-prometheus-stack \
    prometheus-community/kube-prometheus-stack \
    --version 52.1.0 -n monitoring --create-namespace

2. Install Strimzi Operator on Kubernetes

In the next step, we will install the Strimzi operator on Kubernetes using Helm chart. The same as before we need to add the Helm repository:

$ helm repo add strimzi https://strimzi.io/charts

Then, we can proceed to the installation. This time we will override some configuration settings. The Strimzi Helm chart comes with a set of Grafana dashboards to visualize metrics exported by Kafka brokers and some other components managed by Strimzi. We place those dashboards inside the monitoring namespace. By default, the Strimzi chart doesn’t add the dashboards, so we also need to enable that feature in the values YAML file. That’s not all. Because we want to run Kafka in the KRaft mode, we need to enable it using feature gates. Enabling the UseKRaft feature gate requires the KafkaNodePools feature gate to be enabled as well. Then when we deploy a Kafka cluster in KRaft mode, we also must use the KafkaNodePool resources. Here’s the full list of overridden Helm chart values:

dashboards:
  enabled: true
  namespace: monitoring
featureGates: +UseKRaft,+KafkaNodePools,+UnidirectionalTopicOperator

Finally, let’s install the operator in the strimzi namespace using the following command:

$ helm install strimzi-kafka-operator strimzi/strimzi-kafka-operator \
    --version 0.38.0 \
    -n strimzi --create-namespace \
    -f strimzi-values.yaml

3. Run Kafka in the KRaft Mode

In the current version of Strimzi KRaft mode support is still in the alpha phase. This will probably change soon but for now, we have to deal with some inconveniences. In the previous section, we enabled three feature gates required to run Kafka in KRaft mode. Thanks to that we can finally define our Kafka cluster. In the first step, we need to create a node pool. This new Strimzi object is responsible for configuring brokers and controllers in the cluster. Controllers are responsible for coordinating operations and maintaining the cluster’s state. Fortunately, a single node in the poll can act as a controller and a broker at the same time.

Let’s create the KafkaNodePool object for our cluster. As you see it defines two roles: broker and controller (1). We can also configure storage for the cluster members (2). One of our goals is to avoid sharing the same Kubernetes node between Kafka brokers. Therefore, we will define the podAntiAffinity section (3). Setting the topologyKey to kubernetes.io/hostname indicates that the selected pods are not scheduled on nodes with the same hostname (4).

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: dual-role
  namespace: strimzi
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles: # (1)
    - controller
    - broker
  storage: # (2)
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 20Gi
        deleteClaim: false
  template:
    pod:
      affinity:
        podAntiAffinity: # (3)
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: strimzi.io/name
                    operator: In
                    values:
                      - my-cluster-kafka
              topologyKey: "kubernetes.io/hostname" # (4)

Once we create a node pool, we can proceed to the Kafka object creation. We need to enable Kraft mode and node pools for the particular cluster by annotating it with strimzi.io/kraft and strimzi.io/node-pools (1). The sections like storage (2) or zookeeper (5) are not used in the KRaft mode but are still required by the CRD. We should also configure the cluster metrics exporter (3) and enable the Cruise Control component (4). Of course, our cluster is exposing API for the client connection under the 9092 port.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: strimzi
  annotations: # (1)
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: '3.6'
    storage: # (2)
      type: persistent-claim
      size: 5Gi
      deleteClaim: true
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    version: 3.6.0
    replicas: 3
    metricsConfig: # (3)
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yml
  entityOperator:
    topicOperator: {}
    userOperator: {}
  cruiseControl: {} # (4)
  # (5)
  zookeeper:
    storage:
      type: persistent-claim
      deleteClaim: true
      size: 2Gi
    replicas: 3

The metricsConfig section in the Kafka object took the ConfigMap as the configuration source. This ConfigMap contains a single kafka-metrics-config.yml entry with the Prometheus rules definition.

kind: ConfigMap
apiVersion: v1
metadata:
  name: kafka-metrics
  namespace: strimzi
  labels:
    app: strimzi
data:
  kafka-metrics-config.yml: |
    lowercaseOutputName: true
    rules:
    - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
      name: kafka_server_$1_$2
      type: GAUGE
      labels:
        clientId: "$3"
        topic: "$4"
        partition: "$5"
    - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
      name: kafka_server_$1_$2
      type: GAUGE
      labels:
        clientId: "$3"
        broker: "$4:$5"
    - pattern: kafka.server<type=(.+), cipher=(.+), protocol=(.+), listener=(.+), networkProcessor=(.+)><>connections
      name: kafka_server_$1_connections_tls_info
      type: GAUGE
      labels:
        cipher: "$2"
        protocol: "$3"
        listener: "$4"
        networkProcessor: "$5"
    - pattern: kafka.server<type=(.+), clientSoftwareName=(.+), clientSoftwareVersion=(.+), listener=(.+), networkProcessor=(.+)><>connections
      name: kafka_server_$1_connections_software
      type: GAUGE
      labels:
        clientSoftwareName: "$2"
        clientSoftwareVersion: "$3"
        listener: "$4"
        networkProcessor: "$5"
    - pattern: "kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+):"
      name: kafka_server_$1_$4
      type: GAUGE
      labels:
        listener: "$2"
        networkProcessor: "$3"
    - pattern: kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+)
      name: kafka_server_$1_$4
      type: GAUGE
      labels:
        listener: "$2"
        networkProcessor: "$3"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>MeanRate
      name: kafka_$1_$2_$3_percent
      type: GAUGE
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>Value
      name: kafka_$1_$2_$3_percent
      type: GAUGE
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*, (.+)=(.+)><>Value
      name: kafka_$1_$2_$3_percent
      type: GAUGE
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*), (.+)=(.+)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        "$6": "$7"
        quantile: "0.$8"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        quantile: "0.$6"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        quantile: "0.$4"
    - pattern: "kafka.server<type=raft-metrics><>(.+-total|.+-max):"
      name: kafka_server_raftmetrics_$1
      type: COUNTER
    - pattern: "kafka.server<type=raft-metrics><>(.+):"
      name: kafka_server_raftmetrics_$1
      type: GAUGE
    - pattern: "kafka.server<type=raft-channel-metrics><>(.+-total|.+-max):"
      name: kafka_server_raftchannelmetrics_$1
      type: COUNTER
    - pattern: "kafka.server<type=raft-channel-metrics><>(.+):"
      name: kafka_server_raftchannelmetrics_$1
      type: GAUGE
    - pattern: "kafka.server<type=broker-metadata-metrics><>(.+):"
      name: kafka_server_brokermetadatametrics_$1
      type: GAUGE

4. Interacting with Kafka on Kubernetes

Once we apply the KafkaNodePool and Kafka objects to the Kubernetes cluster, Strimzi starts provisioning. As a result, you should see the broker pods, a single pod related to Cruise Control, and a metrics exporter pod. Each Kafka broker pod is running on a different Kubernetes node:

Clients can connect Kafka using the my-cluster-kafka-bootstrap Service under the 9092 port:

$ kubectl get svc
NAME                         TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                        AGE
my-cluster-cruise-control    ClusterIP   10.96.108.204   <none>        9090/TCP                                       4m10s
my-cluster-kafka-bootstrap   ClusterIP   10.96.155.136   <none>        9091/TCP,9092/TCP,9093/TCP                     4m59s
my-cluster-kafka-brokers     ClusterIP   None            <none>        9090/TCP,9091/TCP,8443/TCP,9092/TCP,9093/TCP   4m59s

In the next step, we will deploy our two apps for producing and consuming messages. The producer app sends one message per second to the target topic:

@SpringBootApplication
@EnableScheduling
public class KafkaProducer {

   private static final Logger LOG = LoggerFactory
      .getLogger(KafkaProducer.class);

   public static void main(String[] args) {
      SpringApplication.run(KafkaProducer.class, args);
   }

   AtomicLong id = new AtomicLong();
   @Autowired
   KafkaTemplate<Long, Info> template;

   @Value("${POD:kafka-producer}")
   private String pod;
   @Value("${NAMESPACE:empty}")
   private String namespace;
   @Value("${CLUSTER:localhost}")
   private String cluster;
   @Value("${TOPIC:test}")
   private String topic;

   @Scheduled(fixedRate = 1000)
   public void send() {
      Info info = new Info(id.incrementAndGet(), 
                           pod, namespace, cluster, "HELLO");
      CompletableFuture<SendResult<Long, Info>> result = template
         .send(topic, info.getId(), info);
      result.whenComplete((sr, ex) ->
         LOG.info("Sent({}): {}", sr.getProducerRecord().key(), 
         sr.getProducerRecord().value()));
   }
}

Here’s the Deployment manifest for the producer app:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
      - name: producer
        image: piomin/producer
        resources:
          requests:
            memory: 200Mi
            cpu: 100m
        ports:
        - containerPort: 8080
        env:
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap
          - name: CLUSTER
            value: c1
          - name: TOPIC
            value: test-1
          - name: POD
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
          - name: NAMESPACE
            valueFrom:
              fieldRef:
                fieldPath: metadata.namespace

Before running the app we can create the test-1 topic with the Strimzi CRD:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: test-1
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 12
  replicas: 3
  config:
    retention.ms: 7200000
    segment.bytes: 1000000

The consumer app is listening for incoming messages. Here’s the bean responsible for receiving and logging messages:

@SpringBootApplication
@EnableKafka
public class KafkaConsumer {

   private static final Logger LOG = LoggerFactory
      .getLogger(KafkaConsumer.class);

   public static void main(String[] args) {
      SpringApplication.run(KafkaConsumer.class, args);
   }

   @Value("${app.in.topic}")
   private String topic;

   @KafkaListener(id = "info", topics = "${app.in.topic}")
   public void onMessage(@Payload Info info,
      @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Long key,
      @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
      LOG.info("Received(key={}, partition={}): {}", key, partition, info);
   }
}

Here’s the Deployment manifest for the consumer app:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer
spec:
  selector:
    matchLabels:
      app: consumer
  template:
    metadata:
      labels:
        app: consumer
    spec:
      containers:
      - name: consumer
        image: piomin/consumer
        resources:
          requests:
            memory: 200Mi
            cpu: 100m
        ports:
        - containerPort: 8080
        env:
          - name: TOPIC
            value: test-1
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap

We can run both Spring Boot apps using Skaffold. Firstly, we need to go to the kafka directory in our repository. Then let’s run the following command:

$ skaffold run -n strimzi --tail

Finally, we can verify the logs printed by our apps. As you see, all the messages sent by the producer app are received by the consumer app.

kafka-on-kubernetes-logs

5. Kafka Metrics in Prometheus

Once we installed the Strimzi Helm chart with the dashboard.enabled=true and dashboard.namespace=monitoring, we have several Grafana dashboard manifests placed in the monitoring namespace. Each dashboard is represented as a ConfigMap. Let’s display a list of ConfigMaps installed by the Strimzi Helm chart:

$ kubectl get cm -n monitoring | grep strimzi
strimzi-cruise-control                                    1      2m
strimzi-kafka                                             1      2m
strimzi-kafka-bridge                                      1      2m
strimzi-kafka-connect                                     1      2m
strimzi-kafka-exporter                                    1      2m
strimzi-kafka-mirror-maker-2                              1      2m
strimzi-kafka-oauth                                       1      2m
strimzi-kraft                                             1      2m
strimzi-operators                                         1      2m
strimzi-zookeeper                                         1      2m

Since Grafana is also installed in the monitoring namespace, it automatically imports all the dashboards from ConfigMaps annotated with grafana_dashboard. Consequently, after logging into Grafana (admin / prom-operator), we can easily switch between all the Kafka-related dashboards.

The only problem is that Prometheus doesn’t scrape the metrics exposed by the Kafka pods. Since we have already configured metrics exporting on the Strimzi Kafka CRD, Kafka pods expose the /metric endpoint for Prometheus under the 9404 port. Let’s take a look at the Kafka broker pod details:

In order to force Prometheus to scrape metrics from Kafka pods, we need to create the PodMonitor object. We should place it in the monitoring (1) namespace and set the release=kube-prometheus-stack label (2). The PodMonitor object filters all the pods from the strimzi namespace (3) that contains the strimzi.io/kind label having one of the values: Kafka, KafkaConnect, KafkaMirrorMaker, KafkaMirrorMaker2 (4). Also, it has to query the /metrics endpoint under the port with the tcp-prometheus name (5).

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: kafka-resources-metrics
  namespace: monitoring
  labels:
    app: strimzi
    release: kube-prometheus-stack
spec:
  selector:
    matchExpressions:
      - key: "strimzi.io/kind"
        operator: In
        values: ["Kafka", "KafkaConnect", "KafkaMirrorMaker", "KafkaMirrorMaker2"]
  namespaceSelector:
    matchNames:
      - strimzi
  podMetricsEndpoints:
  - path: /metrics
    port: tcp-prometheus
    relabelings:
    - separator: ;
      regex: __meta_kubernetes_pod_label_(strimzi_io_.+)
      replacement: $1
      action: labelmap
    - sourceLabels: [__meta_kubernetes_namespace]
      separator: ;
      regex: (.*)
      targetLabel: namespace
      replacement: $1
      action: replace
    - sourceLabels: [__meta_kubernetes_pod_name]
      separator: ;
      regex: (.*)
      targetLabel: kubernetes_pod_name
      replacement: $1
      action: replace
    - sourceLabels: [__meta_kubernetes_pod_node_name]
      separator: ;
      regex: (.*)
      targetLabel: node_name
      replacement: $1
      action: replace
    - sourceLabels: [__meta_kubernetes_pod_host_ip]
      separator: ;
      regex: (.*)
      targetLabel: node_ip
      replacement: $1
      action: replace

Finally, we can display the Grafana dashboard with Kafka metrics visualization. Let’s choose the dashboard with the “Strimzi Kafka” name. Here’s the general view:

kafka-on-kubernetes-metrics

There are several other diagrams available. For example, we can take a look at the statistics related to the incoming and outgoing messages.

6. Rebalancing Kafka with Cruise Control

Let’s analyze the typical scenario around Kafka related to increasing the number of brokers in the cluster. Before we do it, we will generate more incoming traffic to the test-1 topic. In order to do it, we can use the Grafana k6 tool. The k6 tool provides several extensions for load testing – including the Kafka plugin. Here’s the Deployment manifest that runs k6 with the Kafka extension on Kubernetes.

kind: ConfigMap
apiVersion: v1
metadata:
  name: load-test-cm
  namespace: strimzi
data:
  load-test.js: |
    import {
      Writer,
      SchemaRegistry,
      SCHEMA_TYPE_JSON,
    } from "k6/x/kafka";
    const writer = new Writer({
      brokers: ["my-cluster-kafka-bootstrap.strimzi:9092"],
      topic: "test-1",
    });
    const schemaRegistry = new SchemaRegistry();
    export default function () {
      writer.produce({
        messages: [
          {
            value: schemaRegistry.serialize({
              data: {
                id: 1,
                source: "test",
                space: "strimzi",
                cluster: "c1",
                message: "HELLO"
              },
              schemaType: SCHEMA_TYPE_JSON,
            }),
          },
        ],
      });
    }
    
    export function teardown(data) {
      writer.close();
    }
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: k6-test
  namespace: strimzi
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: k6-test
  template:
    metadata:
      labels:
        app.kubernetes.io/name: k6-test
    spec:
      containers:
        - image: mostafamoradian/xk6-kafka:latest
          name: xk6-kafka
          command:
            - "k6"
            - "run"
            - "--vus"
            - "1"
            - "--duration"
            - "720s"
            - "/tests/load-test.js"
          env:
            - name: KAFKA_URL
              value: my-cluster-kafka-bootstrap
            - name: CLUSTER
              value: c1
            - name: TOPIC
              value: test-1
            - name: POD
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
          volumeMounts:
            - mountPath: /tests
              name: test
      volumes:
        - name: test
          configMap:
            name: load-test-cm

Let’s apply the manifest to the strimzi namespace with the following command:

$ kubectl apply -f k8s/k6.yaml

After that, we can take a look at the k6 Pod logs. As you see, it generates and sends a lot of messages to the test-1 topic on our Kafka cluster:

Now, let’s increase the number of Kafka brokers in our cluster. We can do it by changing the value of the replicas field in the KafkaNodePool object:

$ kubectl scale kafkanodepool dual-role --replicas=4 -n strimzi

After a while, Strimzi will start a new pod with another Kafka broker. Although we have a new member of the Kafka cluster, all the partitions are still distributed only across three previous brokers. The situation would be different for the new topic. However, the partitions related to the existing topics won’t be automatically migrated to the new broker instance. Let’s verify the current partition structure for the test-1 topic with kcat CLI (I’m exposing Kafka API locally with kubectl port-forward):

$ kcat -b localhost:9092 -L -t test-1
Metadata for test-1 (from broker -1: localhost:9092/bootstrap):
 4 brokers:
  broker 0 at my-cluster-dual-role-0.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 1 at my-cluster-dual-role-1.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 2 at my-cluster-dual-role-2.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 3 at my-cluster-dual-role-3.my-cluster-kafka-brokers.strimzi.svc:9092 (controller)
 1 topics:
  topic "test-1" with 12 partitions:
    partition 0, leader 0, replicas: 0,1,2, isrs: 1,0,2
    partition 1, leader 1, replicas: 1,2,0, isrs: 1,0,2
    partition 2, leader 2, replicas: 2,0,1, isrs: 1,0,2
    partition 3, leader 0, replicas: 0,1,2, isrs: 1,0,2
    partition 4, leader 1, replicas: 1,2,0, isrs: 1,0,2
    partition 5, leader 2, replicas: 2,0,1, isrs: 1,0,2
    partition 6, leader 0, replicas: 0,1,2, isrs: 1,0,2
    partition 7, leader 1, replicas: 1,2,0, isrs: 1,0,2
    partition 8, leader 2, replicas: 2,0,1, isrs: 1,0,2
    partition 9, leader 0, replicas: 0,2,1, isrs: 1,0,2
    partition 10, leader 2, replicas: 2,1,0, isrs: 1,0,2
    partition 11, leader 1, replicas: 1,0,2, isrs: 1,0,2

Here comes Cruise Control. Cruise Control makes managing and operating Kafka much easier. For example, it allows us to move partitions across brokers after scaling up the cluster. Let’s see how it works. We have already enabled Cruise Control in the Strimzi Kafka CRD. In order to begin a rebalancing procedure, we should create the KafkaRebalance object. This object is responsible for asking Cruise Control to generate an optimization proposal.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: my-rebalance
  labels:
    strimzi.io/cluster: my-cluster
spec: {}

If the optimization proposal is ready you will see the ProposalReady value in the Status.Conditions.Type field. I won’t get into the details of Cruise Control. It suggested moving 58 partition replicas between separate brokers in the cluster.

Let’s accept the proposal by annotating the KafkaRebalance object with strimzi.io/rebalance=approve:

$ kubectl annotate kafkarebalance my-rebalance \   
    strimzi.io/rebalance=approve -n strimzi

Finally, we can run the kcat command on the test-1 topic once again. Now, as you see, partition replicas are spread across all the brokers.

$ kcat -b localhost:9092 -L -t test-1
Metadata for test-1 (from broker -1: localhost:9092/bootstrap):
 4 brokers:
  broker 0 at my-cluster-dual-role-0.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 1 at my-cluster-dual-role-1.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 2 at my-cluster-dual-role-2.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 3 at my-cluster-dual-role-3.my-cluster-kafka-brokers.strimzi.svc:9092 (controller)
 1 topics:
  topic "test-1" with 12 partitions:
    partition 0, leader 2, replicas: 2,1,3, isrs: 1,2,3
    partition 1, leader 1, replicas: 1,2,0, isrs: 1,0,2
    partition 2, leader 2, replicas: 0,2,1, isrs: 1,0,2
    partition 3, leader 0, replicas: 0,2,3, isrs: 0,2,3
    partition 4, leader 1, replicas: 3,2,1, isrs: 1,2,3
    partition 5, leader 2, replicas: 2,3,0, isrs: 0,2,3
    partition 6, leader 0, replicas: 0,1,2, isrs: 1,0,2
    partition 7, leader 1, replicas: 3,1,0, isrs: 1,0,3
    partition 8, leader 2, replicas: 2,0,1, isrs: 1,0,2
    partition 9, leader 0, replicas: 0,3,1, isrs: 1,0,3
    partition 10, leader 2, replicas: 2,3,0, isrs: 0,2,3
    partition 11, leader 1, replicas: 1,0,3, isrs: 1,0,3

Final Thoughts

Strimzi allows us not only to install and manage Kafka but also the whole ecosystem around it. In this article, I showed how to export metrics to Prometheus and use the Cruise Control tool to rebalance a cluster after scale-up. We also ran Kafka in KRaft mode and then connected two simple Java apps with the cluster through Kubernetes Service.

The post Apache Kafka on Kubernetes with Strimzi appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2023/11/06/apache-kafka-on-kubernetes-with-strimzi/feed/ 6 14613
Local Development with Redpanda, Quarkus and Testcontainers https://piotrminkowski.com/2022/04/20/local-development-with-redpanda-quarkus-and-testcontainers/ https://piotrminkowski.com/2022/04/20/local-development-with-redpanda-quarkus-and-testcontainers/#comments Wed, 20 Apr 2022 08:13:36 +0000 https://piotrminkowski.com/?p=11098 In this article, you will learn how to speed up your local development with Redpanda and Quarkus. The main goal is to show that you can replace Apache KafkaⓇ with Redpanda without any changes in the source code. Instead, you will get a fast way to run your existing Kafka applications without Zookeeper and JVM. […]

The post Local Development with Redpanda, Quarkus and Testcontainers appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to speed up your local development with Redpanda and Quarkus. The main goal is to show that you can replace Apache Kafka with Redpanda without any changes in the source code. Instead, you will get a fast way to run your existing Kafka applications without Zookeeper and JVM. You will also see how Quarkus uses Redpanda as a local instance for development. Finally, we are going to run all containers in the Testcontainers Cloud.

For the current exercise, we use the same examples as described in one of my previous articles about Quarkus and Kafka Streams. Just to remind you: we are building a simplified version of the stock market platform. The stock-service application receives and handles incoming orders. There are two types of orders: purchase (BUY) and sale (SELL). While the stock-service consumes Kafka streams, the order-service generates and sends events to the orders.buy and orders.sell topics. Here’s the diagram with our architecture. As you see, the stock-service also uses PostgreSQL as a database.

quarkus-redpanda-arch

Source Code

If you would like to try this exercise yourself, you may always take a look at my source code. In order to do that, you need to clone my GitHub repository. Then switch to the dev branch. After that, you should just follow my instructions. Let’s begin.

Install Redpanda

This step is not required. However, it is worth installing Redpanda since it provides a useful CLI called Redpanda Keeper (rpk) to manage a cluster. To install Redpanda on macOS just run the following command:

$ brew install redpanda-data/tap/redpanda

Now, we can easily create and run a new cluster. For the development purpose, we only need a single-node Redpanda cluster. In order to run, you need to have Docker on your laptop.

$ rpk container start

Before proceeding to the next steps let’s just remove a current cluster. Quarkus will create everything for us automatically.

$ rpk container purge

Quarkus with Kafka and Postgres

Let’s begin with the stock-service. It consumes streams from Kafka topics and connects to the PostgreSQL database, as I mentioned before. So, the first step is to include the following dependencies:

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-kafka-streams</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-hibernate-orm-panache</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-jdbc-postgresql</artifactId>
</dependency>

Now, we may proceed to the implementation. The topology for all the streams is provided inside the following method:

@Produces
public Topology buildTopology() {
   ...
}

There are some different streams defined there. But let’s just take a look at the fragment of topology responsible for creating transactions from incoming orders

final String ORDERS_BUY_TOPIC = "orders.buy";
final String ORDERS_SELL_TOPIC = "orders.sell";
final String TRANSACTIONS_TOPIC = "transactions";

// ... other streams

KStream<Long, Order> orders = builder.stream(
   ORDERS_SELL_TOPIC,
   Consumed.with(Serdes.Long(), orderSerde));

builder.stream(ORDERS_BUY_TOPIC, Consumed.with(Serdes.Long(), orderSerde))
   .merge(orders)
   .peek((k, v) -> {
      log.infof("New: %s", v);
      logic.add(v);
   });

builder.stream(ORDERS_BUY_TOPIC, Consumed.with(Serdes.Long(), orderSerde))
   .selectKey((k, v) -> v.getProductId())
   .join(orders.selectKey((k, v) -> v.getProductId()),
      this::execute,
      JoinWindows.of(Duration.ofSeconds(10)),
      StreamJoined.with(Serdes.Integer(), orderSerde, orderSerde))
   .filterNot((k, v) -> v == null)
   .map((k, v) -> new KeyValue<>(v.getId(), v))
   .peek((k, v) -> log.infof("Done -> %s", v))
   .to(TRANSACTIONS_TOPIC, Produced.with(Serdes.Long(), transactionSerde));

The whole implementation is more advanced. For the details, you may refer to the article I mentioned in the introduction. Now, let’s imagine we are still developing our stock market app. Firstly, we should run PostgreSQL and a local Kafka cluster. We use Redpanda, which is easy to run locally. After that, we would typically provide addresses of both the database and broker in the application.properties. But using a feature called Quarkus Dev Services, the only thing we need to configure now, are the names of topics used for consuming Kafka Streams and the application id. Both of these are required by Kafka Streams.

Now, the most important thing: you just need to start the Quarkus app. Nothing more. DO NOT run any external tools by yourself and DO NOT provide any addresses for them in the configuration settings. Just add the two lines you see below:

quarkus.kafka-streams.application-id = stock
quarkus.kafka-streams.topics = orders.buy,orders.sell

Run Quarkus in dev mode with Redpanda

Before you run the Quarkus app, make sure you have Docker running on your laptop. When you do, the only thing you need is to start both test apps. Let’s begin with the stock-service since it receives orders generated by the order-service. Go to the stock-service directory and run the following command:

$ cd stock-service
$ mvn quarkus:dev

If you see the following logs, it means that everything went well. Our application has been started in 13 seconds. During this time, Quarkus also started Kafka, PostgreSQL on Docker, and built Kafka Streams. Everything in 13 seconds with a single command and without any additional configuration. Nice, right? Let’s check out what happened in the background:

Firstly, let’s find the following line of logs beginning with the sentence “Dev Services for Kafka started”. It perfectly describes the feature of Quarkus called Dev Services. Our Kafka instance has been started as a Docker container and is available under a dynamically generated port. The application connects to it. All other Quarkus apps you would run now will share the same instance of a broker. You can disable that feature by setting the property quarkus.kafka.devservices.shared to false.

It may be a little surprising, but Quarkus Dev Services for Kafka uses Redpanda to run a broker. Of course, Redpanda is a Kafka-compatible solution. Since it starts in ~one second and does not require Zookeeper, it is a great choice for local development.

In order to run tools like brokers or databases on Docker, Quarkus uses Testcontainers. If you are interested in more details about Quarkus Dev Services for Kafka, read the following documentation. For now, let’s display a list of running containers using the docker ps command. There is a container with Redpanda, PostgreSQL, and Testcontainers.

quarkus-redpanda-containers

Manage Kafka Streams with Redpanda and Quarkus

Let’s verify how everything works on the application side. After running the application, we can take advantage of another useful Quarkus feature called Dev UI. Our UI console is available under the address http://localhost:8080/q/dev/. After accessing it, you can display a topology of Kafka Streams by clicking the button inside the Apache Kafka Streams tile.

Here you will see a summary of available streams. For me, it is 12 topics and 15 state stores. You may also see a visualization of Kafka Streams’ topology. The following picture shows the fragment of topology. You can download the full image by clicking the green download button, visible on the right side of the screen.

quarkus-redpanda-dev

Now, let’s use the Redpanda CLI to display a list of created topics. In my case, Redpanda is available under the port 55001 locally. All the topics are automatically created by Quarkus during application startup. We need to define the names of topics used in communication between both our test apps. Those topics are: orders.buy, orders.sell and transactions. They are configured and created by the order-service. The stock-service is creating all other topics visible below, which are responsible for handling streams.

$ rpk topic list --brokers localhost:55001
NAME                                                    PARTITIONS  REPLICAS
orders.buy                                              1           1
orders.sell                                             1           1
stock-KSTREAM-JOINOTHER-0000000016-store-changelog      1           1
stock-KSTREAM-JOINOTHER-0000000043-store-changelog      1           1
stock-KSTREAM-JOINOTHER-0000000065-store-changelog      1           1
stock-KSTREAM-JOINTHIS-0000000015-store-changelog       1           1
stock-KSTREAM-JOINTHIS-0000000042-store-changelog       1           1
stock-KSTREAM-JOINTHIS-0000000064-store-changelog       1           1
stock-KSTREAM-KEY-SELECT-0000000005-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000006-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000032-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000033-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000054-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000055-repartition         1           1
stock-transactions-all-summary-changelog                1           1
stock-transactions-all-summary-repartition              1           1
stock-transactions-per-product-summary-30s-changelog    1           1
stock-transactions-per-product-summary-30s-repartition  1           1
stock-transactions-per-product-summary-changelog        1           1
stock-transactions-per-product-summary-repartition      1           1
transactions                                            1           1

In order to do a full test, we also need to run order-service. It is generating orders continuously and sending them to the orders.buy or orders.sell topics. Let’s do that.

Send messages to Redpanda with Quarkus

Before we run order-service, let’s see some implementation details. On the producer side, we need to include a single dependency responsible for integration with a Kafka broker:

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

Our application generates and sends random orders to the orders.buy or orders.sell topics. There are two methods for that, each of them dedicated to a single topic. Let’s just see a method for generating BUY orders. We need to annotate it with @Outgoing and set the channel name (orders-buy). Our method generates a single order per 500 milliseconds.

@Outgoing("orders-buy")
public Multi<Record<Long, Order>> buyOrdersGenerator() {
   return Multi.createFrom().ticks().every(Duration.ofMillis(500))
      .map(order -> {
         Integer productId = random.nextInt(10) + 1;
         int price = prices.get(productId) + random.nextInt(200);
         Order o = new Order(
            incrementOrderId(),
            random.nextInt(1000) + 1,
            productId,
            100 * (random.nextInt(5) + 1),
            LocalDateTime.now(),
            OrderType.BUY,
            price);
         log.infof("Sent: %s", o);
      return Record.of(o.getId(), o);
   });
}

After that, we need to map the channel name into a target topic name. Another required operation is to set the serializer for the message key and value.

mp.messaging.outgoing.orders-buy.connector = smallrye-kafka
mp.messaging.outgoing.orders-buy.topic = orders.buy
mp.messaging.outgoing.orders-buy.key.serializer = org.apache.kafka.common.serialization.LongSerializer
mp.messaging.outgoing.orders-buy.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Finally, go to the order-service directory and run the application.

$ cd order-service
$ mvn quarkus:dev

Once you start order-service, it will create topics and start sending orders. It uses the same instance of Redpanda as stock-service. You can run the docker ps command once again to verify it.

Now, just do a simple change in stock-service to reload the application. It will also reload the Kafka Streams topology. After that, it is starting to receive orders from the topics created by the order-service. Finally, it will create transactions from incoming orders and store them in the transactions topic.

Use Testcontainers Cloud

In our development process, we need to have a locally installed Docker ecosystem. But, what if we don’t have it? That’s where Testcontainers Cloud comes in. Testcontainers Cloud is the developer-first SaaS platform for modern integration testing with real databases, message brokers, cloud services, or any other component of application infrastructure. To simplify, we will do the same thing as before but our instances of Redpanda and PostgreSQL will not run on the local Docker, but on the remote Testcointainers platform.

What do you need to do to enable Testcontainers Cloud? Firstly, download the agent from the following site. You also need to be a beta tester to obtain an authorization token. Finally, just run the agent and kill your local Docker daemon. You should see the Testcontainers icon in the running apps with information about the connection to the cloud.

quarkus-redpanda-testcontainers

Docker should not run locally.

The same as before, just run both applications with the quarkus:dev command. Your Redpanda broker is running on the Testcontainers Cloud but, thanks to the agent, you may access it over localhost.

Once again you can verify a list of topics using the following command for the new broker:

$ rpk topic list --brokers localhost:59779

Final Thoughts

In this article, I focused on showing you how new and exciting technologies like Quarkus, Redpanda, and Testcontainers can work together. Local development is one of the use cases, but you may as well use them to write integration tests.

The post Local Development with Redpanda, Quarkus and Testcontainers appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2022/04/20/local-development-with-redpanda-quarkus-and-testcontainers/feed/ 2 11098
Autoscaling on Kubernetes with KEDA and Kafka https://piotrminkowski.com/2022/01/18/autoscaling-on-kubernetes-with-keda-and-kafka/ https://piotrminkowski.com/2022/01/18/autoscaling-on-kubernetes-with-keda-and-kafka/#comments Tue, 18 Jan 2022 14:58:54 +0000 https://piotrminkowski.com/?p=10475 In this article, you will learn how to autoscale your application that consumes messages from the Kafka topic with KEDA. The full name that stands behind that shortcut is Kubernetes Event Driven Autoscaling. In order to explain the idea behind it, I will create two simple services. The first of them is sending events to […]

The post Autoscaling on Kubernetes with KEDA and Kafka appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to autoscale your application that consumes messages from the Kafka topic with KEDA. The full name that stands behind that shortcut is Kubernetes Event Driven Autoscaling. In order to explain the idea behind it, I will create two simple services. The first of them is sending events to the Kafka topic, while the second is receiving them. We will run both these applications on Kubernetes. To simplify the exercise, we may use Spring Cloud Stream, which offers a smart integration with Kafka.

Architecture

Before we start, let’s take a moment to understand our scenario for today. We have a single Kafka topic used by both our applications to exchange events. This topic consists of 10 partitions. There is also a single instance of the producer that sends events at regular intervals. We are going to scale down and scale up the number of pods for the consumer service. All the instances of the consumer service are assigned to the same Kafka consumer group. It means that only a single instance with the group may receive the particular event.

Each consumer instance has only a single receiving thread. Therefore, we can easily simulate an event processing time. We will sleep the main thread for 1 second. On the other hand, the producer will send events with a variable speed. Also, it will split the messages across all available partitions. Such behavior may result in consumer lag on partitions because Spring Cloud Stream commits offset only after handling a message. In our case, the value of lag depends on producer speed and the number of running consumer instances. To clarify let’s take a look at the diagram below.

keda-kafka-arch1

Our goal is very simple. We need to adjust the number of consumer instances to the traffic rate generated by the producer service. The value of offset lag can’t exceed the desired threshold. If we increase the traffic rate on the producer side KEDA should scale up the number of consumer instances. Consequently, if we decrease the producer traffic rate it should scale down the number of consumer instances. Here’s the diagram with our scenario.

keda-kafka-arch2

Use Kafka with Spring Cloud Stream

In order to use Spring Cloud Stream for Kafka, we just need to include a single dependency in Maven pom.xml:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

After that, we can use a standard Spring Cloud Stream model. However, in the background, it integrates with Kafka through a particular binder implementation. I will not explain the details, but if you are interested please read the following article. It explains the basics at the example of RabbitMQ.

Both our applications are very simple. The producer just generates and sends events (by default in JSON format). The only thing we need to do in the code is to declare the Supplier bean. In the background, there is a single thread that generates and sends CallmeEvent every second. Each time it only increases the id field inside the message:

@SpringBootApplication
public class ProducerApp {

   private static int id = 0;

   public static void main(String[] args) {
      SpringApplication.run(ProducerApp.class, args);
   }

   @Bean
   public Supplier<CallmeEvent> eventSupplier() {
      return () -> new CallmeEvent(++id, "Hello" + id, "PING");
   }

}

We can change a default fixed delay between the Supplier ticks with the following property. Let’s say we want to send an event every 100 ms:

spring.cloud.stream.poller.fixedDelay = 100

We should also provide basic configuration like the Kafka address, topic name (if different than the name of the Supplier function), number of partitions, and a partition key. Spring Cloud Stream automatically creates topics on application startup.

spring.cloud.stream.bindings.eventSupplier-out-0.destination = test-topic
spring.cloud.stream.bindings.eventSupplier-out-0.producer.partitionKeyExpression = payload.id
spring.cloud.stream.bindings.eventSupplier-out-0.producer.partitionCount = 10
spring.kafka.bootstrap-servers = one-node-cluster.redpanda:9092

Now, the consumer application. It is also not very complicated. As I mentioned before, we will sleep the main thread inside the receiving method in order to simulate processing time.

public class ConsumerApp {

   private static final Logger LOG = LoggerFactory.getLogger(ConsumerAApp.class);

   public static void main(String[] args) {
      SpringApplication.run(ConsumerApp.class, args);
   }

   @Bean
   public Consumer<Message<CallmeEvent>> eventConsumer() {
      return event -> {
         LOG.info("Received: {}", event.getPayload());
         try {
            Thread.sleep(1000);
         } catch (InterruptedException e) { }
      };
   }

}

Finally, the configuration on the consumer side. It is important to set the consumer group and enable partitioning.

spring.cloud.stream.bindings.eventConsumer-in-0.destination = test-topic
spring.cloud.stream.bindings.eventConsumer-in-0.group = a
spring.cloud.stream.bindings.eventConsumer-in-0.consumer.partitioned = true
spring.kafka.bootstrap-servers = one-node-cluster.redpanda:9092

Now, we should deploy both applications on Kubernetes. But before we do that, let’s install Kafka and KEDA on Kubernetes.

Install Kafka on Kubernetes

To perform this part you need to install helm. Instead of Kafka directly, we can install Redpanda. It is a Kafka API compatible platform. However, the Redpanda operator requires cert-manager to create certificates for TLS communication. So, let’s install it first. We use the latest version of the cert-manager. It requires adding CRDs:

$ kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.6.1/cert-manager.crds.yaml

Then you need to add a new Helm repository:

$ helm repo add jetstack https://charts.jetstack.io

And finally, install it cert-manager:

$ helm install cert-manager \
   --namespace cert-manager \
   --version v1.6.1 \
   jetstack/cert-manager

Now, we can proceed to the Redpanda installation. The same as before, let’s add the Helm repository:

$ helm repo add redpanda https://charts.vectorized.io/
$ helm repo update

We can obtain the latest version of Redpanda:

$ export VERSION=$(curl -s https://api.github.com/repos/vectorizedio/redpanda/releases/latest | jq -r .tag_name)

Then, let’s add the CRDs:

$ kubectl apply -f https://github.com/vectorizedio/redpanda/src/go/k8s/config/crd

After that, we can finally install the Redpanda operator:

$ helm install \
   redpanda-operator \
   redpanda/redpanda-operator \
   --namespace redpanda-system \
   --create-namespace \
   --version $VERSION

We will install a single-node cluster in the redpanda namespace. To do that we need to apply the following manifest:

apiVersion: redpanda.vectorized.io/v1alpha1
kind: Cluster
metadata:
  name: one-node-cluster
spec:
  image: "vectorized/redpanda"
  version: "latest"
  replicas: 1
  resources:
    requests:
      cpu: 1
      memory: 1.2Gi
    limits:
      cpu: 1
      memory: 1.2Gi
  configuration:
    rpcServer:
      port: 33145
    kafkaApi:
    - port: 9092
    pandaproxyApi:
    - port: 8082
    adminApi:
    - port: 9644
    developerMode: true

Once you did that, you can verify a list of pods in the redpanda namespace:

$ kubectl get pod -n redpanda                      
NAME                 READY   STATUS    RESTARTS   AGE
one-node-cluster-0   1/1     Running   0          4s

If you noticed, I have already set a Kafka bootstrap server address in the application.properties. For me, it is one-node-cluster.redpanda:9092. You can verify it using the following command:

$ kubectl get svc -n redpanda
NAME               TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                      AGE
one-node-cluster   ClusterIP   None         <none>        9644/TCP,9092/TCP,8082/TCP   23h

Install KEDA and integrate it with Kafka

The same as before we will install KEDA on Kubernetes with Helm. Let’s add the following Helm repo:

$ helm repo add kedacore https://kedacore.github.io/charts

Don’t forget to update the repository. We will install the operator in the keda namespace. Let’s create the namespace first:

$ kubectl create namespace keda

Finally, we can install the operator:

$ helm install keda kedacore/keda --namespace keda

I will run both example applications in the default namespace, so I will create a KEDA object also in this namespace. The main object responsible for configuring autoscaling with KEDA is ScaledObject. Here’s the definition:

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: consumer-scaled
spec:
  scaleTargetRef:
    name: consumer-deployment # (1)
  cooldownPeriod: 30 # (2)
  maxReplicaCount:  10 # (3)
  advanced:
    horizontalPodAutoscalerConfig: # (4)
      behavior:
        scaleDown:
          stabilizationWindowSeconds: 30
          policies:
            - type: Percent
              value: 50
              periodSeconds: 30
  triggers: # (5)
    - type: kafka
      metadata:
        bootstrapServers: one-node-cluster.redpanda:9092
        consumerGroup: a
        topic: test-topic
        lagThreshold: '5'

Let’s analyze the configuration in the details:

(1) We are setting autoscaler for the consumer application, which is deployed under the consumer-deployment name (see the next section for the Deployment manifest)

(2) We decrease the default value of the cooldownPeriod parameter from 300 seconds to 30 in order to test the scale-to-zero mechanism

(3) The maximum number of running pods is 10 (the same as the number of partitions in the topic) instead of the default 100

(4) We can customize the behavior of the Kubernetes HPA. Let’s do that for the scale-down operation. We could as well configure that for the scale-up operation. We allow to scale down 50% of current running replicas.

(5) The last and the most important part – a trigger configuration. We should set the address of the Kafka cluster, the name of the topic, and the consumer group used by our application. The lag threshold is 10. It sets the average target value of offset lags to trigger scaling operations.

Before applying the manifest containing ScaledObject we need to deploy the consumer application. Let’s proceed to the next section.

Test Autoscaling with KEDA and Kafka

Let’s deploy the consumer application first. It is prepared to be deployed with Skaffold, so you can just run the command skaffold dev from the consumer directory. Anyway, here’s the Deployment manifest:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-deployment
spec:
  selector:
    matchLabels:
      app: consumer
  template:
    metadata:
      labels:
        app: consumer
    spec:
      containers:
      - name: consumer
        image: piomin/consumer
        ports:
        - containerPort: 8080

Once we created it, we can also apply the KEDA ScaledObject. After creating let’s display its status with the kubectl get so command.

keda-kafka-scaledobject

Ok, but… it is inactive. If you think about that’s logical since there are no incoming events in Kafka’s topic. Right? So, KEDA has performed a scale-to-zero operation as shown below:

Now, let’s deploy the producer application. For now, DO NOT override the default value of the spring.cloud.stream.poller.maxMessagesPerPoll parameter. The producer will send one event per second.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
        - name: producer
          image: piomin/producer
          ports:
            - containerPort: 8080

After some time you can run the kubectl get so once again. Now the status in the ACTIVE column should be true. And there is a single instance of the consumer application (1 event per second sent by a producer and received by a consumer).

We can also verify offsets and lags for consumer groups on the topic partitions. Just run the command rpk group describe a inside the Redpanda container.

Now, we will change the traffic rate on the producer side. It will send 5 events per second instead of 1 event before. To do that we have to define the following property in the application.properties file.

spring.cloud.stream.poller.fixedDelay = 200

Before KEDA performs autoscaling we still have a single instance of the consumer application. Therefore, after some time the lag on partitions will exceed the desired threshold as shown below.

Once autoscaling occurs we can display a list of deployments. As you see, now there are 5 running pods of the consumer service.

keda-kafka-scaling

Once again let’s verify the status of our Kafka consumer group. There 5 consumers on the topic partitions.

Finally, just to check it out – let’s undeploy the producer application. What happened? The consumer-deployment has been scaled down to zero.

Final Thoughts

You can use KEDA not only with Kafka. There are a lot of other options available including databases, different message brokers, or even cron. Here’s a full list of the supported tools. In this article, I showed how to use Kafka consumer offset and lag as a criterium for autoscaling with KEDA. I tried to explain this process in the detail. Hope it helps you to understand how KEDA exactly works 🙂

The post Autoscaling on Kubernetes with KEDA and Kafka appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2022/01/18/autoscaling-on-kubernetes-with-keda-and-kafka/feed/ 20 10475
Kafka Streams with Quarkus https://piotrminkowski.com/2021/11/24/kafka-streams-with-quarkus/ https://piotrminkowski.com/2021/11/24/kafka-streams-with-quarkus/#comments Wed, 24 Nov 2021 08:24:53 +0000 https://piotrminkowski.com/?p=10234 In this article, you will learn how to use Kafka Streams with Quarkus. The same as in my previous article we will create a simple application that simulates the stock market. But this time, we are going to use Quarkus instead of Spring Cloud. If you would like to figure out what is a streaming […]

The post Kafka Streams with Quarkus appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to use Kafka Streams with Quarkus. The same as in my previous article we will create a simple application that simulates the stock market. But this time, we are going to use Quarkus instead of Spring Cloud. If you would like to figure out what is a streaming platform and how it differs from a traditional message broker this article is for you. Moreover, we will study useful improvements related to Apache Kafka provided by Quarkus.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. After that, you should just follow my instructions. Let’s begin.

Architecture

In our case, there are two incoming streams of events. Both of them represent incoming orders. These orders are generated by the order-service application. It sends buy orders to the orders.buy topic and sell orders to the orders.sell topic. Then, the stock-service application receives and handles incoming events. In the first step, it needs to change the key of each message from the orderId to the productId. That’s because it has to join orders from different topics related to the same product in order to execute transactions. Finally, the transaction price is an average of sale and buy prices.

quarkus-kafka-streams-arch

We are building a simplified version of the stock market platform. Each buy order contains a maximum price at which a customer is expecting to buy a product. On the other hand, each sale order contains a minimum price a customer is ready to sell his product. If the sell order price is not greater than a buy order price for a particular product we are performing a transaction.

Each order is valid for 10 seconds. After that time the stock-service application will not handle such an order since it is considered as expired. Each order contains a number of products for a transaction. For example, we may sell 100 for 10 or buy 200 for 11. Therefore, an order may be fully or partially realized. The stock-service application tries to join partially realized orders to other new or partially realized orders. You can see the visualization of that process in the picture below.

quarkus-kafka-streams-app

Run Apache Kafka locally

Before we jump to the implementation, we need to run a local instance of Apache Kafka. If you don’t want to install it on your laptop, the best way to run it is with Redpanda. Redpanda is a Kafka API compatible streaming platform. In comparison to Kafka, it is relatively easy to run it locally. Normally, you would have to install Redpanda on your laptop and then create a cluster using their CLI. But with Quarkus you don’t need to do that! The only requirement is to have Docker installed. Thanks to the Quarkus Kafka extension and feature called Dev Services it automatically starts a Kafka broker in dev mode and when running tests. Moreover, the application is configured automatically.

The only thing you need to do in order to enable that feature is NOT to provide any Kafka address in configuration properties. Dev Services uses Testcontainers to run Kafka, so if you have Docker or any other environment supporting Testcontainers running you get a containerized instance of Kafka out-of-the-box. Another important thing. Firstly, start the order-service application. It automatically creates all the required topics in Kafka. Then run the stock-service application. It uses the Quarkus Kafka Streams extension and verifies if the required topics exist. Let’s visualize it.

quarkus-kafka-streams-run

Send events to Kafka with Quarkus

There are several ways to send events to Kafka with Quarkus. Because we need to send key/value pair we will use the io.smallrye.reactive.messaging.kafka.Record object for that. Quarkus is able to generate and send data continuously. In the fragment of code visible below, we send a single Order event per 500 ms. Each Order contains a random productId, price and productCount.

@Outgoing("orders-buy")
public Multi<Record<Long, Order>> buyOrdersGenerator() {
   return Multi.createFrom().ticks().every(Duration.ofMillis(500))
      .map(order -> {
         Integer productId = random.nextInt(10) + 1;
         int price = prices.get(productId) + random.nextInt(200);
         Order o = new Order(
             incrementOrderId(),
             random.nextInt(1000) + 1,
             productId,
             100 * (random.nextInt(5) + 1),
             LocalDateTime.now(),
             OrderType.BUY,
             price);
         log.infof("Sent: %s", o);
         return Record.of(o.getId(), o);
   });
}

@Outgoing("orders-sell")
public Multi<Record<Long, Order>> sellOrdersGenerator() {
   return Multi.createFrom().ticks().every(Duration.ofMillis(500))
      .map(order -> {
         Integer productId = random.nextInt(10) + 1;
         int price = prices.get(productId) + random.nextInt(200);
         Order o = new Order(
             incrementOrderId(),
             random.nextInt(1000) + 1,
             productId,
             100 * (random.nextInt(5) + 1),
             LocalDateTime.now(),
             OrderType.SELL,
             price);
         log.infof("Sent: %s", o);
         return Record.of(o.getId(), o);
   });
}

We will also define a single @Incoming channel in order to receive transactions produced by the stock-service. Thanks to that Quarkus will automatically create the topic transactions used by Quarkus Kafka Streams in stock-service. To be honest, I was not able to force the Quarkus Kafka Streams extension to create the topic automatically. It seems we need to use the SmallRye Reactive Messaging extension for that.

@Incoming("transactions")
public void transactions(Transaction transaction) {
   log.infof("New: %s", transaction);
}

Of course, we need to include the SmallRye Reactive Messaging dependency to the Maven pom.xml.

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

Finally, let’s provide configuration settings. We have two outgoing topics and a single incoming topic. We can set their names. Otherwise, Quarkus uses the same name as the name of the channel. The names of our topics are orders.buy, order.sell and transactions.

mp.messaging.outgoing.orders-buy.connector = smallrye-kafka
mp.messaging.outgoing.orders-buy.topic = orders.buy
mp.messaging.outgoing.orders-buy.key.serializer = org.apache.kafka.common.serialization.LongSerializer
mp.messaging.outgoing.orders-buy.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

mp.messaging.outgoing.orders-sell.connector = smallrye-kafka
mp.messaging.outgoing.orders-sell.topic = orders.sell
mp.messaging.outgoing.orders-sell.key.serializer = org.apache.kafka.common.serialization.LongSerializer
mp.messaging.outgoing.orders-sell.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

mp.messaging.incoming.transactions.connector = smallrye-kafka
mp.messaging.incoming.transactions.topic = transactions
mp.messaging.incoming.transactions.value.deserializer = pl.piomin.samples.streams.order.model.deserializer.TransactionDeserializer

That’s all. Our orders generator is ready. If you the order-service application Quarkus will also run Kafka (Redpanda) instance. But first, let’s switch to the second sample application – stock-service.

Consume Kafka Streams with Quarkus

In the previous section, we were sending messages to the Kafka broker. Therefore, we used a standard Quarkus library for integration with Kafka based on the SmallRye Reactive Messaging framework. The stock-service application consumes messages as streams, so now we will use a module for Kafka Streams integration.

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-kafka-streams</artifactId>
</dependency>

Our application also uses a database, an ORM layer and includes some other useful modules.

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-hibernate-orm-panache</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-jdbc-h2</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-openapi</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-resteasy-jackson</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-health</artifactId>
</dependency>

In the first step, we are going to merge both streams of orders (buy and sell), insert the Order into the database, and print the event message. You could ask – why I use the database and ORM layer here since I have Kafka KTable? Well, I need transactions with lock support in order to coordinate the status of order realization (refer to the description in the introduction – fully and partially realized orders). I will give you more details about it in the next sections.

In order to process streams with Quarkus, we need to declare the org.apache.kafka.streams.Topology bean. It contains all the KStream and KTable definitions. Let’s start just with the part responsible for creating and emitting transactions from incoming orders. There are two KStream definitions created. The first of them is responsible for merging two order streams into a single one and then inserting a new Order into a database. The second of them creates and executes transactions by joining two streams using the productId key. But more about it in the next section.

@Produces
public Topology buildTopology() {
   ObjectMapperSerde<Order> orderSerde = 
      new ObjectMapperSerde<>(Order.class);
   ObjectMapperSerde<Transaction> transactionSerde = 
      new ObjectMapperSerde<>(Transaction.class);

   StreamsBuilder builder = new StreamsBuilder();

   KStream<Long, Order> orders = builder.stream(
      ORDERS_SELL_TOPIC,
      Consumed.with(Serdes.Long(), orderSerde));

   builder.stream(ORDERS_BUY_TOPIC, 
         Consumed.with(Serdes.Long(), orderSerde))
      .merge(orders)
      .peek((k, v) -> {
         log.infof("New: %s", v);
         logic.add(v);
      });

   builder.stream(ORDERS_BUY_TOPIC, 
         Consumed.with(Serdes.Long(), orderSerde))
      .selectKey((k, v) -> v.getProductId())
      .join(orders.selectKey((k, v) -> v.getProductId()),
         this::execute,
         JoinWindows.of(Duration.ofSeconds(10)),
         StreamJoined.with(Serdes.Integer(), orderSerde, orderSerde))
      .filterNot((k, v) -> v == null)
      .map((k, v) -> new KeyValue<>(v.getId(), v))
      .peek((k, v) -> log.infof("Done -> %s", v))
      .to(TRANSACTIONS_TOPIC, Produced.with(Serdes.Long(), transactionSerde));

}

To process the streams we need to add configuration properties. A list of input topics is required. We can also override a default application id and enable Kafka health check.

quarkus.kafka-streams.application-id = stock
quarkus.kafka-streams.topics = orders.buy,orders.sell
quarkus.kafka.health.enabled = true

Operations on Kafka Streams

Now, we may use some more advanced operations on Kafka Streams than just merging two different streams. In fact, that’s a key logic in our application. We need to join two different order streams into a single one using the productId as a joining key. Since the producer sets orderId as a message key, we first need to invoke the selectKey method for both order.sell and orders.buy streams. In our case, joining buy and sell orders related to the same product is just a first step. Then we need to verify if the maximum price in the buy order is not greater than the minimum price in the sell order.

The next step is to verify if both these have not been realized previously, as they also may be paired with other orders in the stream. If all the conditions are met we may create a new transaction. Finally, we may change a stream key from productId to the transactionId and send it to the dedicated transactions topic.

Each time we successfully join two orders we are trying to create a transaction. The execute(...) method is called within the KStream join method. Firstly, we are comparing the prices of both orders. Then we verify the realization status of both orders by accessing the H2 database. If the orders are still not fully realized we may create a transaction and update orders records in the database.

private Transaction execute(Order orderBuy, Order orderSell) {
   if (orderBuy.getAmount() >= orderSell.getAmount()) {
      int count = Math.min(orderBuy.getProductCount(), 
                           orderSell.getProductCount());
      boolean allowed = logic
         .performUpdate(orderBuy.getId(), orderSell.getId(), count);
      if (!allowed)
         return null;
      else
         return new Transaction(
            ++transactionId,
            orderBuy.getId(),
            orderSell.getId(),
            count,
            (orderBuy.getAmount() + orderSell.getAmount()) / 2,
            LocalDateTime.now(),
            "NEW"
      );
   } else {
            return null;
   }
}

Let’s take a closer look at the performUpdate() method called inside the execute() method. It initiates a transaction and locks both Order entities. Then it verifies each order realization status and updates it with the current values if possible. Only if the performUpdate() method finishes successfully the stock-service application creates a new transaction.

@ApplicationScoped
public class OrderLogic {

    @Inject
    Logger log;
    @Inject
    OrderRepository repository;

    @Transactional
    public Order add(Order order) {
        repository.persist(order);
        return order;
    }

    @Transactional
    public boolean performUpdate(Long buyOrderId, Long sellOrderId, int amount) {
        Order buyOrder = repository.findById(buyOrderId, 
           LockModeType.PESSIMISTIC_WRITE);
        Order sellOrder = repository.findById(sellOrderId, 
           LockModeType.PESSIMISTIC_WRITE);
        if (buyOrder == null || sellOrder == null)
            return false;
        int buyAvailableCount = 
           buyOrder.getProductCount() - buyOrder.getRealizedCount();
        int sellAvailableCount = 
           sellOrder.getProductCount() - sellOrder.getRealizedCount();
        if (buyAvailableCount >= amount && sellAvailableCount >= amount) {
            buyOrder.setRealizedCount(buyOrder.getRealizedCount() + amount);
            sellOrder.setRealizedCount(sellOrder.getRealizedCount() + amount);
            repository.persist(buyOrder);
            repository.persist(sellOrder);
            return true;
        } else {
            return false;
        }
    }
}

Nice 🙂 That’s all that we need to do in the first part of our exercise. Now we can run both our sample applications.

Run and manage Kafka Streams application with Quarkus

As I mentioned before, we first need to start the order-service. It runs a new Kafka instance and creates all required topics. Immediately after startup, it is ready to send new orders. To run the Quarkus app locally just go to the order-service directory and execute the following command:

$ mvn quarkus:dev

Just to verify you can display a list running Docker containers with the docker ps command. Here’s my result:

As you see the instance of Redpanda is running and it is available on a random port 49724. Quarkus did it for us. However, if you have Redpanda installed on your laptop you check out the list of created topics with their CLI rpk:

$ rpk topic list --brokers=127.0.0.1:49724

Then let’s run the stock-service. Go to the stock-service directory and run mvn quarkus:dev once again. After startup, it just works. Both applications share the same instance thanks to the Quarkus Dev Services. Now let’s access the Quarkus Dev UI console available at http://localhost:8080/q/dev/. Find the tile with the “Apache Kafka Streams” title.

You can check a visualization of our Kafka Streams topology. I will divide the image into two parts for better visibility.

Use Kafka KTable with Quarkus

We have already finished the implementation of the logic responsible for creating transactions from incoming orders. In the next step, we are going to perform analytical operations on the transactions stream. Our main goal is to calculate total number of transactions, total number of products sold/bought, and total value of transactions (price * productsCount) per each product. Here’s the object class used in calculations.

@RegisterForReflection
public class TransactionTotal {
   private int count;
   private int amount;
   private int productCount;

   // GETTERS AND SETTERS
}

Because the Transaction object does not contain information about the product, we first need to join the order to access it. Then we produce a KTable by per productId grouping and aggregation. After that, we may invoke an aggregate method that allows us to perform some more complex calculations. In that particular case, we are calculating the number of all executed transactions, their volume of products, and total value. The result KTable can be materialized as the state store. Thanks to that we will be able to query it by the name defined by the TRANSACTIONS_PER_PRODUCT_SUMMARY variable.

KeyValueBytesStoreSupplier storePerProductSupplier = Stores.persistentKeyValueStore(
   TRANSACTIONS_PER_PRODUCT_SUMMARY);

builder.stream(TRANSACTIONS_TOPIC, Consumed.with(Serdes.Long(), transactionSerde))
   .selectKey((k, v) -> v.getSellOrderId())
   .join(orders.selectKey((k, v) -> v.getId()),
      (t, o) -> new TransactionWithProduct(t, o.getProductId()),
      JoinWindows.of(Duration.ofSeconds(10)),
      StreamJoined.with(Serdes.Long(), transactionSerde, orderSerde))
   .groupBy((k, v) -> v.getProductId(), Grouped.with(Serdes.Integer(), transactionWithProductSerde))
   .aggregate(
      TransactionTotal::new,
      (k, v, a) -> {
         a.setCount(a.getCount() + 1);
         a.setProductCount(a.getAmount() + v.getTransaction().getAmount());
         a.setAmount(a.getProductCount() +
            (v.getTransaction().getAmount() * v.getTransaction().getPrice()));
         return a;
      },
      Materialized.<Integer, TransactionTotal> as(storePerProductSupplier)
         .withKeySerde(Serdes.Integer())
         .withValueSerde(transactionTotalSerde))
   .toStream()
   .peek((k, v) -> log.infof("Total per product(%d): %s", k, v))
   .to(TRANSACTIONS_PER_PRODUCT_AGGREGATED_TOPIC, 
      Produced.with(Serdes.Integer(), transactionTotalSerde));

Here’s the class responsible for interactive queries implementation. It injects KafkaStreams bean. Then it tries to obtain persistent store basing on the StockService.TRANSACTIONS_PER_PRODUCT_SUMMARY variable. As a result there is a ReadOnlyKeyValueStore with Integer as a key, and TransactionTotal as a value. We may return a single value related with the particular productId (getTransactionsPerProductData) or just return a list with results for all available products (getAllTransactionsPerProductData).

@ApplicationScoped
public class InteractiveQueries {

   @Inject
   KafkaStreams streams;

   public TransactionTotal getTransactionsPerProductData(Integer productId) {
      return getTransactionsPerProductStore().get(productId);
   }

   public Map<Integer, TransactionTotal> getAllTransactionsPerProductData() {
      Map<Integer, TransactionTotal> m = new HashMap<>();
      KeyValueIterator<Integer, TransactionTotal> it = getTransactionsPerProductStore().all();
      while (it.hasNext()) {
         KeyValue<Integer, TransactionTotal> kv = it.next();
         m.put(kv.key, kv.value);
      }
      return m;
   }

   private ReadOnlyKeyValueStore<Integer, TransactionTotal> getTransactionsPerProductStore() {
      return streams.store(
         StoreQueryParameters
            .fromNameAndType(StockService.TRANSACTIONS_PER_PRODUCT_SUMMARY, QueryableStoreTypes.keyValueStore()));
   }

}

Finally, we can create a REST controller responsible for exposing data retrieved by the interactive queries.

@ApplicationScoped
@Path("/transactions")
public class TransactionResource {

    @Inject
    InteractiveQueries interactiveQueries;

    @GET
    @Path("/products/{id}")
    public TransactionTotal getByProductId(@PathParam("id") Integer productId) {
        return interactiveQueries.getTransactionsPerProductData(productId);
    }

    @GET
    @Path("/products")
    public Map<Integer, TransactionTotal> getAllPerProductId() {
        return interactiveQueries.getAllTransactionsPerProductData();
    }

}

Now you can easily check out statistics related to the transactions created by the stock-service. You just need to call the following REST endpoints e.g.:

$ curl http://localhost:8080/transactions/products
$ curl http://localhost:8080/transactions/products/3
$ curl http://localhost:8080/transactions/products/5

Final Thoughts

Quarkus simplifies working with Kafka Streams and interactive queries. It provides useful improvements for developers like auto-start of Kafka in dev and test modes or Kafka streams visualization in dev UI console. You can easily compare the Quarkus approach with the Spring Cloud Stream Kafka support since I implemented the same logic for both those frameworks. Here’s the GitHub repository with Spring Cloud Stream Kafka Streams example.

The post Kafka Streams with Quarkus appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2021/11/24/kafka-streams-with-quarkus/feed/ 4 10234
Kafka Streams with Spring Cloud Stream https://piotrminkowski.com/2021/11/11/kafka-streams-with-spring-cloud-stream/ https://piotrminkowski.com/2021/11/11/kafka-streams-with-spring-cloud-stream/#comments Thu, 11 Nov 2021 10:07:45 +0000 https://piotrminkowski.com/?p=10193 In this article, you will learn how to use Kafka Streams with Spring Cloud Stream. We will build a simple Spring Boot application that simulates the stock market. Based on that example, I’ll try to explain what a streaming platform is and how it differs from a traditional message broker. If you are looking for […]

The post Kafka Streams with Spring Cloud Stream appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to use Kafka Streams with Spring Cloud Stream. We will build a simple Spring Boot application that simulates the stock market. Based on that example, I’ll try to explain what a streaming platform is and how it differs from a traditional message broker. If you are looking for an intro to the Spring Cloud Stream project you should read my article about it. It describes how to use Spring Cloud Stream with RabbitMQ in order to build event-driven microservices.

In Spring Cloud Stream there are two binders supporting the Kafka platform. We will focus on the second of them – Apache Kafka Streams Binder. You can read more about it in Spring Cloud documentation available here.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. After that, you should just follow my instructions. Let’s begin.

Introduction

There are three major types in Kafka Streams – KStreamKTable and GlobalKTable. Spring Cloud Stream supports all of them. We can easily convert the stream to the table and vice-versa. To clarify, all Kafka topics are stored as a stream. The difference is: when we want to consume that topic, we can either consume it as a table or a stream. KTable takes a stream of records from a topic and reduces it down to unique entries using a key of each message.

Architecture

KStream represents an immutable stream of data where each new record is treated as INSERT. In our case, there are two incoming streams of events. Both of them represent incoming orders. These orders are generated by the order-service application. It sends buy orders to the orders.buy topic and sell orders to the orders.sell topic. The stock-service application receives and handles events from those topics. In the first step, it needs to change the key of each message from the orderId to the productId. That’s because it has to join orders from different topics related to the same product in order to execute transactions. The final transaction price is an average of sell and buy order price.

kafka-streams-spring-cloud-concept

We are building a very simplified version of the stock market platform. Each buy order contains a maximum price at which a customer is expecting to buy a product. On the other hand, each sell order contains a minimum price a customer is ready to sell his product. If the sell order price is not greater than a buy order price for a particular product we may perform a transaction.

Each order is valid for 10 seconds. After that time the stock-service application will not handle such an order since it is considered as expired. Each order an amount of product for a transaction. For example, we may sell 100 for 10 or buy 200 for 11. Therefore, an order may be fully or partially realized. The stock-service application tries to join partially realized orders to other new or partially realized orders. You can see the visualization of that process in the picture below.

kafka-streams-spring-cloud-arch

Run Apache Kafka locally

Before we jump to the implementation, we need to run a local instance of Apache Kafka. If you don’t want to install it on your laptop, the best way to run it is through Redpanda. Redpanda is a Kafka API compatible streaming platform. In comparison to Kafka, it is relatively easy to run it locally. You just need to have Docker installed. Once you installed Redpanda on your machine you need to create a cluster. Since you don’t need a large cluster during development, you can create a single-node instance using the following command:

$ rpk container start

After running, it will print the address of your node. For me, it is 127.0.0.1:50842. So, now I can display a list of created topics using the following command:

$ rpk topic list --brokers 127.0.0.1:50842

Currently, there are no topics created. We don’t need to do anything manually. Spring Cloud Stream automatically creates missing topics on the application startup. In case, you would like to remove the Redpanda instance after our exercise, you just need to run the following command:

$ rpk container purge

Perfectly! Our local instance of Kafka is running. After that, we may proceed to the development.

Send events to Kafka with Spring Cloud Stream

In order to generate and send events continuously with Spring Cloud Stream Kafka, we need to define a Supplier bean. In our case, the order-service application generates test data. Each message contains a key and a payload that is serialized to JSON. The message key is the order’s id. We have two Supplier beans since we are sending messages to the two topics. Here’s the Order event class:

@Getter
@Setter
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Order {
   private Long id;
   private Integer customerId;
   private Integer productId;
   private int productCount;
   @JsonDeserialize(using = LocalDateTimeDeserializer.class)
   @JsonSerialize(using = LocalDateTimeSerializer.class)
   private LocalDateTime creationDate;
   private OrderType type;
   private int amount;
}

Our application uses Lombok and Jackson for messages serialization. Of course, we also need to include Spring Cloud Stream Kafka Binder. Opposite to the consumer side, the producer does not use Kafka Streams, because it is just generating and sending events.

<dependencies>
  <dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-stream-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
  </dependency>
</dependencies>

We have a predefined list of orders just to test our solution. We use MessageBuilder to build a message that contains the header kafka_messageKey and the Order payload.

@SpringBootApplication
@Slf4j
public class OrderService {

   private static long orderId = 0;
   private static final Random r = new Random();

   LinkedList<Order> buyOrders = new LinkedList<>(List.of(
      new Order(++orderId, 1, 1, 100, LocalDateTime.now(), OrderType.BUY, 1000),
      new Order(++orderId, 2, 1, 200, LocalDateTime.now(), OrderType.BUY, 1050),
      new Order(++orderId, 3, 1, 100, LocalDateTime.now(), OrderType.BUY, 1030),
      new Order(++orderId, 4, 1, 200, LocalDateTime.now(), OrderType.BUY, 1050),
      new Order(++orderId, 5, 1, 200, LocalDateTime.now(), OrderType.BUY, 1000),
      new Order(++orderId, 11, 1, 100, LocalDateTime.now(), OrderType.BUY, 1050)
   ));

   LinkedList<Order> sellOrders = new LinkedList<>(List.of(
      new Order(++orderId, 6, 1, 200, LocalDateTime.now(), OrderType.SELL, 950),
      new Order(++orderId, 7, 1, 100, LocalDateTime.now(), OrderType.SELL, 1000),
      new Order(++orderId, 8, 1, 100, LocalDateTime.now(), OrderType.SELL, 1050),
      new Order(++orderId, 9, 1, 300, LocalDateTime.now(), OrderType.SELL, 1000),
      new Order(++orderId, 10, 1, 200, LocalDateTime.now(), OrderType.SELL, 1020)
   ));

   public static void main(String[] args) {
      SpringApplication.run(OrderService.class, args);
   }

   @Bean
   public Supplier<Message<Order>> orderBuySupplier() {
      return () -> {
         if (buyOrders.peek() != null) {
            Message<Order> o = MessageBuilder
                  .withPayload(buyOrders.peek())
                  .setHeader(KafkaHeaders.MESSAGE_KEY, Objects.requireNonNull(buyOrders.poll()).getId())
                  .build();
            log.info("Order: {}", o.getPayload());
            return o;
         } else {
            return null;
         }
      };
   }

   @Bean
   public Supplier<Message<Order>> orderSellSupplier() {
      return () -> {
         if (sellOrders.peek() != null) {
            Message<Order> o = MessageBuilder
                  .withPayload(sellOrders.peek())
                  .setHeader(KafkaHeaders.MESSAGE_KEY, Objects.requireNonNull(sellOrders.poll()).getId())
                  .build();
            log.info("Order: {}", o.getPayload());
            return o;
         } else {
            return null;
         }
      };
   }

}

After that, we need to provide some configuration settings inside the application.yml file. Since we use multiple binding beans (in our case Supplier beans) we have to define the property spring.cloud.stream.function.definition that contains a list of bindable functions. We need to pass the Supplier method names divided by a semicolon. In the next few lines, we are setting the name of the target topics on Kafka and the message key serializer. Of course, we also need to set the address of the Kafka broker.

spring.kafka.bootstrap-servers: ${KAFKA_URL}

spring.cloud.stream.function.definition: orderBuySupplier;orderSellSupplier

spring.cloud.stream.bindings.orderBuySupplier-out-0.destination: orders.buy
spring.cloud.stream.kafka.bindings.orderBuySupplier-out-0.producer.configuration.key.serializer: org.apache.kafka.common.serialization.LongSerializer

spring.cloud.stream.bindings.orderSellSupplier-out-0.destination: orders.sell
spring.cloud.stream.kafka.bindings.orderSellSupplier-out-0.producer.configuration.key.serializer: org.apache.kafka.common.serialization.LongSerializer

Before running the application I need to create an environment variable containing the address of the Kafka broker.

$ export KAFKA_URL=127.0.0.1:50842

Then, let’s run our Spring Cloud application using the following Maven command:

$ mvn clean spring-boot:run

Once you did that, it sent some test orders for the same product (productId=1) as shown below.

We can also verify a list of topics on our local Kafka instance. Both of them have been automatically created by the Spring Cloud Stream Kafka binder before sending messages.

Consume Kafka Streams with Spring Cloud Stream

Now, we are going to switch to the stock-service implementation. In order to process streams of events, we need to include the Spring Cloud Stream Kafka Streams binder. Also, our application would have an ORM layer for storing data, so we have to include the Spring Data JPA starter and the H2 database.

<dependencies>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-data-jpa</artifactId>
  </dependency>
  <dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
  </dependency>
</dependencies>

In the first step, we are going to merge both streams of orders (buy and sell), insert the Order into the database, and print the event message. You could ask – why I use the database and ORM layer here since I have Kafka KTable? Well, I need transactions with lock support in order to coordinate the status of order realization (refer to the description in the introduction – fully and partially realized orders). I will give you more details about it in the next sections.

In order to process streams, we need to declare a functional bean that takes KStream as an input parameter. If there are two sources, we have to use BiConsumer (just for consumption) or BiFunction (to consume and send events to the new target stream) beans. In that case, we are not creating a new stream of events, so we can use BiConsumer.

@Autowired
OrderLogic logic;

@Bean
public BiConsumer<KStream<Long, Order>, KStream<Long, Order>> orders() {
   return (orderBuy, orderSell) -> orderBuy
         .merge(orderSell)
         .peek((k, v) -> {
            log.info("New({}): {}", k, v);
            logic.add(v);
         });
}

After that, we need to add some configuration settings. There are two input topics, so we need to map their names. Also, if we have more than one functional bean we need to set applicationId related to the particular function. For now, it is not required, since we have only a single function. But later, we are going to add other functions for some advanced operations.

spring.cloud.stream.bindings.orders-in-0.destination: orders.buy
spring.cloud.stream.bindings.orders-in-1.destination: orders.sell
spring.cloud.stream.kafka.streams.binder.functions.orders.applicationId: orders

For now, that’s all. You can now run the instance of stock-service using the Maven command mvn spring-boot:run.

Operations on Kafka Streams

Now, we may use some more advanced operations on Kafka Streams than just merging two different streams. In fact, that’s a key logic in our application. We need to join two different order streams into a single one using the productId as a joining key. Since the producer sets orderId as a message key, we first need to invoke the selectKey method for both order.sell and orders.buy streams. In our case, joining buy and sell orders related to the same product is just a first step. Then we need to verify if the maximum price in the buy order is not greater than the minimum price in the sell order.

The next step is to verify if both these have not been realized previously, as they also may be paired with other orders in the stream. If all the conditions are met we may create a new transaction. Finally, we may change a stream key from productId to the transactionId and send it to the dedicated transactions topic.

In order to implement the scenario described above, we need to define the BiFunction bean. It takes two input KStream from orders.buy and orders.sell and creates a new KStream of transaction events sent to the output transactions topic. While joining streams it uses 10 seconds sliding window and invokes the execute method for creating a new transaction.

@Bean
public BiFunction<KStream<Long, Order>, KStream<Long, Order>, KStream<Long, Transaction>> transactions() {
   return (orderBuy, orderSell) -> orderBuy
         .selectKey((k, v) -> v.getProductId())
         .join(orderSell.selectKey((k, v) -> v.getProductId()),
               this::execute,
               JoinWindows.of(Duration.ofSeconds(10)),
               StreamJoined.with(Serdes.Integer(), 
                                 new JsonSerde<>(Order.class), 
                                 new JsonSerde<>(Order.class)))
         .filterNot((k, v) -> v == null)
         .map((k, v) -> new KeyValue<>(v.getId(), v))
         .peek((k, v) -> log.info("Done -> {}", v));
}

private Transaction execute(Order orderBuy, Order orderSell) {
   if (orderBuy.getAmount() >= orderSell.getAmount()) {
      int count = Math.min(orderBuy.getProductCount(), orderSell.getProductCount());
      boolean allowed = logic.performUpdate(orderBuy.getId(), orderSell.getId(), count);
      if (!allowed)
         return null;
      else
         return new Transaction(
            ++transactionId,
            orderBuy.getId(),
            orderSell.getId(),
            Math.min(orderBuy.getProductCount(), orderSell.getProductCount()),
            (orderBuy.getAmount() + orderSell.getAmount()) / 2,
            LocalDateTime.now(),
            "NEW");
   } else {
      return null;
   }
}

Let’s take a closer look at the performUpdate() method called inside the execute() method. It initiates a transaction and locks both Order entities. Then it verifies each order realization status and updates it with the current values if possible. Only if the performUpdate() method finishes successfully the stock-service application creates a new transaction.

@Service
public class OrderLogic {

   private OrderRepository repository;

   public OrderLogic(OrderRepository repository) {
      this.repository = repository;
   }

   public Order add(Order order) {
      return repository.save(order);
   }

   @Transactional
   public boolean performUpdate(Long buyOrderId, Long sellOrderId, int amount) {
      Order buyOrder = repository.findById(buyOrderId).orElseThrow();
      Order sellOrder = repository.findById(sellOrderId).orElseThrow();
      int buyAvailableCount = buyOrder.getProductCount() - buyOrder.getRealizedCount();
      int sellAvailableCount = sellOrder.getProductCount() - sellOrder.getRealizedCount();
      if (buyAvailableCount >= amount && sellAvailableCount >= amount) {
         buyOrder.setRealizedCount(buyOrder.getRealizedCount() + amount);
         sellOrder.setRealizedCount(sellOrder.getRealizedCount() + amount);
         repository.save(buyOrder);
         repository.save(sellOrder);
         return true;
      } else {
         return false;
      }
   }
}

Here’s our repository class with the findById method. It sets a pessimistic lock on the Order entity during the transaction.

public interface OrderRepository extends CrudRepository<Order, Long> {

  @Lock(LockModeType.PESSIMISTIC_WRITE)
  Optional<Order> findById(Long id);

}

We also need to provide configuration settings for the transaction BiFunction.

spring.cloud.stream.bindings.transactions-in-0.destination: orders.buy
spring.cloud.stream.bindings.transactions-in-1.destination: orders.sell
spring.cloud.stream.bindings.transactions-out-0.destination: transactions
spring.cloud.stream.kafka.streams.binder.functions.transactions.applicationId: transactions

spring.cloud.stream.function.definition: orders;transactions

Use Kafka KTable with Spring Cloud Stream

We have already finished the implementation of the logic responsible for creating transactions from incoming orders. Now, we would like to examine data generated by our stock-service application. The most important things are how many transactions were generated, what was the volume of transactions globally, and per product. Three key statistics related to our transactions are: the number of transactions, the number of products sell/buy during transactions, and the total amount of transactions (price * productsCount). Here’s the definition of our object used for counting aggregations.

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class TransactionTotal {
   private int count;
   private int productCount;
   private int amount;
}

In order to call an aggregation method, we first need to group orders stream by the selected key. In the method visible below we use the status field as a grouping key. After that, we may invoke an aggregate method that allows us to perform some more complex calculations. In that particular case, we are calculating the number of all executed transactions, their volume of products, and total amount. The result KTable can be materialized as the state store. Thanks to that we will be able to query it by the name all-transactions-store.

@Bean
public Consumer<KStream<Long, Transaction>> total() {
   KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(
                "all-transactions-store");
   return transactions -> transactions
         .groupBy((k, v) -> v.getStatus(), 
                  Grouped.with(Serdes.String(), new JsonSerde<>(Transaction.class)))
         .aggregate(
                 TransactionTotal::new,
                 (k, v, a) -> {
                    a.setCount(a.getCount() + 1);
                    a.setProductCount(a.getProductCount() + v.getAmount());
                    a.setAmount(a.getAmount() + (v.getPrice() * v.getAmount()));
                    return a;
                 },
                 Materialized.<String, TransactionTotal> as(storeSupplier)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new JsonSerde<>(TransactionTotal.class)))
         .toStream()
         .peek((k, v) -> log.info("Total: {}", v));
}

The next function performs a similar aggregate operation, but this time per each product. Because the Transaction object does not contain information about the product, we first need to join the order to access it. Then we produce a KTable by per productId grouping and aggregation. The same as before we are materializing aggregation as a state store.

@Bean
public BiConsumer<KStream<Long, Transaction>, KStream<Long, Order>> totalPerProduct() {
   KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(
                "transactions-per-product-store");
   return (transactions, orders) -> transactions
         .selectKey((k, v) -> v.getSellOrderId())
         .join(orders.selectKey((k, v) -> v.getId()),
               (t, o) -> new TransactionTotalWithProduct(t, o.getProductId()),
               JoinWindows.of(Duration.ofSeconds(10)),
               StreamJoined.with(Serdes.Long(), 
                  new JsonSerde<>(Transaction.class), 
                  new JsonSerde<>(Order.class)))
         .groupBy((k, v) -> v.getProductId(), 
            Grouped.with(Serdes.Integer(), new JsonSerde<>(TransactionTotalWithProduct.class)))
         .aggregate(
               TransactionTotal::new,
               (k, v, a) -> {
                  a.setCount(a.getCount() + 1);
                  a.setProductCount(a.getProductCount() + v.getTransaction().getAmount());
                  a.setAmount(a.getAmount() + (v.getTransaction().getPrice() * v.getTransaction().getAmount()));
                  return a;
               },
               Materialized.<Integer, TransactionTotal> as(storeSupplier)
                  .withKeySerde(Serdes.Integer())
                  .withValueSerde(new JsonSerde<>(TransactionTotal.class)))
         .toStream()
         .peek((k, v) -> log.info("Total per product({}): {}", k, v));
}

What if we would like to perform similar aggregations to described above, but only for a particular period of time? We need to invoke the windowedBy method and produce a dedicated state store for such operations.

@Bean
public BiConsumer<KStream<Long, Transaction>, KStream<Long, Order>> latestPerProduct() {
   WindowBytesStoreSupplier storeSupplier = Stores.persistentWindowStore(
      "latest-transactions-per-product-store", Duration.ofSeconds(30), Duration.ofSeconds(30), false);
   return (transactions, orders) -> transactions
      .selectKey((k, v) -> v.getSellOrderId())
      .join(orders.selectKey((k, v) -> v.getId()),
            (t, o) -> new TransactionTotalWithProduct(t, o.getProductId()),
            JoinWindows.of(Duration.ofSeconds(10)),
            StreamJoined.with(Serdes.Long(), new JsonSerde<>(Transaction.class), new JsonSerde<>(Order.class)))
      .groupBy((k, v) -> v.getProductId(), Grouped.with(Serdes.Integer(), new JsonSerde<>(TransactionTotalWithProduct.class)))
      .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
      .aggregate(
            TransactionTotal::new,
            (k, v, a) -> {
               a.setCount(a.getCount() + 1);
               a.setAmount(a.getAmount() + v.getTransaction().getAmount());
               return a;
            },
            Materialized.<Integer, TransactionTotal> as(storeSupplier)
               .withKeySerde(Serdes.Integer())
               .withValueSerde(new JsonSerde<>(TransactionTotal.class)))
      .toStream()
      .peek((k, v) -> log.info("Total per product last 30s({}): {}", k, v));
}

Interactive queries

We have already created and configured all required Kafka Streams with Spring Cloud. Finally, we can execute queries on state stores. This operation is called an interactive query. Let’s create a REST controller for exposing such endpoints with the results. In order to query Kafka Streams state stores with Spring Cloud, we need to inject the InteractiveQueryService bean into the controller.

@RestController
@RequestMapping("/transactions")
public class TransactionController {

   private InteractiveQueryService queryService;

   public TransactionController(InteractiveQueryService queryService) {
      this.queryService = queryService;
   }

   @GetMapping("/all")
   public TransactionTotal getAllTransactionsSummary() {
      ReadOnlyKeyValueStore<String, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("all-transactions-store",
                        QueryableStoreTypes.keyValueStore());
      return keyValueStore.get("NEW");
   }

   @GetMapping("/product/{productId}")
   public TransactionTotal getSummaryByProductId(@PathVariable("productId") Integer productId) {
      ReadOnlyKeyValueStore<Integer, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("transactions-per-product-store",
                        QueryableStoreTypes.keyValueStore());
      return keyValueStore.get(productId);
   }

   @GetMapping("/product/latest/{productId}")
   public TransactionTotal getLatestSummaryByProductId(@PathVariable("productId") Integer productId) {
      ReadOnlyKeyValueStore<Integer, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("latest-transactions-per-product-store",
                        QueryableStoreTypes.keyValueStore());
      return keyValueStore.get(productId);
   }

   @GetMapping("/product")
   public Map<Integer, TransactionTotal> getSummaryByAllProducts() {
      Map<Integer, TransactionTotal> m = new HashMap<>();
      ReadOnlyKeyValueStore<Integer, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("transactions-per-product-store",
                        QueryableStoreTypes.keyValueStore());
      KeyValueIterator<Integer, TransactionTotal> it = keyValueStore.all();
      while (it.hasNext()) {
         KeyValue<Integer, TransactionTotal> kv = it.next();
         m.put(kv.key, kv.value);
      }
      return m;
   }

}

Before you run the latest version of the stock-service application you should generate more differentiated random data. Let’s say we would like to generate orders for 5 different products with floating prices as shown below. Just uncomment the following fragment of code in the order-service and run the application once again to generate an infinitive stream of events.

private static long orderId = 0;
private static final Random r = new Random();

private Map<Integer, Integer> prices = Map.of(
      1, 1000, 
      2, 2000, 
      3, 5000, 
      4, 1500, 
      5, 2500);

@Bean
public Supplier<Message<Order>> orderBuySupplier() {
   return () -> {
      Integer productId = r.nextInt(1, 6);
      int price = prices.get(productId) + r.nextInt(-100, 100);
      Order o = new Order(
         ++orderId,
         r.nextInt(1, 6),
         productId,
         100,
         LocalDateTime.now(),
         OrderType.BUY,
         price);
      log.info("Order: {}", o);
      return MessageBuilder
         .withPayload(o)
         .setHeader(KafkaHeaders.MESSAGE_KEY, orderId)
         .build();
   };
}

You may also want to generate more messages. To do that you need to decrease timeout for Spring Cloud Stream Kafka Supplier.

spring.cloud.stream.poller.fixedDelay: 100

After running both our sample applications you may verify the logs on the stock-service side.

Then you may call our REST endpoints performing interactive queries on the materialized Kafka KTable.

$ curl http://localhost:8080/transactions/all
$ curl http://localhost:8080/transactions/product/3
$ curl http://localhost:8080/transactions/product/latest/5

It looks simple? Well, under the hood it may look quite more complicated 🙂 Here’s a final list of topics automatically created to the needs of our application.

kafka-streams-spring-cloud-topics

Final Thoughts

Spring Cloud Stream simplifies working with Kafka Streams and interactive queries. Kafka Streams by itself is a very powerful mechanism. In this article, I showed you how we can use it to implement not very trivial logic and then analyze data in various ways.

The post Kafka Streams with Spring Cloud Stream appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2021/11/11/kafka-streams-with-spring-cloud-stream/feed/ 21 10193
Spring Cloud Stream with Schema Registry and Kafka https://piotrminkowski.com/2021/07/22/spring-cloud-stream-with-schema-registry-and-kafka/ https://piotrminkowski.com/2021/07/22/spring-cloud-stream-with-schema-registry-and-kafka/#respond Thu, 22 Jul 2021 07:17:30 +0000 https://piotrminkowski.com/?p=9981 In this article, you will learn how to use Confluent Schema Registry with Spring Cloud Stream and Kafka in a microservices architecture. We will use Apache Avro to serialize and deserialize events exchanged between our applications. Spring Cloud Stream provides a handy mechanism for integration with Kafka and schema registry. Ok, but before we start, […]

The post Spring Cloud Stream with Schema Registry and Kafka appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to use Confluent Schema Registry with Spring Cloud Stream and Kafka in a microservices architecture. We will use Apache Avro to serialize and deserialize events exchanged between our applications. Spring Cloud Stream provides a handy mechanism for integration with Kafka and schema registry.

Ok, but before we start, let’s say some words about schema registry. What is this? And why we may use it in our event-driven architecture? Let’s imagine we change the message on the producer side, by adding or removing some fields. We sent that message to a Kafka topic, but we don’t have many subscribers is receiving such events. In a typical microservices architecture, we may have many producers and many subscribers. It is often necessary for all those microservices to agree on a contract that is based on a schema. If a schema is evolving, the existing microservices are still required to work. Here comes a schema registry server. It provides a RESTful interface for storing and retrieving schemas in different formats like JSON, Protobuf, or Avro. It also stores a versioned history of all schemas and provides schema compatibility checks.

We may choose between several available products. Spring Cloud has its own implementation of a schema registry server. Although it can be easily integrated with Spring Cloud Stream, we won’t use it. Currently, it doesn’t allow verifying compatibility between different versions. There is also an Apicurio registry. On the other hand, it is not possible to easily integrate it with Spring Cloud Stream. Therefore our choice fell on the Confluent schema registry.

Event-driven architecture with Spring Cloud and schema registry

We are going to run three applications. One of them is sending events to the Kafka topic, while two others are receiving them. The integration with Kafka is built on top of Spring Cloud Stream. The consumer Consumer-A is expecting events compatible with the v1 of schema, while the second subscriber is expecting events compatible with the v2 of schema. Before sending a message to Kafka the producer application tries to load schema definition from a remote server. If there is no result, it submits the data to the server, which replies with versioning information. The following diagram illustrates our architecture.

spring-cloud-schema-registry-arch

If a new schema is not compatible with the previous version, a schema registry rejects it. As a result, Spring Cloud Stream doesn’t allow to send a message to the Kafka topic. Otherwise, it serializes a message using Apache Avro. When a subscriber receives a message it first fetches schema from a remote registry. It gets a version of the schema from the header of a message. Finally, it deserializes it using the Avro format.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository and switch to the schema-registry branch. Then go to the event-driven directory. After that, you should just follow my instructions. Let’s begin.

Running Confluent Schema Registry on Kubernetes

It seems that the simplest way to run Confluent Schema Registry locally is on Kubernetes. Since we need to run at least Zookeeper and Kafka to be able to run schema registry we will use Helm for it. First, let’s add Confluent Helm repository.

$ helm repo add confluentinc https://packages.confluent.io/helm
$ helm repo update

Then we just need to install Confluent Platform using operator.

$ kubectl create ns confluent
$ helm upgrade --install confluent-operator confluentinc/confluent-for-kubernetes
$ kubectl apply -f https://raw.githubusercontent.com/confluentinc/confluent-kubernetes-examples/master/quickstart-deploy/confluent-platform.yaml

Finally, let’s display a list of running pods in the confluent namespace.

$ kubectl get pod -n confluent
NAME                                                  READY   STATUS    RESTARTS   AGE
kafka-confluent-cp-control-center-5ccb7479fd-hmpg6    1/1     Running   10         2d17h
kafka-confluent-cp-kafka-0                            2/2     Running   5          2d17h
kafka-confluent-cp-kafka-1                            2/2     Running   5          2d17h
kafka-confluent-cp-kafka-2                            2/2     Running   5          2d17h
kafka-confluent-cp-kafka-connect-797bd95655-kxnzm     2/2     Running   6          2d17h
kafka-confluent-cp-kafka-rest-69f49987bf-6nds7        2/2     Running   13         2d17h
kafka-confluent-cp-ksql-server-54675f9777-rbcb7       2/2     Running   9          2d17h
kafka-confluent-cp-schema-registry-7f6f6f9f8d-sh4b9   2/2     Running   11         2d17h
kafka-confluent-cp-zookeeper-0                        2/2     Running   4          2d17h
kafka-confluent-cp-zookeeper-1                        2/2     Running   4          2d17h
kafka-confluent-cp-zookeeper-2                        2/2     Running   4          2d17h

After that, we may display a list of Kubernetes services. Our application we will connect to the Kafka cluster through the kafka-confluent-cp-kafka service.

$ kubectl get svc -n confluent
NAME                                    TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)             AGE
kafka-confluent-cp-control-center       ClusterIP   10.100.47.14     <none>        9021/TCP            2d17h
kafka-confluent-cp-kafka                ClusterIP   10.102.129.194   <none>        9092/TCP,5556/TCP   2d17h
kafka-confluent-cp-kafka-connect        ClusterIP   10.103.223.169   <none>        8083/TCP,5556/TCP   2d17h
kafka-confluent-cp-kafka-headless       ClusterIP   None             <none>        9092/TCP            2d17h
kafka-confluent-cp-kafka-rest           ClusterIP   10.102.7.98      <none>        8082/TCP,5556/TCP   2d17h
kafka-confluent-cp-ksql-server          ClusterIP   10.108.116.196   <none>        8088/TCP,5556/TCP   2d17h
kafka-confluent-cp-schema-registry      ClusterIP   10.102.169.4     <none>        8081/TCP,5556/TCP   2d17h
kafka-confluent-cp-zookeeper            ClusterIP   10.99.33.73      <none>        2181/TCP,5556/TCP   2d17h
kafka-confluent-cp-zookeeper-headless   ClusterIP   None             <none>        2888/TCP,3888/TCP   2d17h

Integrate Spring Cloud Stream with Confluent Schema Registry

In order to enable integration with Confluent Schema Registry we first need to include the spring-cloud-schema-registry-client dependency to the Maven pom.xml.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-schema-registry-client</artifactId>
</dependency>

After that, we should enable RegistryClient through annotation. By default, the client uses a schema registry server provided by Spring Cloud. Therefore, we have registered the ConfluentSchemaRegistryClient bean as a default client implementation.

@SpringBootApplication
@EnableSchemaRegistryClient
class ProductionApplication {

   @Primary
   @Bean
   fun schemaRegistryClient(@Value("\${spring.cloud.schemaRegistryClient.endpoint}") endpoint: String?): SchemaRegistryClient {
      val client = ConfluentSchemaRegistryClient()
      client.setEndpoint(endpoint)
      return client
   }
}

Since we run our schema registry on Kubernetes, its address is different the default one. Let’s override it in application.properties.

spring.cloud.schemaRegistryClient.endpoint=http://kafka-confluent-cp-schema-registry:8081/

Because we are going to serialize messages using Apache Avro format, we need to change a default content type for all topics to application/*-avro. The message is sent with a contentType header by using the following scheme: application/[prefix].[subject].v[version]+avro, where prefix is configurable and subject is deduced from the payload type. The default prefix is vnd, and since the name of a message class is CallmeEvent the value of the header would be application/vnd.callmeevent.v1+avro for the v1 version of schema or application/vnd.callmeevent.v2+avro for the v2 version.

spring.cloud.stream.default.contentType=application/*+avro

Alternatively, we may set a content type just for a single destination. But more about it in the next sections.

Event class and Apache Avro serialization

We may choose between two types of approaches to the event class creation when working with Apache Avro. It is possible to generate Avro schema from a model class, or generate class from Avro schema using avro-maven-plugin. Assuming we use a second approach we first need to create Avro schema and place it in the source code as the .avsc file. Let’s say it is our Avro schema. It contains three fields id, message and eventType. The name of a generated class will be CallmeEvent and a package name will be the same as the namespace.

{
  "type":"record",
  "name":"CallmeEvent",
  "namespace":"pl.piomin.samples.eventdriven.producer.message.avro",
  "fields": [
    {
      "name":"id",
      "type":"int"
    },{
      "name":"message",
      "type":"string"
    },{
      "name":"eventType",
      "type": "string"
    }
  ]
}

After that, we need to the following plugin to the Maven pom.xml. We just need to configure the input directory with Avro schema files, and the output directory for the generated classes. Once you run a build, using for example mvn clean package command it will generate a required class.

<plugin>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-maven-plugin</artifactId>
  <version>1.10.2</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>schema</goal>
      </goals>
      <configuration>
        <sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
        <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>

Just to simplify working with generated classes, let’s include the target/generated-sources/avro as a source directory.

<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>build-helper-maven-plugin</artifactId>
  <version>3.2.0</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>add-source</goal>
      </goals>
      <configuration>
        <sources>
          <source>${project.build.directory}/generated-sources/avro</source>
        </sources>
      </configuration>
    </execution>
  </executions>
</plugin>

However, the simplest approach, especially in development, is to generate Avro schema automatically from the source code. With this approach, we first need to create CallmeEvent class.

class CallmeEvent(val id: Int,
                  val message: String,
                  val eventType: String)

Then, we just need to enable dynamic Avro schema generation. Once you do it, Spring Cloud Stream automatically generates and sends schema to the schema registry before sending a message to a Kafka topic.

spring.cloud.schema.avro.dynamicSchemaGenerationEnabled=true

Integrate Spring Cloud Stream with Kafka

Spring Cloud Stream offers a broker agnostic programming model for sending and receiving messages. If you are looking for a quick introduction to that model and event-driven microservices read my article Introduction to event-driven microservices with Spring Cloud Stream. We use the same scenario as described in this article. However, we will add schema registry support and replace RabbitMQ with Kafka. In order to change the broker, we just need to replace a binder implementation as shown below.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

Here’s the main class of the producer-service application. It uses the Supplier bean to generate events continuously after startup.

@SpringBootApplication
@EnableSchemaRegistryClient
class ProductionApplication {

   var id: Int = 0

   @Value("\${callme.supplier.enabled}")
   val supplierEnabled: Boolean = false

   @Bean
   fun callmeEventSupplier(): Supplier<Message<CallmeEvent>?> = Supplier { createEvent() }

   @Primary
   @Bean
   fun schemaRegistryClient(@Value("\${spring.cloud.schemaRegistryClient.endpoint}") endpoint: String?): SchemaRegistryClient {
      val client = ConfluentSchemaRegistryClient()
      client.setEndpoint(endpoint)
      return client
   }

   private fun createEvent(): Message<CallmeEvent>? {
      return if (supplierEnabled)
         MessageBuilder.withPayload(CallmeEvent(++id, "I'm callme event!", "ping"))
                     .setHeader("to_process", true)
                     .build()
      else
         null
   }
}

Here’s a Spring Cloud Stream configuration for producer-service and Supplier bean. It configures partitioning based on the value of the id field.

spring.cloud.stream.bindings.callmeEventSupplier-out-0.contentType=application/*+avro
spring.cloud.stream.bindings.callmeEventSupplier-out-0.destination=callme-events
spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionKeyExpression=payload.id
spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionCount=2

Both consumers are receiving messages from the callme-events topic. The same as for producer-service we need to enable RegistryClient support.

@SpringBootApplication
@EnableSchemaRegistryClient
class ConsumerAApplication {

   val logger: Logger = LoggerFactory.getLogger(ConsumerAApplication::class.java)

   @Bean
   fun callmeEventConsumer(): Consumer<CallmeEvent> = Consumer { 
      logger.info("Received: {}", it) 
   }
}

We also need to configure deserialization with Avro and partitioning on the consumer side.

spring.cloud.stream.default.contentType=application/*+avro
spring.cloud.stream.bindings.callmeEventSupplier-in-0.contentType=application/*+avro
spring.cloud.stream.bindings.callmeEventConsumer-in-0.destination=callme-events
spring.cloud.stream.bindings.callmeEventConsumer-in-0.group=a
spring.cloud.stream.bindings.callmeEventConsumer-in-0.consumer.partitioned=true
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=${INSTANCE_INDEX}

Deploy applications on Kubernetes

Firstly, let’s deploy our Spring Cloud Stream applications on Kubernetes. Here’s a Deployment manifest for producer-service.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
      - name: producer
        image: piomin/producer-service
        ports:
        - containerPort: 8080

We also have similar manifests for consumer applications. We need to set the INSTANCE_INDEX environment variable, which is then responsible for partitioning configuration.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-a
spec:
  selector:
    matchLabels:
      app: consumer-a
  template:
    metadata:
      labels:
        app: consumer-a
    spec:
      containers:
      - name: consumer-a
        image: piomin/consumer-a-service
        env:
          - name: INSTANCE_INDEX
            value: "0"
        ports:
        - containerPort: 8080

The Deployment manifest for the consumer-b application is visible below.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-b
spec:
  selector:
    matchLabels:
      app: consumer-b
  template:
    metadata:
      labels:
        app: consumer-b
    spec:
      containers:
      - name: consumer-b
        image: piomin/consumer-b-service
        env:
          - name: INSTANCE_INDEX
            value: "1"
        ports:
        - containerPort: 8080

All those applications may be deployed on Kubernetes with Skaffold. Each application directory contains a Skaffold configuration file skaffold.yaml, so you just need to execute the following command to run them on Kubernetes.

$ skaffold run

Testing integration between Spring Cloud Stream and schema registry

In order to register the v1 version of the schema, we should run the producer-service application with the following event class.

class CallmeEvent(val id: Int,
                  val message: String)

Then, we should restart it with the new version of the CallmeEvent class as shown below.

class CallmeEvent(val id: Int,
                  val message: String,
                  val eventType: String)

Now, we can verify a list of schemas registered on the server. First, let’s enable port forwarding for the Confluent Schema Registry service.

$ kubectl port-forward svc/kafka-confluent-cp-schema-registry 8081:8081 -n confluent

Thanks to that, we may access schema registry REST API on the local port. Let’s display a list of registered subjects. As you see there is a single subject called callmeevent.

$ curl http://localhost:8081/subjects
["callmeevent"]

In the next step, we may get a list of versions registered under the callmeevent subject. As we expect, there are two versions available in the schema registry.

$ curl http://localhost:8081/subjects/callmeevent/versions
[1,2]

We can display a full schema definition by calling the following endpoint using schema id.

$ curl http://localhost:8081/schemas/ids/1
{"schema":"{\"type\":\"record\",\"name\":\"CallmeEvent\",\"namespace\":\"pl.piomin.samples.eventdriven.producer.message\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"message\",\"type\":\"string\"}]}"}

Finally, we are going to change our schema once again. Until then, a new version of a schema was compatible with the previous one. Now, we create a schema, which is incompatible with the previous version. In particular, we change the eventType field into eventTp. That change is provided on the producer side.

class CallmeEvent(val id: Int,
                  val message: String,
                  val eventTp: String)

After restarting producer-service Spring Cloud Stream tries to register a new version of the schema. Let’s just take a look at application logs. As you see, a new schema has been rejected by the Confluent Schema Registry. Here’s a fragment of producer-service logs after a schema change.

The post Spring Cloud Stream with Schema Registry and Kafka appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2021/07/22/spring-cloud-stream-with-schema-registry-and-kafka/feed/ 0 9981
Knative Eventing with Quarkus, Kafka and Camel https://piotrminkowski.com/2021/06/14/knative-eventing-with-quarkus-kafka-and-camel/ https://piotrminkowski.com/2021/06/14/knative-eventing-with-quarkus-kafka-and-camel/#comments Mon, 14 Jun 2021 07:47:54 +0000 https://piotrminkowski.com/?p=9797 In this article, you will learn how to use Quarkus with Camel to create applications that send messages to Kafka and receive CloudEvent from Knative Eventing. We will build a very similar system to the system described in my previous article Knative Eventing with Kafka and Quarkus. However, this time we will use Apache Camel […]

The post Knative Eventing with Quarkus, Kafka and Camel appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to use Quarkus with Camel to create applications that send messages to Kafka and receive CloudEvent from Knative Eventing. We will build a very similar system to the system described in my previous article Knative Eventing with Kafka and Quarkus. However, this time we will use Apache Camel instead of several Quarkus extensions including Kafka support.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions.

First, you should go to the saga directory. It contains two applications built on top of Quarkus and Apache Camel. Today we will implement an eventual consistency pattern (also known as a SAGA pattern). It will use the Knative Eventing model for exchanging events between our applications.

1. Prerequisites

Before we start, we need to configure some components like Kafka, Knative or Kafka Eventing Broker. Let’s go further these steps.

1.1. Install Apache Kafka cluster

Firstly, let’s create our kafka namespace.

$ kubectl create namespace kafka

Then we apply the installation files, including ClusterRolesClusterRoleBindings and other required Kubernetes CustomResourceDefinitions (CRD).

$ kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

After that, we can create a single-node persistent Apache Kafka cluster. We use an example custom resource for the Strimzi operator.

$ kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka

Finally, we may verify our installation. You should the same result as shown below.

1.2. Install Knative Serving and Eventing

You can install Knative on your Kubernetes cluster using YAML manifests or operator. The current version of Knative is 0.23. This is minimal list of steps you need to do with YAML-based installation is visible below. For more details, you may refer to the documentation. I place or the required commands to simplify a process for you. Let’s install Knative Serving.

$ kubectl apply -f https://github.com/knative/serving/releases/download/v0.23.0/serving-crds.yaml
$ kubectl apply -f https://github.com/knative/serving/releases/download/v0.23.0/serving-core.yaml
$ kubectl apply -f https://github.com/knative/net-kourier/releases/download/v0.23.0/kourier.yaml
$ kubectl patch configmap/config-network \
  --namespace knative-serving \
  --type merge \
  --patch '{"data":{"ingress.class":"kourier.ingress.networking.knative.dev"}}'

Then let’s install Knative Eventing.

$ kubectl apply -f https://github.com/knative/eventing/releases/download/v0.23.0/eventing-crds.yaml
$ kubectl apply -f https://github.com/knative/eventing/releases/download/v0.23.0/eventing-core.yaml

1.3. Install Knative Kafka Eventing

The following commands install the Apache Kafka broker, and run event routing in a system namespace, knative-eventing, by default.

$ kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.23.0/eventing-kafka-controller.yaml
$ kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.23.0/eventing-kafka-broker.yaml

Then, we should install CRD with KafkaBinding and KafkaSource.

$ kubectl apply -f https://storage.googleapis.com/knative-releases/eventing-contrib/latest/kafka-source.yaml

Finally, let’s just create a broker.

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: default
  namespace: default
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing

1.4. Install Apache Camel K operator (optional)

If you would like to use Apache Camel K to run the Quarkus application on Knative you first install its operator. After downloading Camel K CLI you just need to run the following command.

$ kamel install

2. Run the application with Apache Camel K

In order to deploy and run an application on Knative, we may execute the kamel run command. Some parameters might be set in the source file. In the command visible below, I’m setting the name Knative service, source file location, and Quarkus properties.

$ kamel run --name order-saga --dev OrderRoute.java \
    -p quarkus.datasource.db-kind=h2 \
    -p quarkus.datasource.jdbc.url=jdbc:h2:mem:testdb \
    -p quarkus.hibernate-orm.packages=com.github.piomin.entity.model.order

For more details about deploying Quarkus with Camel K on Kubernetes you may refer to my article Apache Camel K and Quarkus on Kubernetes.

Finally, I was not able to deploy exactly this application on Knative with Camel K. That’s because it didn’t see JPA entities included in the application with the external library. However, the application is also prepared for deploying with Camel K. The whole source code is a single Java file and there are some Camel K modeline hooks in this source code.

3. Integrate Quarkus with Apache Camel

We can easily integrate Apache Camel routes with Quarkus. Camel Quarkus provides extensions for many of the Camel components. We need to include those components in our Maven pom.xml. What type of components do we need? The full list is visible below. However, first, let’s discuss them a little bit more.

Our application uses JPA to store entities in the H2 database. So we will include the Camel Quarkus JPA extension to provide the JPA implementation with Hibernate. For a single persistence unit, this extension automatically creates EntityManagerFactory and TransactionManager. In order to integrate with Apache Kafka, we need to include the Camel Quarkus Kafka extension. In order to receive events from Knative, we should expose the HTTP POST endpoint. That’s why we need several extensions like Platform HTTP or Jackson. Here’s my list of Maven dependencies.

<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-core</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-platform-http</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-bean</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-timer</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-jpa</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-rest</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-jackson</artifactId>
</dependency>

Then, we just need to create a class that extends RouteBuilder. The routes have to be defined inside the configure() method. Before we get into the details let’s analyze our domain model classes.

public class CustomerRoute extends RouteBuilder {
   @Override
   public void configure() throws Exception { 
      ...
   }
}

4. Domain model for Quarkus JPA and Kafka

I created a separate project for entity model classes. The repository is available on GitHub https://github.com/piomin/entity-model.git. Thanks to that, I have a typical serverless application with consists of a single class. It is also possible to easily deploy it on Knative with Camel K. Here’s a model entity for order-saga.

@Entity
@Table(name = "orders")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Order implements Serializable {
    @Id
    @GeneratedValue
    private Long id;
    private Integer customerId;
    private Integer productId;
    private int amount;
    private int productCount;
    @Enumerated
    private OrderStatus status = OrderStatus.NEW;
}

Just to simplify, I’m using the same class when sending events to Kafka. We can also take a look at a model entity for customer-saga.

@Entity
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Customer implements Serializable {
    @Id
    @GeneratedValue
    private Long id;
    private String name;
    private int amountAvailable;
    private int amountReserved;
}

5. Building Camel routes with Quarkus and Kafka extension

In the first step, we are going to generate orders and send them to the Kafka topic. Before that, we will store them in the H2 database using the Camel JPA extension. This part of logic is implemented inside order-saga.

from("timer:tick?period=10000")
   .setBody(exchange -> 
      new Order(null, r.nextInt(10) + 1, r.nextInt(10) + 1, 100, 1, OrderStatus.NEW))
   .to("jpa:" + Order.class.getName())
   .marshal().json(JsonLibrary.Jackson)
   .log("New Order: ${body}")
   .toD("kafka:order-events?brokers=${env.KAFKA_BOOTSTRAP_SERVERS}");

Some things need to be clarified here. Before sending a message to the Kafka topic we need to serialize it to the JSON format. The application does not anything about the Kafka address. This address has been injected into the container by the KafkaBinding object. It is available for the Camel route as the environment variable KAFKA_BOOTSTRAP_SERVERS.

Now, let’s switch to the customer-saga application. In order to receive an event for the Knative broker, we should expose an HTTP POST endpoint. This endpoint takes Order as an input. Then, if the order’s status equals NEW it performs a reservation on the customer account. Before that, it sends back a response to a reserver-events topic.

Also, let’s take a look at the fragment responsible for searching the customer in the database and performing an update. We use Quarkus Camel JPA extension. First, we need to define a JPQL query to retrieve an entity. Then, we update the Customer entity depending on the order status.

rest("/customers")
   .post("/reserve").consumes("application/json")
   .route()
      .log("Order received: ${body}")
      .unmarshal().json(JsonLibrary.Jackson, Order.class)
      .choice()
         .when().simple("${body.status.toString()} == 'NEW'")
            .setBody(exchange -> {
               Order order = exchange.getIn().getBody(Order.class);
               order.setStatus(OrderStatus.IN_PROGRESS);
               return order;
            })
            .marshal().json(JsonLibrary.Jackson)
            .log("Reservation sent: ${body}")
            .toD("kafka:reserve-events?brokers=${env.KAFKA_BOOTSTRAP_SERVERS}")
      .end()
      .unmarshal().json(JsonLibrary.Jackson, Order.class)
      .setProperty("orderAmount", simple("${body.amount}", Integer.class))
      .setProperty("orderStatus", simple("${body.status}", OrderStatus.class))
      .toD("jpa:" + Customer.class.getName() + 
         "?query=select c from Customer c where c.id= ${body.customerId}")
      .choice()
         .when().simple("${exchangeProperty.orderStatus} == 'IN_PROGRESS'")
            .setBody(exchange -> {
               Customer customer = (Customer) exchange.getIn().getBody(List.class).get(0);
               customer.setAmountReserved(customer.getAmountReserved() + 
                  exchange.getProperty("orderAmount", Integer.class));
               customer.setAmountAvailable(customer.getAmountAvailable() - 
                  exchange.getProperty("orderAmount", Integer.class));
               return customer;
            })
            .otherwise()
               .setBody(exchange -> {
                  Customer customer = (Customer) exchange.getIn().getBody(List.class).get(0);
                  customer.setAmountReserved(customer.getAmountReserved() - 
                     exchange.getProperty("orderAmount", Integer.class));
                  return customer;
               })
      .end()
      .log("Current customer: ${body}")
      .to("jpa:" + Customer.class.getName() + "?useExecuteUpdate=true")
      .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(201)).setBody(constant(null))
.endRest();

We can also generate some test data in customer-saga using Camel route. It runs once just after the Quarkus application startup.

from("timer://runOnce?repeatCount=1&delay=100")
   .loop(10)
      .setBody(exchange -> new Customer(null, "Test"+(++i), r.nextInt(50000), 0))
      .to("jpa:" + Customer.class.getName())
      .log("Add: ${body}")
   .end();

6. Configure Knative Eventing with Kafka broker

6.1. Architecture with Quarkus, Camel and Kafka

We have already created two applications built on top of Quarkus and Apache Camel. Both of these applications expose HTTP POST endpoints and send events to the Kafka topics. Now, we need to create some Kubernetes objects to orchestrate the process. So far, we just send the events to topics, they have not been routed to the target applications. Let’s take a look at the architecture of our system. There are two topics on Kafka that receive events: order-events and reserve-events. The messages from those topics are not automatically sent to the Knative broker. So, first, we need to create the KafkaSource object to get messages from these topics and send them to the broker.

quarkus-camel-kafka-arch

6.2. Configure Knative eventing

The KafkaSource object takes the list of input topics and a target application. In that case, the target application is the Knative broker. We may create a single KafkaSource for both topics or two sources per each topic. Here’s the KafkaSource definition that takes messages from the reserve-events topic.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-reserve-order
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - reserve-events
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default

Let’s create both sources. After creating them, we may verify if everything went well by executing the command kubectl get sources.

quarkus-camel-kafka-sources

Before running our application on Knative we should create KafkaBinding objects. This object is responsible for injecting the address of the Kafka cluster into the application containers. The address of a broker will be available for the application under the KAFKA_BOOTSTRAP_SERVERS environment variable.

apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
  name: kafka-binding-customer-saga
spec:
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: customer-saga
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092

Let’s create both KafkaBinding objects. Here’s the list of available bindings after running the kubectl get bindings command.

Finally, we may proceed to the last step of our configuration. We will create triggers. A trigger represents a desire to subscribe to events from a specific broker. Moreover, we may apply a simple filtering mechanism using the Trigger object. For example, if we want to send only the events from the order-events topic and with the type dev.knative.kafka.event we should create a definition as shown below.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: customer-saga-trigger
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/kafka-source-orders-customer#order-events
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /customers/reserve

Similarly, we should create a trigger that sends messages to the order-saga POST endpoint. It gets messages from the reserve-events source and sends them to the /orders/confirm endpoint.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: order-saga-trigger
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/kafka-source-reserve-order#reserve-events
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: order-saga
    uri: /orders/confirm

Finally, we can display a list of active triggers by executing the command kubectl get trigger.

quarkus-camel-kafka-triggers

7. Deploy Quarkus application on Knative

Once we finished the development of our sample applications we may deploy them on Knative. One of the possible deployment options is with Apache Camel K. In case of any problems with this type of deployment we may also use the Quarkus Kubernetes module. Firstly, let’s include two required modules. We will also leverage the Jib Maven plugin.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-kubernetes</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-container-image-jib</artifactId>
</dependency>

The rest of the configuration should be provided inside the application properties file. In the first step, we need to enable automatic deployment on Kubernetes by setting the property quarkus.kubernetes.deploy to true. By default, Quarkus creates a standard Kubernetes Deployment. Therefore, we should set the quarkus.kubernetes.deployment-target to knative. In that case, it will generate a Knative Service YAML. Finally, we have to change the name of the image group to dev.local. Of course, it is required just if we run our applications on the local Kubernetes cluster like me.

quarkus.kubernetes.deploy = true
quarkus.kubernetes.deployment-target = knative
quarkus.container-image.group = dev.local

Now, if run the build with the mvn clean package command, our application will be automatically deployed on Knative. After that, let’s verify the list of Knative services.

Once, the order-saga application is started, it generates one order per 10 seconds and then sends it to the Kafka topic order-events. We can easily verify that it works properly, by checking out a list of active topics as shown below.

We can also verify a list of Knative events exchanged by the applications.

$ kubectl get eventtype

Final Thoughts

Quarkus and Apache Camel seem to be a perfect combination when creating serverless applications on Knative. We can easily implement the whole logic within a single source code file. We can also use Camel K to deploy our applications on Kubernetes or Knative. You can compare the approach described in this article with the one based on Quarkus and its extensions to Kafka or Knative available in this repository.

The post Knative Eventing with Quarkus, Kafka and Camel appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2021/06/14/knative-eventing-with-quarkus-kafka-and-camel/feed/ 4 9797
Part 1: Testing Kafka Microservices With Micronaut https://piotrminkowski.com/2019/10/09/part-1-testing-kafka-microservices-with-micronaut/ https://piotrminkowski.com/2019/10/09/part-1-testing-kafka-microservices-with-micronaut/#respond Wed, 09 Oct 2019 09:08:26 +0000 https://piotrminkowski.wordpress.com/?p=7305 I have already described how to build microservices architecture entirely based on message-driven communication through Apache Kafka in one of my previous articles Kafka In Microservices With Micronaut. As you can see in the article title the sample applications and integration with Kafka has been built on top of Micronaut Framework. I described some interesting […]

The post Part 1: Testing Kafka Microservices With Micronaut appeared first on Piotr's TechBlog.

]]>
I have already described how to build microservices architecture entirely based on message-driven communication through Apache Kafka in one of my previous articles Kafka In Microservices With Micronaut. As you can see in the article title the sample applications and integration with Kafka has been built on top of Micronaut Framework. I described some interesting features of Micronaut, that can be used for building message-driven microservices, but I didn’t specifically write anything about testing. In this article I’m going to show you example of testing your Kafka microservices using Micronaut Test core features (Component Tests), Testcontainers (Integration Tests) and Pact (Contract Tests).

Generally, automated testing is one of the biggest challenges related to microservices architecture. Therefore the most popular microservice frameworks like Micronaut or Spring Boot provide some useful features for that. There are also some dedicated tools which help you to use Docker containers in your tests or provide mechanisms for verifying the contracts between different applications. For the purpose of current article demo applications I’m using the same repository as for the previous article: https://github.com/piomin/sample-kafka-micronaut-microservices.git.

Sample Architecture

The architecture of sample applications has been described in the previous article but let me perform a quick recap. We have 4 microservices: order-service, trip-service, driver-service and passenger-service. The implementation of these applications is very simple. All of them have in-memory storage and connect to the same Kafka instance.
A primary goal of our system is to arrange a trip for customers. The order-service application also acts as a gateway. It is receiving requests from customers, saving history and sending events to orders topic. All the other microservices are listening on this topic and processing orders sent by order-service. Each microservice has its own dedicated topic, where it sends events with information about changes. Such events are received by some other microservices. The architecture is presented in the picture below.

micronaut-kafka-1

Embedded Kafka – Component Testing with Micronaut

After a short description of the architecture we may proceed to the key point of this article – testing. Micronaut allows you to start an embedded Kafka instance for the purpose of testing. To do that you should first include the following dependencies to your Maven pom.xml:

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>2.3.0</version>
   <classifier>test</classifier>
</dependency>
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.12</artifactId>
   <version>2.3.0</version>
</dependency>
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.12</artifactId>
   <version>2.3.0</version>
   <classifier>test</classifier>
</dependency>

To enable embedded Kafka for a test class we have to set property kafka.embedded.enabled to true. Because I have run Kafka on Docker container, which is by default available on address 192.168.99.100 I also need to change dynamically the value of property kafka.bootstrap.servers to localhost:9092 for a given test. The test implementation class uses embedded Kafka for testing three basic scenarios for order-service: sending orders with new trip, and receiving orders for trip cancellation and completion from other microservices. Here’s the full code of my OrderKafkaEmbeddedTest

@MicronautTest
@Property(name = "kafka.embedded.enabled", value = "true")
@Property(name = "kafka.bootstrap.servers", value = "localhost:9092")
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class OrderKafkaEmbeddedTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderKafkaEmbeddedTest.class);

    @Inject
    OrderClient client;
    @Inject
    OrderInMemoryRepository repository;
    @Inject
    OrderHolder orderHolder;
    @Inject
    KafkaEmbedded kafkaEmbedded;

    @BeforeAll
    public void init() {
        LOGGER.info("Topics: {}", kafkaEmbedded.getKafkaServer().get().zkClient().getAllTopicsInCluster());
    }

    @Test
    @org.junit.jupiter.api.Order(1)
    public void testAddNewTripOrder() throws InterruptedException {
        Order order = new Order(OrderType.NEW_TRIP, 1L, 50, 30);
        order = repository.add(order);
        client.send(order);
        Order orderSent = waitForOrder();
        Assertions.assertNotNull(orderSent);
        Assertions.assertEquals(order.getId(), orderSent.getId());
    }

    @Test
    @org.junit.jupiter.api.Order(2)
    public void testCancelTripOrder() throws InterruptedException {
        Order order = new Order(OrderType.CANCEL_TRIP, 1L, 50, 30);
        client.send(order);
        Order orderReceived = waitForOrder();
        Optional<Order> oo = repository.findById(1L);
        Assertions.assertTrue(oo.isPresent());
        Assertions.assertEquals(OrderStatus.REJECTED, oo.get().getStatus());
    }

    @Test
    @org.junit.jupiter.api.Order(3)
    public void testPaymentTripOrder() throws InterruptedException {
        Order order = new Order(OrderType.PAYMENT_PROCESSED, 1L, 50, 30);
        order.setTripId(1L);
        order = repository.add(order);
        client.send(order);
        Order orderSent = waitForOrder();
        Optional<Order> oo = repository.findById(order.getId());
        Assertions.assertTrue(oo.isPresent());
        Assertions.assertEquals(OrderStatus.COMPLETED, oo.get().getStatus());
    }

    private Order waitForOrder() throws InterruptedException {
        Order orderSent = null;
        for (int i = 0; i < 10; i++) {
            orderSent = orderHolder.getCurrentOrder();
            if (orderSent != null)
                break;
            Thread.sleep(1000);
        }
        orderHolder.setCurrentOrder(null);
        return orderSent;
    }

}

At that stage some things require clarification – especially the mechanism of verifying sending and receiving messages. I’ll describe it in the example of driver-service. When a message is incoming to the order topic it is received by OrderListener, which is annotated with @KafkaListener as shown below. It gets the order type and forwards the NEW_TRIP request to DriverService bean.

@KafkaListener(groupId = "driver")
public class OrderListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

    private DriverService service;

    public OrderListener(DriverService service) {
        this.service = service;
    }

    @Topic("orders")
    public void receive(@Body Order order) {
        LOGGER.info("Received: {}", order);
        switch (order.getType()) {
            case NEW_TRIP -> service.processNewTripOrder(order);
        }
    }
}

The DriverService is processing order. It is trying to find the driver located closest to the customer, changing found driver’s status to unavailable and sending events with change with the current driver state.

@Singleton
public class DriverService {

    private static final Logger LOGGER = LoggerFactory.getLogger(DriverService.class);

    private DriverClient client;
    private OrderClient orderClient;
    private DriverInMemoryRepository repository;

    public DriverService(DriverClient client, OrderClient orderClient, DriverInMemoryRepository repository) {
        this.client = client;
        this.orderClient = orderClient;
        this.repository = repository;
    }

    public void processNewTripOrder(Order order) {
        LOGGER.info("Processing: {}", order);
        Optional<Driver> driver = repository.findNearestDriver(order.getCurrentLocationX(), order.getCurrentLocationY());
        if (driver.isPresent()) {
            Driver driverLocal = driver.get();
            driverLocal.setStatus(DriverStatus.UNAVAILABLE);
            repository.updateDriver(driverLocal);
            client.send(driverLocal, String.valueOf(order.getId()));
            LOGGER.info("Message sent: {}", driverLocal);
        }
    }
   
   // OTHER METHODS ...
}

To verify that a final message with change notification has been sent to the drivers topic we have to create our own listener for the test purposes. It receives the message and writes it in @Singleton holder class which is then accessed by a single-thread test class. The described process is visualized in the picture below.
kafka-micronaut-testing-1.png
Here’s the implementation of test listener which is responsible just for receiving the message sent to drivers topic and writing it to DriverHolder bean.

@KafkaListener(groupId = "driverTest")
public class DriverConfirmListener {

   private static final Logger LOGGER = LoggerFactory.getLogger(DriverConfirmListener.class);

   @Inject
   DriverHolder driverHolder;

   @Topic("orders")
   public void receive(@Body Driver driver) {
      LOGGER.info("Confirmed: {}", driver);
      driverHolder.setCurrentDriver(driver);
   }

}

Here’s the implementation of DriverHolder class.

@Singleton
public class DriverHolder {

   private Driver currentDriver;

   public Driver getCurrentDriver() {
      return currentDriver;
   }

   public void setCurrentDriver(Driver currentDriver) {
      this.currentDriver = currentDriver;
   }

}

No matter if you are using embedded Kafka, Testcontainers or just manually started a Docker container you can use the verification mechanism described above.

Kafka with Testcontainers

We will use the Testcontainers framework for running Docker containers of Zookeeper and Kafka during JUnit tests. Testcontainers is a Java library that provides lightweight, throwaway instances of common databases, Selenium web browsers, or anything else that can run in a Docker container. To use it in your project together with JUnit 5, which is already used for our sample Micronaut application, you have to add the following dependencies to your Maven pom.xml:

<dependency>
   <groupId>org.testcontainers</groupId>
   <artifactId>kafka</artifactId>
   <version>1.12.2</version>
   <scope>test</scope>
</dependency>
<dependency>
   <groupId>org.testcontainers</groupId>
   <artifactId>junit-jupiter</artifactId>
   <version>1.12.2</version>
   <scope>test</scope>
</dependency>

The declared library org.testcontainers:kafka:1.12.2 provides KafkaContainer class that allows to define and start a Kafka container with embedded Zookeeper in your tests. However, I decided to use GenericContainer class and run two containers wurstmeister/zookeeper and wurstmeister/kafka. Because Kafka needs to communicate with Zookeeper both containers should be run in the same network. We will also have to override Zookeeper container’s name and host name to allow Kafka to call it by the hostname.
When running a Kafka container we need to set some important environment variables. Variable KAFKA_ADVERTISED_HOST_NAME sets the hostname under which Kafka is visible for external client and KAFKA_ZOOKEEPER_CONNECT Zookeeper lookup address. Although it is not recommended we should disable dynamic exposure port generation by setting static port number equal to the container binding port 9092. That helps us to avoid some problems with setting Kafka advertised port and injecting it into Micronaut configuration.

@MicronautTest
@Testcontainers
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class OrderKafkaContainerTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderKafkaContainerTest.class);

    static Network network = Network.newNetwork();

   @Container
   public static final GenericContainer ZOOKEEPER = new GenericContainer("wurstmeister/zookeeper")
      .withCreateContainerCmdModifier(it -> ((CreateContainerCmd) it).withName("zookeeper").withHostName("zookeeper"))
      .withExposedPorts(2181)
      .withNetworkAliases("zookeeper")
      .withNetwork(network);

   @Container
   public static final GenericContainer KAFKA_CONTAINER = new GenericContainer("wurstmeister/kafka")
      .withCreateContainerCmdModifier(it -> ((CreateContainerCmd) it).withName("kafka").withHostName("kafka")
         .withPortBindings(new PortBinding(Ports.Binding.bindPort(9092), new ExposedPort(9092))))
      .withExposedPorts(9092)
      .withNetworkAliases("kafka")
      .withEnv("KAFKA_ADVERTISED_HOST_NAME", "192.168.99.100")
      .withEnv("KAFKA_ZOOKEEPER_CONNECT", "zookeeper:2181")
      .withNetwork(network);
      
   // TESTS ...
   
}

The test scenarios may be the same as for embedded Kafka or we may attempt to define some more advanced integration tests. To do that we first create a Docker image of every microservice during the build. We can use io.fabric8:docker-maven-plugin for that. Here’s the example for driver-service.

<plugin>
   <groupId>io.fabric8</groupId>
   <artifactId>docker-maven-plugin</artifactId>
   <version>0.31.0</version>
   <configuration>
      <images>
         <image>
            <name>piomin/driver-service:${project.version}</name>
            <build>
               <dockerFile>${project.basedir}/Dockerfile</dockerFile>
               <tags>
                  <tag>latest</tag>
                  <tag>${project.version}</tag>
               </tags>
            </build>
         </image>
      </images>
   </configuration>
   <executions>
      <execution>
         <id>start</id>
         <phase>pre-integration-test</phase>
         <goals>
            <goal>build</goal>
            <goal>start</goal>
         </goals>
      </execution>
      <execution>
         <id>stop</id>
         <phase>post-integration-test</phase>
         <goals>
            <goal>stop</goal>
         </goals>
      </execution>
   </executions>
</plugin>

If we have a Docker image of every microservice we can easily run it using Testcontainers during our integration tests. In the fragment of test class visible below I’m running the container with driver-service in addition to Kafka and Zookeeper containers. The test is implemented inside order-service. We are building the same scenario as in the test with embedded Kafka – sending the NEW_TRIP order. But this time we are verifying if the message has been received and processed by the driver-service. This verification is performed by listening for notification events sent by driver-service started on Docker container to the drivers topic. Normally, order-service does not listen for messages incoming to drivers topic, but we created such integration just for the integration test purpose.

@Container
public static final GenericContainer DRIVER_CONTAINER = new GenericContainer("piomin/driver-service")
   .withNetwork(network);

@Inject
OrderClient client;
@Inject
OrderInMemoryRepository repository;
@Inject
DriverHolder driverHolder;

@Test
@org.junit.jupiter.api.Order(1)
public void testNewTrip() throws InterruptedException {
   Order order = new Order(OrderType.NEW_TRIP, 1L, 50, 30);
   order = repository.add(order);
   client.send(order);
   Driver driverReceived = null;
   for (int i = 0; i < 10; i++) {
      driverReceived = driverHolder.getCurrentDriver();
      if (driverReceived != null)
         break;
      Thread.sleep(1000);
   }
   driverHolder.setCurrentDriver(null);
   Assertions.assertNotNull(driverReceived);
}

Summary

In this article, I have described an approach to component testing with embedded Kafka, and Micronaut, and also integration tests with Docker and Testcontainers. This is the first part of the article, in the second I’m going to show you how to build contract tests for Micronaut applications with Pact.

The post Part 1: Testing Kafka Microservices With Micronaut appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2019/10/09/part-1-testing-kafka-microservices-with-micronaut/feed/ 0 7305
Kafka In Microservices With Micronaut https://piotrminkowski.com/2019/08/06/kafka-in-microservices-with-micronaut/ https://piotrminkowski.com/2019/08/06/kafka-in-microservices-with-micronaut/#respond Tue, 06 Aug 2019 07:14:19 +0000 https://piotrminkowski.wordpress.com/?p=7207 Today we are going to build an example of microservices that communicates with each other asynchronously through Apache Kafka topics. We use the Micronaut Framework, which provides a dedicated library for integration with Kafka. Let’s take a brief look at the architecture of our sample system. We have 4 microservices: order-service, trip-service, driver-service, and passenger-service. […]

The post Kafka In Microservices With Micronaut appeared first on Piotr's TechBlog.

]]>
Today we are going to build an example of microservices that communicates with each other asynchronously through Apache Kafka topics. We use the Micronaut Framework, which provides a dedicated library for integration with Kafka. Let’s take a brief look at the architecture of our sample system. We have 4 microservices: order-service, trip-service, driver-service, and passenger-service. The implementation of these applications is very simple. All of them have in-memory storage and connect to the same Kafka instance.

A primary goal of our system is to arrange a trip for customers. The order-service application also acts as a gateway. It is receiving requests from customers, saving history, and sending events to orders topic. All the other microservices are listening on this topic and processing orders sent by order-service. Each microservice has its own dedicated topic, where it sends events with information about changes. Such events are received by some other microservices. The architecture is presented in the picture below.

micronaut-kafka-1.png

Before reading this article it is worth familiarizing yourself with Micronaut Framework. You may read one of my previous articles describing a process of building microservices communicating via REST API: Quick Guide to Microservices with Micronaut Framework

1. Running Kafka

To run Apache Kafka on the local machine we may use its Docker image. It seems that the most up-to-date image is shared by https://hub.docker.com/u/wurstmeister. Before starting Kafka containers we have to start the ZooKeeper server, which is used by Kafka. If you run Docker on Windows the default address of its virtual machine is 192.168.99.100. It also has to be set as an environment for a Kafka container.
Both Zookeeper and Kafka containers will be started in the same network kafka. Zookeeper is available under the name zookeeper, and is exposed on port 2181. Kafka container requires that address under env variable KAFKA_ZOOKEEPER_CONNECT.

$ docker network create kafka
$ docker run -d --name zookeeper --network kafka -p 2181:2181 wurstmeister/zookeeper
$ docker run -d --name kafka -p 9092:9092 --network kafka --env KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 wurstmeister/kafka

2. Including Micronaut Kafka

Micronaut example applications built with Kafka can be started with or without the presence of an HTTP server. To enable Micronaut Kafka you need to include the micronaut-kafka library to your dependencies. In case you would like to expose HTTP API you should also include micronaut-http-server-netty:

<dependency>
   <groupId>io.micronaut.configuration</groupId>
   <artifactId>micronaut-kafka</artifactId>
</dependency>
<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-http-server-netty</artifactId>
</dependency>

3. Building microservice order-service

The application order-service as the only one starts embedded HTTP server and exposes REST API. That’s why we may enable built-in Micronaut health checks for Kafka. To do that we should first include micronaut-management dependency:

<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-management</artifactId>
</dependency>

For convenience, we will enable all management endpoints and disable HTTP authentication for them by defining the following configuration inside application.yml:

endpoints:
  all:
    enabled: true
    sensitive: false

Now, a health check is available under address http://localhost:8080/health. Our sample application will also expose a simple REST API for adding new orders and listing all previously created orders. Here’s the Micronaut controller implementation responsible for exposing those endpoints:

@Controller("orders")
public class OrderController {

    @Inject
    OrderInMemoryRepository repository;
    @Inject
    OrderClient client;

    @Post
    public Order add(@Body Order order) {
        order = repository.add(order);
        client.send(order);
        return order;
    }

    @Get
    public Set<Order> findAll() {
        return repository.findAll();
    }

}

Each microservice uses an in-memory repository implementation. Here’s repository implementation inside order-service:

@Singleton
public class OrderInMemoryRepository {

    private Set<Order> orders = new HashSet<>();

    public Order add(Order order) {
        order.setId((long) (orders.size() + 1));
        orders.add(order);
        return order;
    }

    public void update(Order order) {
        orders.remove(order);
        orders.add(order);
    }

    public Optional<Order> findByTripIdAndType(Long tripId, OrderType type) {
        return orders.stream().filter(order -> order.getTripId().equals(tripId) && order.getType() == type).findAny();
    }

    public Optional<Order> findNewestByUserIdAndType(Long userId, OrderType type) {
        return orders.stream().filter(order -> order.getUserId().equals(userId) && order.getType() == type)
                .max(Comparator.comparing(Order::getId));
    }

    public Set<Order> findAll() {
        return orders;
    }

}

In-memory repository stores Order object instances. Order object is also sent to Kafka topic named orders. Here’s an implementation of Order class:

public class Order {

    private Long id;
    private LocalDateTime createdAt;
    private OrderType type;
    private Long userId;
    private Long tripId;
    private float currentLocationX;
    private float currentLocationY;
    private OrderStatus status;
   
    // ... GETTERS AND SETTERS
}

4. Example of asynchronous communication with Kafka and Micronaut

Now, let’s consider one of the use cases possible to realize by our sample system – adding a new trip. In the first step (1) we are adding a new order of type OrderType.NEW_TRIP. After that order-service creates an order and send it to the orders topic. The order is received by three microservices: driver-service, passenger-service and order-service (2). A new order is processed by all these applications. The passenger-service application checks if there are sufficient funds on the passenger account. If not it cancels the trip, otherwise it does not do anything. The driver-service is looking for the nearest available driver, while trip-service creates and stores new trips. Both driver-service and trip-service sends events to their topics (drivers, trips) with information about changes (3) Every event can be accessed by other microservices, for example trip-service listen for event from driver-service in order to assign a new driver to the trip (4). The following picture illustrates the communication between our microservices when adding a new trip.

micronaut-kafka-3.png

Now, let’s proceed to the implementation details.

Step 1: Sending order

First we need to create a Kafka client responsible for sending messages to a topic. To achieve that we should create an interface annotated with @KafkaClient and declare one or more methods for sending messages. Every method should have a target topic name set through @Topic annotation. For method parameters we may use three annotations @KafkaKey, @Body or @Header. @KafkaKey is used for partitioning, which is required by our sample applications. In the client implementation visible below we just use @Body annotation.

@KafkaClient
public interface OrderClient {

    @Topic("orders")
    void send(@Body Order order);

}

Step 2: Receiving order

Once an order has been sent by the client it is received by all other microservices listening on the orders topic. Here’s a listener implementation in the driver-service. A listener class should be annotated with @KafkaListener. We may declare groupId as an annotation field to prevent from receiving the same message by more than one instance of a single application. Then we are declaring a method for processing incoming messages. The same as a client method it should be annotated with @Topic, to set the name of a target topic. Because we are listening for Order objects it should be annotated with @Body – the same as the corresponding client method.

@KafkaListener(groupId = "driver")
public class OrderListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

    private DriverService service;

    public OrderListener(DriverService service) {
        this.service = service;
    }

    @Topic("orders")
    public void receive(@Body Order order) {
        LOGGER.info("Received: {}", order);
        switch (order.getType()) {
            case NEW_TRIP -> service.processNewTripOrder(order);
        }
    }

}

Step 3: Sending to other Kafka topic

Now, let’s take a look on the processNewTripOrder method inside driver-service. DriverService injects two different Kafka client beans: OrderClient and DriverClient. When processing a new order it tries to find the available driver, which is the closest to the customer who sent the order. After finding him it changes the status to UNAVAILABLE and sends the message with Driver object to the drivers topic.

@Singleton
public class DriverService {

    private static final Logger LOGGER = LoggerFactory.getLogger(DriverService.class);

    private DriverClient client;
    private OrderClient orderClient;
    private DriverInMemoryRepository repository;

    public DriverService(DriverClient client, OrderClient orderClient, DriverInMemoryRepository repository) {
        this.client = client;
        this.orderClient = orderClient;
        this.repository = repository;
    }

    public void processNewTripOrder(Order order) {
        LOGGER.info("Processing: {}", order);
        Optional<Driver> driver = repository.findNearestDriver(order.getCurrentLocationX(), order.getCurrentLocationY());
        driver.ifPresent(driverLocal -> {
            driverLocal.setStatus(DriverStatus.UNAVAILABLE);
            repository.updateDriver(driverLocal);
            client.send(driverLocal, String.valueOf(order.getId()));
            LOGGER.info("Message sent: {}", driverLocal);
        });
    }
   
    // ...
}

Here’s an implementation of Kafka client inside driver-service used for sending messages to the drivers topic. Because we need to link the instance of Driver with order we annotate orderId parameter with @Header. There is no sense to include it to Driver class just to assign it to the right trip on the listener side.

@KafkaClient
public interface DriverClient {

    @Topic("drivers")
    void send(@Body Driver driver, @Header("Order-Id") String orderId);

}

Step 4: Inter-service communication example with Micronaut Kafka

The message sent by DriverClient is received by @Listener declared inside trip-service. It listens for messages incoming to the trips topic. The signature of receiving method is pretty similar to the client sending method as shown below:

@KafkaListener(groupId = "trip")
public class DriverListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

    private TripService service;

    public DriverListener(TripService service) {
        this.service = service;
    }

    @Topic("drivers")
    public void receive(@Body Driver driver, @Header("Order-Id") String orderId) {
        LOGGER.info("Received: driver->{}, header->{}", driver, orderId);
        service.processNewDriver(driver);
    }

}

A new driver with given id is being assigned to the trip searched by orderId. That’s a final step of our communication process when adding a new trip.

@Singleton
public class TripService {

    private static final Logger LOGGER = LoggerFactory.getLogger(TripService.class);

    private TripInMemoryRepository repository;
    private TripClient client;

    public TripService(TripInMemoryRepository repository, TripClient client) {
        this.repository = repository;
        this.client = client;
    }


    public void processNewDriver(Driver driver, String orderId) {
        LOGGER.info("Processing: {}", driver);
        Optional<Trip> trip = repository.findByOrderId(Long.valueOf(orderId));
        trip.ifPresent(tripLocal -> {
            tripLocal.setDriverId(driver.getId());
            repository.update(tripLocal);
        });
    }
   
   // ... OTHER METHODS

}

5. Tracing

We may easily enable distributed tracing with Micronaut Kafka. First, we need to enable and configure Micronaut Tracing. To do that you should first add some dependencies:

<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-tracing</artifactId>
</dependency>
<dependency>
    <groupId>io.zipkin.brave</groupId>
    <artifactId>brave-instrumentation-http</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.zipkin.reporter2</groupId>
    <artifactId>zipkin-reporter</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.opentracing.brave</groupId>
    <artifactId>brave-opentracing</artifactId>
</dependency>
<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-kafka-client</artifactId>
    <version>0.0.16</version>
    <scope>runtime</scope>
</dependency>

We also need to configure some application settings inside application.yml including an address of our tracing tool. In that case, it is Zipkin.

tracing:
  zipkin:
    enabled: true
    http:
      url: http://192.168.99.100:9411
    sampler:
      probability: 1

Before starting our application we have to run Zipkin container:

$ docker run -d --name zipkin -p 9411:9411 openzipkin/zipkin

Conclusion

In this article you were guided through the process of building microservice architecture using asynchronous communication via Apache Kafka. I have shown you ea example with the most important features of the Micronaut Kafka library that allows you to easily declare producer and consumer of Kafka topics, enable health checks, and distributed tracing for your microservices. I have described an implementation of a single scenario for our system, that covers adding a new trip at the customer’s request. In order to see the full implementation of the sample system described in this article please check out the source code available on GitHub: https://github.com/piomin/sample-kafka-micronaut-microservices.git.

The post Kafka In Microservices With Micronaut appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2019/08/06/kafka-in-microservices-with-micronaut/feed/ 0 7207