spring kafka Archives - Piotr's TechBlog (https://piotrminkowski.com/tag/spring-kafka/) – Java, Spring, Kotlin, microservices, Kubernetes, containers

Kafka Offset with Spring Boot
https://piotrminkowski.com/2024/03/11/kafka-offset-with-spring-boot/
Mon, 11 Mar 2024

In this article, you will learn how to manage Kafka consumer offsets with Spring Boot and Spring Kafka. An inspiration for preparing this article was the feedback I received after publishing the post describing concurrency with Kafka and Spring Boot. You were asking me questions related not only to concurrency but also to the consumer offset committing process. In the previous article, I focused mostly on showing that the way the app handles Kafka messages may impact the overall performance of our system. I didn't consider things like message duplicates or losing messages on the consumer side. Today, we are going to discuss exactly those topics.

If you are interested in Kafka and Spring Boot, you can find several articles about them on my blog. Apart from the already-mentioned post about concurrency, you can read e.g. about Kafka transactions here. On the other hand, to read about microservices with Kafka and Spring Boot, refer to the following article.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that, you need to clone my GitHub repository. After that, you should just follow my instructions. Let’s begin.

Introduction

Before we start, we need to clarify some important things related to committing offsets with Spring Kafka. First of all, by default, Spring Kafka sets the consumer enable.auto.commit property to false. It means that the framework, not Kafka, is responsible for committing an offset. Of course, we can change the default behavior by setting that property to true. By the way, it was the default approach before Spring Kafka 2.3. 
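For reference, if you do want Kafka itself to commit offsets (the pre-2.3 behavior), the relevant Spring Boot properties look as follows; the interval value here is just an example:

```yaml
spring:
  kafka:
    consumer:
      # hand the commit responsibility back to the Kafka client
      enable-auto-commit: true
      properties:
        # Kafka then commits in the background every 5 seconds (example value)
        auto.commit.interval.ms: 5000
```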

Once we keep Kafka auto-commit disabled, we can leverage 7 different commit strategies provided by Spring Kafka. Today, we won't analyze all of them, but just the most significant ones. The default strategy is BATCH. To set a different strategy, we need to override the AckMode, e.g. by setting a value of the spring.kafka.listener.ack-mode property in the Spring Boot application properties. However, let's first focus on the BATCH mode.
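For orientation, these are the seven values of Spring Kafka's ContainerProperties.AckMode enum, any of which can be selected through the same Spring Boot property:

```yaml
spring:
  kafka:
    listener:
      # one of: RECORD, BATCH, TIME, COUNT, COUNT_TIME, MANUAL, MANUAL_IMMEDIATE
      ack-mode: BATCH  # BATCH is the default
```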

Sample Spring Boot Kafka Apps

In order to test offset committing with Spring Kafka, we will create two simple apps: a producer and a consumer. The producer sends a defined number of messages to the topic, while the consumer receives and processes them. Here's the producer @RestController implementation. It allows us to send a defined number of messages to the transactions topic on demand:

@RestController
public class TransactionsController {

   private static final Logger LOG = LoggerFactory
            .getLogger(TransactionsController.class);

   long id = 1;
   long groupId = 1;
   private final KafkaTemplate<Long, Order> kafkaTemplate;

   TransactionsController(KafkaTemplate<Long, Order> kafkaTemplate) {
      this.kafkaTemplate = kafkaTemplate;
   }

   @PostMapping("/transactions")
   public void generateAndSendMessages(@RequestBody InputParameters inputParameters) {
      for (long i = 0; i < inputParameters.getNumberOfMessages(); i++) {
         Order o = new Order(id++, i+1, i+2, 1000, "NEW", groupId);
         CompletableFuture<SendResult<Long, Order>> result =
                 kafkaTemplate.send("transactions", o.getId(), o);
         result.whenComplete((sr, ex) -> {
            if (ex != null) {
               LOG.error("Send failed", ex);
            } else {
               LOG.info("Sent({}): {}", sr.getProducerRecord().key(), sr.getProducerRecord().value());
            }
         });
      }
      groupId++;
   }

}

Here are the producer app configuration properties. We need to set the address of a Kafka broker, and serializer classes for a key (Long) and a value (JSON format).

spring:
  application.name: producer
  kafka:
    bootstrap-servers: ${KAFKA_URL}
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

We can trigger the process of sending messages by calling the POST /transactions endpoint as shown below:

$ curl -X 'POST' 'http://localhost:8080/transactions' \
  -H 'Content-Type: application/json' \
  -d '{"numberOfMessages":10}'

Here’s the consumer app listener bean implementation. As you see, it is very simple. It just receives and prints the messages. We are sleeping the thread for 10 seconds, just to be able to easily check the offset on the Kafka topic during the test.

@Service
public class Listener {

   private static final Logger LOG = LoggerFactory
          .getLogger(Listener.class);

   @KafkaListener(
          id = "transactions",
          topics = "transactions",
          groupId = "a"
   )
   public void listen(@Payload Order order,
                      @Header(KafkaHeaders.OFFSET) Long offset,
                      @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) throws InterruptedException {
      LOG.info("[partition={},offset={}] Starting: {}", partition, offset, order);
      Thread.sleep(10000L);
      LOG.info("[partition={},offset={}] Finished: {}", partition, offset, order);
   }

}

In order to see what exactly happens in the consumer app, we need to increase the default logging level for Spring Kafka to DEBUG. There are also some other properties related to the serialization and deserialization of messages in the application properties. Here’s the whole application.yml file for the consumer app:

spring:
  application.name: consumer
  output.ansi.enabled: ALWAYS
  kafka:
    bootstrap-servers: ${KAFKA_URL:localhost}:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"

logging.level:
  org.springframework.kafka: debug

Understanding How Spring Kafka Commits the Offset

Then, let's start both our apps by executing the Maven commands visible below. I assume that you already have a Kafka broker running. Once the consumer is connected to the transactions topic, we can send 10 messages by calling the already-mentioned POST /transactions endpoint. After that, we can switch to the consumer app logs, where we can see all the significant information related to offset committing.

# build the whole project
$ mvn clean package

# run the consumer app
$ cd consumer
$ mvn spring-boot:run

# run the producer app
$ cd producer
$ mvn spring-boot:run

So, here are the logs from our test. I highlighted the most important parts. Of course, your results may differ slightly, but the rules are the same. First of all, the consumer receives a batch of messages. In this case, there are 2 messages on one partition, while, for example, it consumes 7 in a single step on the next partition. Without detailed logs, you won't even be aware of that behavior, since the message listener processes messages one by one. However, the offset commit action is performed only after processing all the consumed messages. That's because we have the AckMode set to BATCH.

spring-kafka-offset-batch

Of course, it doesn't have any impact on the app… as long as it is running. If a non-graceful restart or a crash occurs between the start of batch processing and the offset commit action, it may cause some problems. Don't get me wrong – it's a standard situation that results in message duplicates on the consumer side. So, now our app consumes 7 messages in a batch. Let's stop it during batch processing as shown below. By the way, with a graceful shutdown, Spring Kafka waits until the last message in the batch is processed. Therefore, I simulated an immediate stop with SIGKILL for testing purposes.

spring-kafka-offset-batch-kill

The consumer offset has not been committed. We can verify it by checking the current value of the consumer offset on partition 1. You can compare that value with the values highlighted in the logs above.

Then, let's start our consumer app once again. The app starts reading messages from the latest committed offset for each partition. Consequently, it processes several messages that had already been processed before we killed the consumer instance. As you see, the consumer app is processing the orders with ids 3, 5, 6, 8, and 10 once again. We need to take such situations into account when implementing the business logic. After processing the last message in the batch, Spring Kafka commits the offset.

spring-kafka-offset-batch-duplicates
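The replay mechanics can be illustrated without a broker. The sketch below is a toy model (plain JDK, no Spring or Kafka involved, all names are made up for illustration): a partition is a list, and the offset is committed only once per batch, so a crash mid-batch replays the already-processed records after a restart:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of BATCH acknowledgment: the offset is committed only after the
// whole polled batch is processed, so a crash mid-batch replays records.
public class BatchCommitDemo {

    static List<String> processed = new ArrayList<>();

    // Processes a batch starting at committedOffset; returns the new committed
    // offset, or the old one if the "crash" happens before the batch completes.
    static long processBatch(List<String> partition, long committedOffset,
                             int batchSize, boolean crashMidBatch) {
        long end = Math.min(committedOffset + batchSize, partition.size());
        for (long i = committedOffset; i < end; i++) {
            processed.add(partition.get((int) i)); // business logic runs per record
            if (crashMidBatch && i == committedOffset + 1) {
                return committedOffset; // crash: BATCH mode never committed
            }
        }
        return end; // the commit happens once, after the last record in the batch
    }

    public static void main(String[] args) {
        List<String> partition = List.of("o1", "o2", "o3", "o4");
        long offset = 0;
        offset = processBatch(partition, offset, 4, true);  // crash after o2
        offset = processBatch(partition, offset, 4, false); // restart and replay
        System.out.println(processed); // [o1, o2, o1, o2, o3, o4]
    }
}
```

Because the crash happens before the single per-batch commit, o1 and o2 show up twice after the restart, which mirrors the duplicated order ids in the logs above.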

Finally, everything works fine. There is no consumer lag on any partition.

Using the RECORD Mode to Commit Offset

In the next step, we will compare a similar situation with the AckMode set to RECORD. According to the Spring Kafka docs the RECORD mode “commits the offset when the listener returns after processing the record”. In order to enable the RECORD mode, we need to set the following in the application.yml file:

spring.kafka.listener.ack-mode: RECORD

Then, we have to restart the consumer app. After that, we can trigger the process of sending messages once again, by calling the POST /transactions endpoint exposed by the producer app:

$ curl -X 'POST' 'http://localhost:8080/transactions' \
  -H 'Content-Type: application/json' \
  -d '{"numberOfMessages":10}'

Let's switch to the logs. As you see, Spring Kafka commits the offset each time the @KafkaListener method finishes processing a single record. I guess that some of you assumed that this was the default behavior (not the BATCH mode) 🙂 That approach decreases the potential number of duplicate messages after a restart, but on the other hand, it impacts the overall performance of the consumer.

spring-kafka-offset-record

The latest committed consumer offset visible in the logs is 8. So, if we switch to the GUI client, we can verify that the current offset there has the same value.

Graceful Shutdown

Although our app commits the offset each time after processing a single record, during a graceful shutdown Spring Boot waits until the whole batch is processed. As you see, I initiated the shutdown procedure at 15:12:41, but the container performed the shutdown only after a 30-second timeout. That's because I included 10 seconds of thread sleep in the processing method, which makes the total time of processing the batch exceed 30 seconds.

spring-kafka-offset-batch-shutdown

However, we can change that behavior by setting the spring.kafka.listener.immediate-stop property to true. That property decides whether the container stops after the current record is processed or only after all the records from the previous poll are processed.

spring.kafka.listener.immediate-stop: true

After restarting the consumer app, let's take a look at the logs once again. The Spring container starts the shutdown procedure right after committing the offset of the record that is currently being processed.

Spring Kafka Offset and Concurrency

Processing Messages with the Custom Thread Pool

Finally, the last scenario in our article. Let’s consider the case when we are using the custom thread to handle messages received by the @KafkaListener method. In order to do that, we can define the ExecutorService object. Once the listenAsync() method receives the message it delegates processing to the Processor bean by calling its process() method using the ExecutorService object.

@Service
public class Listener {

   private static final Logger LOG = LoggerFactory
          .getLogger(Listener.class);

   ExecutorService executorService = Executors.newFixedThreadPool(30);

   @Autowired
   private Processor processor;

   @KafkaListener(
          id = "transactions-async",
          topics = "transactions-async",
          groupId = "a"
   )
   public void listenAsync(@Payload Order order,
                     @Header(KafkaHeaders.OFFSET) Long offset,
                     @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
      LOG.info("[partition={},offset={}] Starting Async: {}", partition, offset, order);
      executorService.submit(() -> processor.process(order));
   }
}

In the Processor bean, we are sleeping the thread for 10 seconds for testing purposes. The process() method doesn’t do anything important, it just prints the log at the start and before finishing.

@Service
public class Processor {

   private static final Logger LOG = LoggerFactory
          .getLogger(Processor.class);

   public void process(Order order) {
      LOG.info("Processing: {}", order.getId());
      try {
         Thread.sleep(10000L);
      } catch (InterruptedException e) {
         throw new RuntimeException(e);
      }
      LOG.info("Finished: {}", order.getId());
   }

}

Let's analyze what happens after sending some messages to such a consumer. This time we are using the transactions-async topic. By default, Spring Kafka commits the offset after processing the whole batch of 4 received messages. However, it happens almost immediately after receiving the messages, because we are delegating the further processing to another thread. The asynchronous method finishes processing after 10 seconds. If your app crashes during those 10 seconds, it will result in losing messages. They won't be processed by the new instance of the app, because the offset was already committed before message processing finished.
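Again as a toy model with no Kafka involved (all names invented for illustration): the sketch below imitates a listener that hands records to an ExecutorService and advances a "committed offset" immediately, then dies before the workers finish. The committed offset claims everything was handled, while nothing actually was:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model of the async pitfall: the "listener" commits right after handing
// each record to a worker pool, so a crash loses all records still in flight.
public class AsyncLossDemo {

    static long committedOffset = 0;
    static List<String> processed = Collections.synchronizedList(new ArrayList<>());

    static void run() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (String record : List.of("o1", "o2", "o3", "o4")) {
            pool.submit(() -> {
                try {
                    Thread.sleep(500); // simulated slow business logic
                } catch (InterruptedException e) {
                    return;            // "crash": the record is never processed
                }
                processed.add(record);
            });
            committedOffset++;         // offset advances before processing ends
        }
        pool.shutdownNow();            // simulate an abrupt stop of the app
        pool.awaitTermination(2, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        run();
        // All 4 offsets were "committed", yet no record finished processing.
        System.out.println("committed=" + committedOffset
                + ", processed=" + processed.size());
    }
}
```

A restarted consumer would resume from the committed offset, so the four unprocessed records are simply gone, which is exactly the loss scenario described above.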

Enable Manual Offset Commit

Once again, it is a normal situation that we can lose messages with the Kafka consumer this way. However, we can handle such cases slightly differently. Instead of relying on the container-managed offset commit, we can switch to the manual mode. First of all, let's add the following property to the Spring Boot application.yml file:

spring.kafka.listener.ack-mode: MANUAL_IMMEDIATE

Then we need to leverage the Acknowledgment interface to take control over the offset commit process inside the listener. As you see, we have to add such a parameter to the @KafkaListener method. After that, we can pass it to the process() method running in a different thread.

@Service
public class Listener {

   private static final Logger LOG = LoggerFactory
          .getLogger(Listener.class);

   ExecutorService executorService = Executors.newFixedThreadPool(30);

   @Autowired
   private Processor processor;

   @KafkaListener(
          id = "transactions-async",           
          topics = "transactions-async",
          groupId = "a"
   )
   public void listenAsync(@Payload Order order,
                     Acknowledgment acknowledgment,
                     @Header(KafkaHeaders.OFFSET) Long offset,
                     @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
      LOG.info("[partition={},offset={}] Starting Async: {}", partition, offset, order);
      executorService.submit(() -> processor.process(order, acknowledgment));
   }
}

With the acknowledge() method provided by the Acknowledgment interface, we can manually commit the offset at a selected location in the code. Here, we are committing at the end of the whole method.

@Service
public class Processor {

   private static final Logger LOG = LoggerFactory
          .getLogger(Processor.class);

   public void process(Order order, Acknowledgment acknowledgment) {
      LOG.info("Processing: {}", order.getId());
      try {
         Thread.sleep(10000L);
      } catch (InterruptedException e) {
         throw new RuntimeException(e);
      }
      LOG.info("Finished: {}", order.getId());
      acknowledgment.acknowledge();
   }

}

Let's switch to the consumer app logs once again. As you see, the offset commit happens almost immediately after processing the message. By the way, the MANUAL AckMode (instead of MANUAL_IMMEDIATE) waits with the commit until all the records from the batch are processed. Another thing worth mentioning here is the possibility of an out-of-order commit. It is disabled by default for a Spring Boot app. In order to enable it, we need to set the spring.kafka.listener.async-acks property to true. If you want to test such a scenario by yourself, you can increase the number of messages sent by the producer with the numberOfMessages field, e.g. to 100. Then verify the consumer lag with and without the asyncAcks property.
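Put together, enabling manual acknowledgment with out-of-order commits would look like this in application.yml:

```yaml
spring:
  kafka:
    listener:
      ack-mode: MANUAL_IMMEDIATE
      # allow acknowledgments to arrive out of order across the batch
      async-acks: true
```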

Finally, let’s verify the current committed offset for all the partitions using the GUI client.

Final Thoughts

The Kafka consumer offset is a very interesting topic. If you want to understand Kafka, you first need to understand how consumers commit offsets on partitions. In this article, I focused on showing you how to switch between different acknowledgment modes with Spring Kafka and how they impact your app.

Apache Kafka on Kubernetes with Strimzi
https://piotrminkowski.com/2023/11/06/apache-kafka-on-kubernetes-with-strimzi/
Mon, 06 Nov 2023

In this article, you will learn how to install and manage Apache Kafka on Kubernetes with Strimzi. The Strimzi operator lets us declaratively define and configure Kafka clusters, and several other components like Kafka Connect, Mirror Maker, or Cruise Control. Of course, it's not the only way to install Kafka on Kubernetes. As an alternative, we can use the Bitnami Helm chart available here. In comparison to that approach, Strimzi simplifies the creation of additional components. We will analyze it using the example of the Cruise Control tool.

You can find many other articles about Apache Kafka on my blog. For example, to read about concurrency with Spring Kafka please refer to the following post. There is also an article about Kafka transactions available here.

Prerequisites

In order to proceed with the exercise, you need to have a Kubernetes cluster. This cluster should have at least three worker nodes since I’m going to show you the approach with Kafka brokers spread across several nodes. We can easily simulate multiple Kubernetes nodes locally with Kind. You need to install the kind CLI tool and start Docker on your laptop. Here’s the Kind configuration manifest containing a definition of a single control plane and 4 worker nodes:

kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker
- role: worker

Then, we need to create the Kubernetes cluster based on the manifest visible above with the following kind command:

$ kind create cluster --name c1 --config cluster.yaml

The name of our Kind cluster is c1. It corresponds to the kind-c1 Kubernetes context, which is automatically set as default after creating the cluster. After that, we can display a list of Kubernetes nodes using the following kubectl command:

$ kubectl get node
NAME               STATUS   ROLES           AGE  VERSION
c1-control-plane   Ready    control-plane   1m   v1.27.3
c1-worker          Ready    <none>          1m   v1.27.3
c1-worker2         Ready    <none>          1m   v1.27.3
c1-worker3         Ready    <none>          1m   v1.27.3
c1-worker4         Ready    <none>          1m   v1.27.3

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. After that, go to the kafka directory. There are two Spring Boot apps inside the producer and consumer directories. The required Kubernetes manifests are available inside the k8s directory. You can apply them with kubectl or using the Skaffold CLI tool. The repository is already configured to work with Skaffold and Kind. To proceed with the exercise just follow my instructions in the next sections.

Architecture

Let's analyze our main goals in this exercise. Of course, we want to run a Kafka cluster on Kubernetes as simply as possible. There are several requirements for the cluster:

  1. It should automatically expose broker metrics in the Prometheus format. Then we will use Prometheus mechanisms to get the metrics and store them for visualization.
  2. It should consist of at least 3 brokers. Each broker has to run on a different Kubernetes worker node.
  3. Our Kafka needs to work in the Zookeeper-less mode. Therefore, we need to enable the KRaft protocol between the brokers.
  4. Once we scale up the Kafka cluster, we must automatically rebalance it to reassign partition replicas to the new broker. In order to do that, we will use the Cruise Control support in Strimzi.

Here’s the diagram that visualizes the described architecture. We will also run two simple Spring Boot apps on Kubernetes that connect the Kafka cluster and use it to send/receive messages.

kafka-on-kubernetes-arch

1. Install Monitoring Stack on Kubernetes

In the first step, we will install the monitoring on our Kubernetes cluster. We are going to use the kube-prometheus-stack Helm chart for that. It provides preconfigured instances of Prometheus and Grafana. It also comes with several CRD objects that allow us to easily customize monitoring mechanisms according to our needs. Let’s add the following Helm repository:

$ helm repo add prometheus-community \
    https://prometheus-community.github.io/helm-charts

Then, we can install the chart in the monitoring namespace. We can leave the default configuration.

$ helm install kube-prometheus-stack \
    prometheus-community/kube-prometheus-stack \
    --version 52.1.0 -n monitoring --create-namespace

2. Install Strimzi Operator on Kubernetes

In the next step, we will install the Strimzi operator on Kubernetes using Helm chart. The same as before we need to add the Helm repository:

$ helm repo add strimzi https://strimzi.io/charts

Then, we can proceed to the installation. This time we will override some configuration settings. The Strimzi Helm chart comes with a set of Grafana dashboards to visualize metrics exported by Kafka brokers and some other components managed by Strimzi. We place those dashboards inside the monitoring namespace. By default, the Strimzi chart doesn’t add the dashboards, so we also need to enable that feature in the values YAML file. That’s not all. Because we want to run Kafka in the KRaft mode, we need to enable it using feature gates. Enabling the UseKRaft feature gate requires the KafkaNodePools feature gate to be enabled as well. Then when we deploy a Kafka cluster in KRaft mode, we also must use the KafkaNodePool resources. Here’s the full list of overridden Helm chart values:

dashboards:
  enabled: true
  namespace: monitoring
featureGates: +UseKRaft,+KafkaNodePools,+UnidirectionalTopicOperator

Finally, let’s install the operator in the strimzi namespace using the following command:

$ helm install strimzi-kafka-operator strimzi/strimzi-kafka-operator \
    --version 0.38.0 \
    -n strimzi --create-namespace \
    -f strimzi-values.yaml

3. Run Kafka in the KRaft Mode

In the current version of Strimzi, KRaft mode support is still in the alpha phase. This will probably change soon, but for now, we have to deal with some inconveniences. In the previous section, we enabled three feature gates required to run Kafka in KRaft mode. Thanks to that, we can finally define our Kafka cluster. In the first step, we need to create a node pool. This new Strimzi object is responsible for configuring brokers and controllers in the cluster. Controllers are responsible for coordinating operations and maintaining the cluster's state. Fortunately, a single node in the pool can act as a controller and a broker at the same time.

Let’s create the KafkaNodePool object for our cluster. As you see it defines two roles: broker and controller (1). We can also configure storage for the cluster members (2). One of our goals is to avoid sharing the same Kubernetes node between Kafka brokers. Therefore, we will define the podAntiAffinity section (3). Setting the topologyKey to kubernetes.io/hostname indicates that the selected pods are not scheduled on nodes with the same hostname (4).

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: dual-role
  namespace: strimzi
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles: # (1)
    - controller
    - broker
  storage: # (2)
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 20Gi
        deleteClaim: false
  template:
    pod:
      affinity:
        podAntiAffinity: # (3)
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: strimzi.io/name
                    operator: In
                    values:
                      - my-cluster-kafka
              topologyKey: "kubernetes.io/hostname" # (4)

Once we create a node pool, we can proceed to the Kafka object creation. We need to enable KRaft mode and node pools for the particular cluster by annotating it with strimzi.io/kraft and strimzi.io/node-pools (1). Sections like storage (2) or zookeeper (5) are not used in the KRaft mode but are still required by the CRD. We should also configure the cluster metrics exporter (3) and enable the Cruise Control component (4). Of course, our cluster exposes an API for client connections on port 9092.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: strimzi
  annotations: # (1)
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: '3.6'
    storage: # (2)
      type: persistent-claim
      size: 5Gi
      deleteClaim: true
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    version: 3.6.0
    replicas: 3
    metricsConfig: # (3)
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yml
  entityOperator:
    topicOperator: {}
    userOperator: {}
  cruiseControl: {} # (4)
  # (5)
  zookeeper:
    storage:
      type: persistent-claim
      deleteClaim: true
      size: 2Gi
    replicas: 3

The metricsConfig section in the Kafka object takes the ConfigMap as the configuration source. This ConfigMap contains a single kafka-metrics-config.yml entry with the Prometheus rules definition.

kind: ConfigMap
apiVersion: v1
metadata:
  name: kafka-metrics
  namespace: strimzi
  labels:
    app: strimzi
data:
  kafka-metrics-config.yml: |
    lowercaseOutputName: true
    rules:
    - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
      name: kafka_server_$1_$2
      type: GAUGE
      labels:
        clientId: "$3"
        topic: "$4"
        partition: "$5"
    - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
      name: kafka_server_$1_$2
      type: GAUGE
      labels:
        clientId: "$3"
        broker: "$4:$5"
    - pattern: kafka.server<type=(.+), cipher=(.+), protocol=(.+), listener=(.+), networkProcessor=(.+)><>connections
      name: kafka_server_$1_connections_tls_info
      type: GAUGE
      labels:
        cipher: "$2"
        protocol: "$3"
        listener: "$4"
        networkProcessor: "$5"
    - pattern: kafka.server<type=(.+), clientSoftwareName=(.+), clientSoftwareVersion=(.+), listener=(.+), networkProcessor=(.+)><>connections
      name: kafka_server_$1_connections_software
      type: GAUGE
      labels:
        clientSoftwareName: "$2"
        clientSoftwareVersion: "$3"
        listener: "$4"
        networkProcessor: "$5"
    - pattern: "kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+):"
      name: kafka_server_$1_$4
      type: GAUGE
      labels:
        listener: "$2"
        networkProcessor: "$3"
    - pattern: kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+)
      name: kafka_server_$1_$4
      type: GAUGE
      labels:
        listener: "$2"
        networkProcessor: "$3"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>MeanRate
      name: kafka_$1_$2_$3_percent
      type: GAUGE
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>Value
      name: kafka_$1_$2_$3_percent
      type: GAUGE
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*, (.+)=(.+)><>Value
      name: kafka_$1_$2_$3_percent
      type: GAUGE
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*), (.+)=(.+)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        "$6": "$7"
        quantile: "0.$8"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        quantile: "0.$6"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        quantile: "0.$4"
    - pattern: "kafka.server<type=raft-metrics><>(.+-total|.+-max):"
      name: kafka_server_raftmetrics_$1
      type: COUNTER
    - pattern: "kafka.server<type=raft-metrics><>(.+):"
      name: kafka_server_raftmetrics_$1
      type: GAUGE
    - pattern: "kafka.server<type=raft-channel-metrics><>(.+-total|.+-max):"
      name: kafka_server_raftchannelmetrics_$1
      type: COUNTER
    - pattern: "kafka.server<type=raft-channel-metrics><>(.+):"
      name: kafka_server_raftchannelmetrics_$1
      type: GAUGE
    - pattern: "kafka.server<type=broker-metadata-metrics><>(.+):"
      name: kafka_server_brokermetadatametrics_$1
      type: GAUGE

4. Interacting with Kafka on Kubernetes

Once we apply the KafkaNodePool and Kafka objects to the Kubernetes cluster, Strimzi starts provisioning. As a result, you should see the broker pods, a single pod related to Cruise Control, and a metrics exporter pod. Each Kafka broker pod is running on a different Kubernetes node:

Clients can connect to Kafka using the my-cluster-kafka-bootstrap Service on port 9092:

$ kubectl get svc
NAME                         TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                        AGE
my-cluster-cruise-control    ClusterIP   10.96.108.204   <none>        9090/TCP                                       4m10s
my-cluster-kafka-bootstrap   ClusterIP   10.96.155.136   <none>        9091/TCP,9092/TCP,9093/TCP                     4m59s
my-cluster-kafka-brokers     ClusterIP   None            <none>        9090/TCP,9091/TCP,8443/TCP,9092/TCP,9093/TCP   4m59s

In the next step, we will deploy our two apps for producing and consuming messages. The producer app sends one message per second to the target topic:

@SpringBootApplication
@EnableScheduling
public class KafkaProducer {

   private static final Logger LOG = LoggerFactory
      .getLogger(KafkaProducer.class);

   public static void main(String[] args) {
      SpringApplication.run(KafkaProducer.class, args);
   }

   AtomicLong id = new AtomicLong();
   @Autowired
   KafkaTemplate<Long, Info> template;

   @Value("${POD:kafka-producer}")
   private String pod;
   @Value("${NAMESPACE:empty}")
   private String namespace;
   @Value("${CLUSTER:localhost}")
   private String cluster;
   @Value("${TOPIC:test}")
   private String topic;

   @Scheduled(fixedRate = 1000)
   public void send() {
      Info info = new Info(id.incrementAndGet(), 
                           pod, namespace, cluster, "HELLO");
      CompletableFuture<SendResult<Long, Info>> result = template
         .send(topic, info.getId(), info);
      result.whenComplete((sr, ex) -> {
         if (ex != null) {
            LOG.error("Error sending message", ex);
         } else {
            LOG.info("Sent({}): {}", sr.getProducerRecord().key(),
               sr.getProducerRecord().value());
         }
      });
   }
}

Here’s the Deployment manifest for the producer app:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
      - name: producer
        image: piomin/producer
        resources:
          requests:
            memory: 200Mi
            cpu: 100m
        ports:
        - containerPort: 8080
        env:
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap
          - name: CLUSTER
            value: c1
          - name: TOPIC
            value: test-1
          - name: POD
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
          - name: NAMESPACE
            valueFrom:
              fieldRef:
                fieldPath: metadata.namespace

Before running the app, we can create the test-1 topic with the Strimzi KafkaTopic CRD:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: test-1
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 12
  replicas: 3
  config:
    retention.ms: 7200000
    segment.bytes: 1000000

The consumer app is listening for incoming messages. Here’s the bean responsible for receiving and logging messages:

@SpringBootApplication
@EnableKafka
public class KafkaConsumer {

   private static final Logger LOG = LoggerFactory
      .getLogger(KafkaConsumer.class);

   public static void main(String[] args) {
      SpringApplication.run(KafkaConsumer.class, args);
   }

   @Value("${app.in.topic}")
   private String topic;

   @KafkaListener(id = "info", topics = "${app.in.topic}")
   public void onMessage(@Payload Info info,
      @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Long key,
      @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
      LOG.info("Received(key={}, partition={}): {}", key, partition, info);
   }
}

Here’s the Deployment manifest for the consumer app:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer
spec:
  selector:
    matchLabels:
      app: consumer
  template:
    metadata:
      labels:
        app: consumer
    spec:
      containers:
      - name: consumer
        image: piomin/consumer
        resources:
          requests:
            memory: 200Mi
            cpu: 100m
        ports:
        - containerPort: 8080
        env:
          - name: TOPIC
            value: test-1
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap

We can run both Spring Boot apps using Skaffold. First, we need to go to the kafka directory in our repository. Then let's run the following command:

$ skaffold run -n strimzi --tail

Finally, we can verify the logs printed by our apps. As you see, all the messages sent by the producer app are received by the consumer app.

kafka-on-kubernetes-logs

5. Kafka Metrics in Prometheus

Once we have installed the Strimzi Helm chart with dashboard.enabled=true and dashboard.namespace=monitoring, several Grafana dashboard manifests are placed in the monitoring namespace. Each dashboard is represented as a ConfigMap. Let's display the list of ConfigMaps installed by the Strimzi Helm chart:

$ kubectl get cm -n monitoring | grep strimzi
strimzi-cruise-control                                    1      2m
strimzi-kafka                                             1      2m
strimzi-kafka-bridge                                      1      2m
strimzi-kafka-connect                                     1      2m
strimzi-kafka-exporter                                    1      2m
strimzi-kafka-mirror-maker-2                              1      2m
strimzi-kafka-oauth                                       1      2m
strimzi-kraft                                             1      2m
strimzi-operators                                         1      2m
strimzi-zookeeper                                         1      2m

Since Grafana is also installed in the monitoring namespace, it automatically imports all the dashboards from ConfigMaps annotated with grafana_dashboard. Consequently, after logging into Grafana (admin / prom-operator), we can easily switch between all the Kafka-related dashboards.

The only problem is that Prometheus doesn't scrape the metrics exposed by the Kafka pods yet. Since we have already configured metrics exporting in the Strimzi Kafka CRD, Kafka pods expose the /metrics endpoint for Prometheus on port 9404. Let's take a look at the Kafka broker pod details:

In order to force Prometheus to scrape metrics from Kafka pods, we need to create a PodMonitor object. We should place it in the monitoring namespace (1) and set the release=kube-prometheus-stack label (2). The PodMonitor object selects all the pods from the strimzi namespace (3) that contain the strimzi.io/kind label with one of the values: Kafka, KafkaConnect, KafkaMirrorMaker, KafkaMirrorMaker2 (4). Also, it queries the /metrics endpoint on the port named tcp-prometheus (5).

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: kafka-resources-metrics
  namespace: monitoring
  labels:
    app: strimzi
    release: kube-prometheus-stack
spec:
  selector:
    matchExpressions:
      - key: "strimzi.io/kind"
        operator: In
        values: ["Kafka", "KafkaConnect", "KafkaMirrorMaker", "KafkaMirrorMaker2"]
  namespaceSelector:
    matchNames:
      - strimzi
  podMetricsEndpoints:
  - path: /metrics
    port: tcp-prometheus
    relabelings:
    - separator: ;
      regex: __meta_kubernetes_pod_label_(strimzi_io_.+)
      replacement: $1
      action: labelmap
    - sourceLabels: [__meta_kubernetes_namespace]
      separator: ;
      regex: (.*)
      targetLabel: namespace
      replacement: $1
      action: replace
    - sourceLabels: [__meta_kubernetes_pod_name]
      separator: ;
      regex: (.*)
      targetLabel: kubernetes_pod_name
      replacement: $1
      action: replace
    - sourceLabels: [__meta_kubernetes_pod_node_name]
      separator: ;
      regex: (.*)
      targetLabel: node_name
      replacement: $1
      action: replace
    - sourceLabels: [__meta_kubernetes_pod_host_ip]
      separator: ;
      regex: (.*)
      targetLabel: node_ip
      replacement: $1
      action: replace

Finally, we can display the Grafana dashboard with Kafka metrics visualization. Let’s choose the dashboard with the “Strimzi Kafka” name. Here’s the general view:

kafka-on-kubernetes-metrics

There are several other diagrams available. For example, we can take a look at the statistics related to the incoming and outgoing messages.

6. Rebalancing Kafka with Cruise Control

Let’s analyze a typical Kafka scenario: increasing the number of brokers in the cluster. Before we do it, we will generate more incoming traffic to the test-1 topic. In order to do it, we can use the Grafana k6 tool. The k6 tool provides several extensions for load testing – including the Kafka plugin. Here are the manifests that run k6 with the Kafka extension on Kubernetes: a ConfigMap containing the test script and a Deployment running the xk6-kafka image.

kind: ConfigMap
apiVersion: v1
metadata:
  name: load-test-cm
  namespace: strimzi
data:
  load-test.js: |
    import {
      Writer,
      SchemaRegistry,
      SCHEMA_TYPE_JSON,
    } from "k6/x/kafka";
    const writer = new Writer({
      brokers: ["my-cluster-kafka-bootstrap.strimzi:9092"],
      topic: "test-1",
    });
    const schemaRegistry = new SchemaRegistry();
    export default function () {
      writer.produce({
        messages: [
          {
            value: schemaRegistry.serialize({
              data: {
                id: 1,
                source: "test",
                space: "strimzi",
                cluster: "c1",
                message: "HELLO"
              },
              schemaType: SCHEMA_TYPE_JSON,
            }),
          },
        ],
      });
    }
    
    export function teardown(data) {
      writer.close();
    }
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: k6-test
  namespace: strimzi
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: k6-test
  template:
    metadata:
      labels:
        app.kubernetes.io/name: k6-test
    spec:
      containers:
        - image: mostafamoradian/xk6-kafka:latest
          name: xk6-kafka
          command:
            - "k6"
            - "run"
            - "--vus"
            - "1"
            - "--duration"
            - "720s"
            - "/tests/load-test.js"
          env:
            - name: KAFKA_URL
              value: my-cluster-kafka-bootstrap
            - name: CLUSTER
              value: c1
            - name: TOPIC
              value: test-1
            - name: POD
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
          volumeMounts:
            - mountPath: /tests
              name: test
      volumes:
        - name: test
          configMap:
            name: load-test-cm

Let’s apply the manifest to the strimzi namespace with the following command:

$ kubectl apply -f k8s/k6.yaml

After that, we can take a look at the k6 Pod logs. As you see, it generates and sends a lot of messages to the test-1 topic on our Kafka cluster:

Now, let’s increase the number of Kafka brokers in our cluster. We can do it by changing the value of the replicas field in the KafkaNodePool object:

$ kubectl scale kafkanodepool dual-role --replicas=4 -n strimzi

After a while, Strimzi will start a new pod with another Kafka broker. Although we have a new member of the Kafka cluster, all the partitions are still distributed only across the three previous brokers. The situation would be different for a newly created topic. However, the partitions of the existing topics won’t be automatically migrated to the new broker instance. Let’s verify the current partition structure of the test-1 topic with the kcat CLI (I’m exposing the Kafka API locally with kubectl port-forward):

$ kcat -b localhost:9092 -L -t test-1
Metadata for test-1 (from broker -1: localhost:9092/bootstrap):
 4 brokers:
  broker 0 at my-cluster-dual-role-0.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 1 at my-cluster-dual-role-1.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 2 at my-cluster-dual-role-2.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 3 at my-cluster-dual-role-3.my-cluster-kafka-brokers.strimzi.svc:9092 (controller)
 1 topics:
  topic "test-1" with 12 partitions:
    partition 0, leader 0, replicas: 0,1,2, isrs: 1,0,2
    partition 1, leader 1, replicas: 1,2,0, isrs: 1,0,2
    partition 2, leader 2, replicas: 2,0,1, isrs: 1,0,2
    partition 3, leader 0, replicas: 0,1,2, isrs: 1,0,2
    partition 4, leader 1, replicas: 1,2,0, isrs: 1,0,2
    partition 5, leader 2, replicas: 2,0,1, isrs: 1,0,2
    partition 6, leader 0, replicas: 0,1,2, isrs: 1,0,2
    partition 7, leader 1, replicas: 1,2,0, isrs: 1,0,2
    partition 8, leader 2, replicas: 2,0,1, isrs: 1,0,2
    partition 9, leader 0, replicas: 0,2,1, isrs: 1,0,2
    partition 10, leader 2, replicas: 2,1,0, isrs: 1,0,2
    partition 11, leader 1, replicas: 1,0,2, isrs: 1,0,2

Here comes Cruise Control. Cruise Control makes managing and operating Kafka much easier. For example, it allows us to move partitions across brokers after scaling up the cluster. Let’s see how it works. We have already enabled Cruise Control in the Strimzi Kafka CRD. In order to begin a rebalancing procedure, we should create the KafkaRebalance object. This object is responsible for asking Cruise Control to generate an optimization proposal.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: my-rebalance
  labels:
    strimzi.io/cluster: my-cluster
spec: {}

Once the optimization proposal is ready, you will see the ProposalReady value in the Status.Conditions.Type field. I won’t get into the details of Cruise Control here. In our case, it suggested moving 58 partition replicas between brokers in the cluster.

Let’s accept the proposal by annotating the KafkaRebalance object with strimzi.io/rebalance=approve:

$ kubectl annotate kafkarebalance my-rebalance \
    strimzi.io/rebalance=approve -n strimzi

Finally, we can run the kcat command on the test-1 topic once again. Now, as you see, partition replicas are spread across all the brokers.

$ kcat -b localhost:9092 -L -t test-1
Metadata for test-1 (from broker -1: localhost:9092/bootstrap):
 4 brokers:
  broker 0 at my-cluster-dual-role-0.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 1 at my-cluster-dual-role-1.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 2 at my-cluster-dual-role-2.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 3 at my-cluster-dual-role-3.my-cluster-kafka-brokers.strimzi.svc:9092 (controller)
 1 topics:
  topic "test-1" with 12 partitions:
    partition 0, leader 2, replicas: 2,1,3, isrs: 1,2,3
    partition 1, leader 1, replicas: 1,2,0, isrs: 1,0,2
    partition 2, leader 2, replicas: 0,2,1, isrs: 1,0,2
    partition 3, leader 0, replicas: 0,2,3, isrs: 0,2,3
    partition 4, leader 1, replicas: 3,2,1, isrs: 1,2,3
    partition 5, leader 2, replicas: 2,3,0, isrs: 0,2,3
    partition 6, leader 0, replicas: 0,1,2, isrs: 1,0,2
    partition 7, leader 1, replicas: 3,1,0, isrs: 1,0,3
    partition 8, leader 2, replicas: 2,0,1, isrs: 1,0,2
    partition 9, leader 0, replicas: 0,3,1, isrs: 1,0,3
    partition 10, leader 2, replicas: 2,3,0, isrs: 0,2,3
    partition 11, leader 1, replicas: 1,0,3, isrs: 1,0,3

Final Thoughts

Strimzi allows us not only to install and manage Kafka but also the whole ecosystem around it. In this article, I showed how to export metrics to Prometheus and use the Cruise Control tool to rebalance a cluster after scale-up. We also ran Kafka in KRaft mode and then connected two simple Java apps with the cluster through Kubernetes Service.

The post Apache Kafka on Kubernetes with Strimzi appeared first on Piotr's TechBlog.

Concurrency with Kafka and Spring Boot https://piotrminkowski.com/2023/04/30/concurrency-with-kafka-and-spring-boot/ https://piotrminkowski.com/2023/04/30/concurrency-with-kafka-and-spring-boot/#comments Sun, 30 Apr 2023 11:28:45 +0000 https://piotrminkowski.com/?p=14121

The post Concurrency with Kafka and Spring Boot appeared first on Piotr's TechBlog.

This article will teach you how to configure concurrency for Kafka consumers with Spring Boot and Spring for Kafka. Concurrency in Spring for Kafka is closely related to the Kafka partitions and consumer groups. Each consumer within a consumer group can receive messages from multiple partitions. While a consumer inside a group uses a single thread, the group of consumers utilizes multiple threads to consume messages. Although each consumer is single-threaded, the processing of records can leverage multiple threads. We will analyze how to achieve it with Spring Boot and Spring for Kafka.
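As a back-of-the-envelope illustration of the relationship between partitions and group members, here is a small, self-contained sketch (the class and method names are mine, not Spring for Kafka API) of how a range-style split divides a 10-partition topic among the consumers of a group:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionSplit {

    // Range-style split: every consumer gets the base share,
    // and the first 'extra' consumers get one partition more
    static List<Integer> counts(int partitions, int consumers) {
        List<Integer> result = new ArrayList<>();
        int base = partitions / consumers;
        int extra = partitions % consumers;
        for (int i = 0; i < consumers; i++) {
            result.add(base + (i < extra ? 1 : 0));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(counts(10, 1)); // [10]
        System.out.println(counts(10, 2)); // [5, 5]
        System.out.println(counts(10, 3)); // [4, 3, 3]
    }
}
```

With one consumer, the whole topic is read by a single member; with three consumers, the first member receives one partition more than the others.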

Today's topic, concurrency with Kafka and Spring Boot, deals with rather basic issues. If you are looking for something more advanced in this area, you can read some of my other articles: one covers Kafka Streams and the Spring Cloud Stream project, and another describes Kafka transactions with Spring Boot.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should go to the no-transactions-service directory. After that, you should just follow my instructions. Let’s begin.

Prerequisites

We will use three different tools in the exercise today. Of course, we will create the Spring Boot consumer app using the latest version of Spring Boot 3 and Java 19. In order to run Kafka locally, we will use Redpanda – a platform compatible with Kafka API. You can easily start and manage Redpanda with their CLI tool – rpk. If you want to install rpk on your laptop please follow the installation instructions available here.

Finally, we need a tool for load testing. I’m using the k6 tool and its extension for integration with Kafka. Of course, it is just a proposition, you can use any other solution you like. With k6 I’m able to generate and send a lot of messages to Kafka quickly. In order to use k6 you need to install it on your laptop. Here are the installation instructions. After that, you need to install the xk6-kafka extension. In the following documentation, you have a full list of the k6 extensions.

Introduction

For the purpose of this exercise, we will create a simple Spring Boot application that connects to Kafka and receives messages from a single topic. From the business logic perspective, it handles transactions between the accounts and stores inside an in-memory database. Here’s a list of dependencies we need to include in the Maven pom.xml:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>

Then, let’s see the configuration settings. Our app connects to the Kafka broker using the address set in the KAFKA_URL environment variable. It expects messages in JSON format. Therefore we need to set JsonDeserializer as the value deserializer. The incoming message is deserialized to the pl.piomin.services.common.model.Order object. To make it work, we need to set the spring.json.value.default.type and spring.json.trusted.packages properties. The k6 tool won’t set a header containing the target JSON type, so we need to disable that feature in Spring for Kafka with the spring.json.use.type.headers property.

spring:
  application.name: no-transactions-service
  kafka:
    bootstrap-servers: ${KAFKA_URL}
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        "[spring.json.value.default.type]": "pl.piomin.services.common.model.Order"
        "[spring.json.trusted.packages]": "pl.piomin.services.common.model"
        "[spring.json.use.type.headers]": false
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

Here’s the class representing incoming messages.

public class Order {

   private Long id;
   private Long sourceAccountId;
   private Long targetAccountId;
   private int amount;
   private String status;

   // GETTERS AND SETTERS...
}

The last thing we need to do is to enable Spring for Kafka and generate some test accounts for making transactions.

@SpringBootApplication
@EnableKafka
public class NoTransactionsService {

   public static void main(String[] args) {
      SpringApplication.run(NoTransactionsService.class, args);
   }

   private static final Logger LOG = LoggerFactory
      .getLogger(NoTransactionsService.class);

   Random r = new Random();

   @Autowired
   AccountRepository repository;

   @PostConstruct
   public void init() {
      for (int i = 0; i < 1000; i++) {
         repository.save(new Account(r.nextInt(1000, 10000)));
      }
   }

}

Running Kafka using Redpanda

Once we successfully installed the rpk CLI we can easily run a single-node Kafka broker by executing the following command:

$ rpk container start

Here’s the result. As you see, the address of my broker is localhost:51961. The port number is generated automatically, so yours will probably be different. To simplify the next steps, let’s just set it in the REDPANDA_BROKERS environment variable.

Once we have created the broker, we can create a topic. We will use a topic named transactions in our tests. In the first step, we run the tests against a single partition.

$ rpk topic create transactions -p 1

Prepare Load Tests for Kafka

Our load test will generate and send orders in JSON format with random values. The k6 tool allows us to write tests in JavaScript. We need to use the k6 Kafka extension library. The address of the Kafka broker is retrieved from the KAFKA_URL environment variable. We are incrementing the order’s id field each time we generate a new message.

import {
  Writer,
  SchemaRegistry,
  SCHEMA_TYPE_JSON,
} from "k6/x/kafka";
import { randomIntBetween } from 'https://jslib.k6.io/k6-utils/1.2.0/index.js';

const writer = new Writer({
  brokers: [`${__ENV.KAFKA_URL}`],
  topic: "transactions",
});

const schemaRegistry = new SchemaRegistry();

export function setup() {
  return { index: 1 };
}

export default function (data) {
  writer.produce({
    messages: [
      {
        value: schemaRegistry.serialize({
          data: {
            id: data.index++,
            sourceAccountId: randomIntBetween(1, 1000),
            targetAccountId: randomIntBetween(1, 1000),
            amount: randomIntBetween(10, 50),
            status: "NEW"
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
      },
    ],
  });
}

export function teardown(data) {
  writer.close();
}

Before running the test we need to set the KAFKA_URL environment variable. Then we can use the k6 run command to generate and send a lot of messages.

$ k6 run load-test.js -u 1 -d 30s 

Scenario 1: Single-partition Topic Listener

Let’s start with the defaults. Our topic has just a single partition. We are creating the @KafkaListener just with the topic and consumer group names. Once the listener receives an incoming message it invokes the AccountService bean to process the order.

@Autowired
AccountService service;
    
@KafkaListener(
   id = "transactions",
   topics = "transactions",
   groupId = "a")
public void listen(Order order) {
   LOG.info("Received: {}", order);
   service.process(order);
}

Our Spring Boot Kafka app is prepared for concurrency. We will lock the Account entity during the transaction with the PESSIMISTIC_WRITE mode.

public interface AccountRepository extends CrudRepository<Account, Long> {

    @Lock(LockModeType.PESSIMISTIC_WRITE)
    Optional<Account> findById(Long id);
}

Here’s the implementation of our AccountService bean for handling incoming orders. The process(...) method is @Transactional. In the first step, we find the source (1) and target (2) Account entity. Then we perform a transfer between the account if there are sufficient funds on the source account (3). I’m also simulating a delay just for test purposes (4). Finally, we can send a response asynchronously to another topic using the KafkaTemplate bean (5).

@Service
public class AccountService {

    private static final Logger LOG = LoggerFactory
            .getLogger(AccountService.class);
    private final Random RAND = new Random();

    KafkaTemplate<Long, Order> kafkaTemplate;
    AccountRepository repository;

    public AccountService(KafkaTemplate<Long, Order> kafkaTemplate, 
                          AccountRepository repository) {
        this.kafkaTemplate = kafkaTemplate;
        this.repository = repository;
    }

    @Transactional
    public void process(Order order) {
        Account accountSource = repository
                .findById(order.getSourceAccountId())
                .orElseThrow(); // (1)

        Account accountTarget = repository
                .findById(order.getTargetAccountId())
                .orElseThrow(); // (2)

        if (accountSource.getBalance() >= order.getAmount()) { // (3)
            accountSource.setBalance(accountSource.getBalance() - order.getAmount());
            repository.save(accountSource);
            accountTarget.setBalance(accountTarget.getBalance() + order.getAmount());
            repository.save(accountTarget);
            order.setStatus("PROCESSED");
        } else {
            order.setStatus("FAILED");
        }

        try {
            Thread.sleep(RAND.nextLong(1, 20)); // (4)
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        LOG.info("Processed: order->{}", new OrderDTO(order, accountSource, accountTarget));

        // (5)
        CompletableFuture<SendResult<Long, Order>> result = kafkaTemplate
           .send("orders", order.getId(), order);
        result.whenComplete((sr, ex) -> {
            if (ex != null) {
                LOG.error("Error sending message", ex);
            } else {
                LOG.debug("Sent(key={},partition={}): {}",
                        sr.getProducerRecord().key(),
                        sr.getRecordMetadata().partition(),
                        sr.getProducerRecord().value());
            }
        });
    }

}

Let’s set the address of the Kafka broker in the KAFKA_URL environment variable and then start the app.

$ export KAFKA_URL=127.0.0.1:51961
$ mvn spring-boot:run

Let’s analyze what happens. Our listener is connecting to the transactions topic. It establishes just a single connection since there is a single partition.

In that case, we have just a single instance of our app running and a single thread responsible for handling messages. Let’s verify the current lag on the partition for our consumer group. As you see, the messages are processed very slowly. At first glance, you may be quite surprised.
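A quick back-of-the-envelope calculation explains the slow processing. Assuming the average per-message handling time is around 10.5 ms (the midpoint of the simulated 1–20 ms delay), a single thread cannot exceed roughly 95 messages per second. The backlog figure below is a hypothetical example of mine, not a measurement from this test:

```java
public class SingleThreadLag {

    // Time (in seconds) a single consumer thread needs to drain a backlog,
    // given the average per-message processing time
    static double drainSeconds(int backlog, double avgProcessingMs) {
        return backlog * avgProcessingMs / 1000.0;
    }

    public static void main(String[] args) {
        // Avg of the simulated 1-20 ms delay is ~10.5 ms -> under 100 msg/s
        System.out.printf("throughput: ~%.0f msg/s%n", 1000.0 / 10.5);
        // A hypothetical backlog of 25,000 messages:
        System.out.printf("drain time: ~%.0f s%n", drainSeconds(25_000, 10.5));
    }
}
```

So with a single partition and a single thread, even a modest burst of traffic takes minutes to drain.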

Scenario 2: Multiple-partitions Topic Listener

Let’s analyze the next scenario. Now, the transactions topic consists of 10 partitions. We won’t change anything in the app code or configuration. We will just remove the previously created topic and create a new one with 10 partitions using the following commands:

$ rpk topic delete transactions
$ rpk topic create transactions -p 10

Once again, we are starting the app using the following Maven command:

$ mvn spring-boot:run

Let’s analyze the app logs. As you see, although we have 10 partitions there is still a single thread listening on them.

spring-boot-kafka-concurrency-single-partition

So, our situation hasn’t changed. The app performance is exactly the same. However, we can now run another instance of our Spring Boot app. Once you do, take a look at the app logs. The new instance takes over 5 partitions.

In that case, a rebalancing occurs. The first instance of our Spring Boot app keeps the other 5 partitions. Now the overall performance is twice as good as before.

Of course, we can run more app instances. In that case, we can scale up to 10 instances since there are 10 partitions on the topic.

Scenario 3: Consumer Concurrency with Multiple Partitions

Let’s analyze another scenario. Now, we are enabling concurrency at the Kafka listener level. In order to achieve it, we need to set the concurrency field inside the @KafkaListener annotation. This parameter is still related to Kafka partitions, so there is no point in setting the value higher than the number of partitions. In our case, there are 10 partitions – the same as in the previous scenario.

@KafkaListener(
   id = "transactions",
   topics = "transactions",
   groupId = "a",
   concurrency = "10")
public void listen(Order order) {
   LOG.info("Received: {}", order);
   service.process(order);
}

After that, we can start the Spring Boot app. Let’s see what happens. As you see, we have 10 concurrent connections – each bound to a single thread.

spring-boot-kafka-concurrency-multi

In that case, the app performance for a single instance is around 10 times better than before. There are 10 concurrent threads processing incoming messages.

spring-boot-kafka-concurrency-processing

However, if we run our load tests, the lag on the partitions is still large. Here’s the result after sending ~25k messages in 10 seconds.

spring-boot-kafka-concurrency-lag

Theoretically, we can scale up the number of instances to improve the overall performance. However, that approach won’t change anything. Why? Let’s run another instance and take a look at the logs. Now, only 5 threads per instance are bound to partitions, while the other five are idle. The overall performance of the system does not change.
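The behavior above can be summed up with one formula: per instance, the number of active listener threads is the minimum of the concurrency setting and the number of partitions assigned to that instance. Here's a minimal sketch of that rule (my own illustration, not Spring for Kafka code):

```java
public class EffectiveConcurrency {

    // Per instance, active listener threads = min(concurrency, assigned partitions)
    static int activeThreads(int concurrency, int assignedPartitions) {
        return Math.min(concurrency, assignedPartitions);
    }

    public static void main(String[] args) {
        System.out.println(activeThreads(10, 10)); // one instance, 10 partitions: all 10 threads active
        System.out.println(activeThreads(10, 5));  // two instances share 10 partitions: 5 active, 5 idle each
        System.out.println(activeThreads(20, 10)); // concurrency above the partition count buys nothing
    }
}
```

This is why adding instances beyond the partition count (or setting concurrency above it) leaves threads idle instead of increasing throughput.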

Scenario 4: Process in Multiple Threads

Finally, the last scenario. We will create a thread pool with the Java ExecutorService. We may still combine the custom thread pool with the Kafka consumer concurrency feature (through the concurrency parameter), as shown below. Each time the listener receives new messages, it processes them in separate threads.

@Service
public class NoTransactionsListener {

    private static final Logger LOG = LoggerFactory
            .getLogger(NoTransactionsListener.class);

    AccountService service;
    ExecutorService executorService = Executors.newFixedThreadPool(30);

    public NoTransactionsListener(AccountService service) {
        this.service = service;
    }

    @KafkaListener(
            id = "transactions",
            topics = "transactions",
            groupId = "a",
            concurrency = "3")
    public void listen(Order order) {
        LOG.info("Received: {}", order);
        executorService.submit(() -> service.process(order));
    }

}

One thing should be clarified here. With a custom thread pool at the app level, we lose message ordering within a single partition. The previous model guaranteed ordering, since we had one thread per partition. For our Spring Boot app this is not important, because we process messages independently.
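If per-key ordering did matter, a common technique (not part of the original example – the class and method names below are made up for illustration) is to route each message to a fixed single-threaded executor chosen by the key’s hash, so that messages with the same key are still processed sequentially while different keys run in parallel:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch: a pool of single-threaded executors; the executor is picked
// by the message key, so all tasks with the same key run in order.
class KeyAffinityExecutor {
    private final ExecutorService[] workers;

    KeyAffinityExecutor(int size) {
        workers = new ExecutorService[size];
        for (int i = 0; i < size; i++) {
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    void submit(long key, Runnable task) {
        // Math.floorMod avoids a negative index for negative hashes
        workers[Math.floorMod(Long.hashCode(key), workers.length)].submit(task);
    }

    void shutdown() throws InterruptedException {
        for (ExecutorService w : workers) {
            w.shutdown();
            w.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
```

In the listener, we would then call something like `executor.submit(order.getId(), () -> service.process(order))` instead of handing the task to one shared pool.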

Let’s start the app. There are 3 concurrent threads that receive messages from the partitions.

In total, there are 30 threads for processing messages and 3 threads listening on the partitions. Once a message is received by a consumer thread, it is handed over to a worker thread.

spring-boot-kafka-concurrency-multi-threads

We can run more instances of our Spring Boot Kafka app. I’ll run another two. The first instance grabs 4 partitions, while each of the other two instances grabs 3.

Now, we can run the load test again. It generated and sent ~85k messages to our Kafka broker (around 2.7k per second).

spring-boot-kafka-concurrency-k6

Let’s verify the lag within the consumer group using the rpk group command. The lag on the partitions is not large. In fact, there are 90 threads across the three app instances simultaneously processing the incoming messages. But wait… does that mean that with 90 threads we are able to process 2.7k orders per second? We should also remember the custom delay between 1 and 20 ms we added before (with the Thread.sleep method).
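A rough back-of-the-envelope check confirms that 90 worker threads are enough for this load, assuming the simulated delay is uniform between 1 and 20 ms (so ~10.5 ms on average):

```java
// Back-of-the-envelope capacity check: each worker thread handles
// roughly one message per average processing delay.
class ThroughputEstimate {
    static double capacityPerSecond(int threads, double avgDelayMs) {
        return threads * (1000.0 / avgDelayMs);
    }

    public static void main(String[] args) {
        // 90 worker threads, ~10.5 ms average simulated delay
        System.out.printf("capacity ~ %.0f msg/s%n",
                capacityPerSecond(90, 10.5));
    }
}
```

That is roughly 8.5k messages per second of theoretical capacity – comfortably above the ~2.7k/s the load test produced.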

The lag looks really fine, although the app is not able to process all requests without delay. That’s because the default offset commit mode in Spring Kafka is BATCH. If we change it to the RECORD mode, which commits the offset when the listener returns after processing each record, we should get a more accurate lag value. In order to override that option, we need to define a ConcurrentKafkaListenerContainerFactory bean as shown below.

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String>
    kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
   ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
   factory.setConsumerFactory(consumerFactory);
   factory.setConcurrency(3);
   factory.getContainerProperties()
      .setAckMode(ContainerProperties.AckMode.RECORD);
   return factory;
}

Let’s restart the app and run the load test once again. Now, the lag value is much closer to reality.

Final Thoughts

Concurrency and performance are among the most important things to consider when working with Kafka and Spring Boot. In this article, I wanted to explain some basics with simple examples. I hope it clarifies some concerns about using the Spring Kafka project.

The post Concurrency with Kafka and Spring Boot appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2023/04/30/concurrency-with-kafka-and-spring-boot/feed/ 15 14121
Deep Dive into Saga Transactions with Kafka Streams and Spring Boot https://piotrminkowski.com/2022/02/07/deep-dive-into-saga-transactions-with-kafka-streams-and-spring-boot/ https://piotrminkowski.com/2022/02/07/deep-dive-into-saga-transactions-with-kafka-streams-and-spring-boot/#comments Mon, 07 Feb 2022 14:38:20 +0000 https://piotrminkowski.com/?p=10587 In this article, you will learn how to use Kafka Streams and Spring Boot to perform transactions according to the Saga pattern. To be honest, I was quite surprised by a great deal of attention to my last article about Kafka. I got some questions about streams, transactions, and support for Kafka in Spring Boot. […]

The post Deep Dive into Saga Transactions with Kafka Streams and Spring Boot appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to use Kafka Streams and Spring Boot to perform transactions according to the Saga pattern. To be honest, I was quite surprised by the amount of attention my last article about Kafka received. I got some questions about streams, transactions, and support for Kafka in Spring Boot. In this article, I’ll try to answer a few of them. I will also show how you can easily set up a cloud-managed Kafka cluster on Upstash.

Introduction

First of all, let’s recap the approach described in the previous article. We used Kafka Streams to process order transactions on the order-service side. To handle orders coming to the stock-service and payment-service we used a standard Spring @KafkaListener. There are also two databases – one per service. The stock-service stores data about the number of available products and updates it after receiving an order. The same goes for the payment-service: it updates the customer’s account on every single order. Both applications receive orders from a Kafka topic and send responses to other topics, but to simplify the figure below, we will skip that part. We treat the Kafka orders topic as a stream of events and, at the same time, as a table with the latest order status.

kafka-streams-transactions-old-arch

What may go wrong with that approach? In fact, we have two data sources here. We use Kafka as the order store. On the other hand, there are SQL databases (in my case H2, but you can use any other) that store stock and payment data. Once we send an order with a reservation to the Kafka topic, we also need to update a database. Since Kafka does not support XA transactions, this may result in data inconsistency. Kafka is not alone here – many other messaging systems, e.g. RabbitMQ, don’t support XA transactions either.

The question is: what can we do about it? One possible option is an approach called Change Data Capture (CDC) combined with the outbox pattern. CDC identifies and tracks changes to data in a database. It may then emit those changes as events and send them, for example, to a Kafka topic. I won’t go into the details of that process. If you are interested, you may read this article written by Gunnar Morling.
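To illustrate the idea only (this is an in-memory sketch, not a real CDC setup – all names below are made up), the essence of the outbox pattern is that the business update and the outgoing event are written in the same local transaction, and a separate relay publishes the event later:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory sketch of the outbox pattern: the business update and the
// outbox entry are written in the same local "transaction"; a separate
// relay (in real life a CDC tool such as Debezium, or a polling
// publisher) forwards the outbox entries to the message broker.
class OutboxSketch {
    record OutboxEvent(long orderId, String payload) {}

    final Map<Long, Integer> stock = new HashMap<>();       // stand-in for a DB table
    final List<OutboxEvent> outbox = new ArrayList<>();     // stand-in for the outbox table
    final List<OutboxEvent> published = new ArrayList<>();  // stand-in for the Kafka topic

    // Both writes succeed or fail together, like rows in one DB transaction.
    void reserveAtomically(long orderId, long productId, int count) {
        stock.merge(productId, -count, Integer::sum);
        outbox.add(new OutboxEvent(orderId, "RESERVED:" + count));
    }

    // The relay publishes pending outbox entries and marks them as sent.
    void relay() {
        published.addAll(outbox);
        outbox.clear();
    }
}
```

Because the event lives in the same database as the business data until the relay picks it up, a crash between the database write and the broker publish can no longer lose the event.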

Architecture with Kafka Streams

The approach I will describe today is fully based on Kafka Streams. We won’t use any SQL databases. When the order-service sends a new order, its id is the message key. With Kafka Streams, we may change a message key in the stream, which results in creating new topics and repartitioning. With new message keys, we may perform calculations just for a specific customerId or productId. The result of such a calculation may be saved in a persistent store. Kafka automatically creates and manages such state stores when you call stateful operations like count() or aggregate(). We will aggregate the orders related to a particular customer or product. Here’s the visualization of our architecture.

kafka-streams-transactions-arch

Now, let’s consider the scenario for the payment-service in detail. In the incoming stream of orders, the payment-service calls the selectKey() operation. It changes the key from the order’s id to the order’s customerId. Then it groups all the orders by the new key and invokes the aggregate() operation. In the aggregate() method, it calculates the available and reserved amounts based on the order’s price and status (whether it is a new order or a confirmation). If there are sufficient funds on the customer account, it sends the ACCEPT order to the payment-orders topic. Otherwise, it sends the REJECT order. The order-service then processes the responses by joining the streams from payment-orders and stock-orders by the order’s id. As a result, it sends a confirmation or a rollback order.

kafka-streams-transactions-details

Finally, let’s proceed to the implementation!

Source Code

If you would like to try it out yourself, you can always take a look at my source code. To do that, clone my GitHub repository and switch to the streams-full branch. After that, just follow my instructions.

Aggregation with Kafka Streams

Let’s begin with the payment-service. The KStream implementation is not complicated here. In the first step (1), we invoke the selectKey() method and use the customerId value of the Order object as the new key. Then we call the groupByKey() method (2) to receive a KGroupedStream as a result. Once we have a KGroupedStream, we may invoke one of the calculation methods. In this case, we need to use aggregate() (3), since our calculation is a bit more advanced than a simple count. The last two steps just print the value after the calculation.

@Bean
public KStream<Long, Order> stream(StreamsBuilder builder) {
   JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
   JsonSerde<Reservation> rsvSerde = new JsonSerde<>(Reservation.class);
   KStream<Long, Order> stream = builder
      .stream("orders", Consumed.with(Serdes.Long(), orderSerde))
      .peek((k, order) -> LOG.info("New: {}", order));

   KeyValueBytesStoreSupplier customerOrderStoreSupplier =
      Stores.persistentKeyValueStore("customer-orders");

   stream.selectKey((k, v) -> v.getCustomerId()) // (1)
      .groupByKey(Grouped.with(Serdes.Long(), orderSerde)) // (2)
      .aggregate(
         () -> new Reservation(random.nextInt(1000)),
         aggregatorService,
         Materialized.<Long, Reservation>as(customerOrderStoreSupplier)
            .withKeySerde(Serdes.Long())
            .withValueSerde(rsvSerde)) // (3)
      .toStream()
      .peek((k, trx) -> LOG.info("Commit: {}", trx));

   return stream;
}

However, the most important part of the code fragment above is the aggregator passed to the aggregate() method. The aggregate() method takes three input arguments. The first of them provides the initial value of our computed object. That object represents the current state of the customer’s account and has two fields: amountAvailable and amountReserved. To clarify, we use that object instead of an entity that stores the available and reserved amounts on the customer account. Each customer is represented by the customerId (key) and the Reservation object (value) in the Kafka KTable. Just for test purposes, we generate the starting value of amountAvailable as a random number between 0 and 1000.

public class Reservation {
   private int amountAvailable;
   private int amountReserved;

   public Reservation() {
   
   }

   public Reservation(int amountAvailable) {
      this.amountAvailable = amountAvailable;
   }

   // GETTERS AND SETTERS ...

}

Ok, let’s take a look at our aggregation method. It needs to implement the Kafka Aggregator interface and its apply() method. It handles three types of orders. The first is a confirmation of the order (1). It confirms the distributed transaction, so we just need to cancel the reservation by subtracting the order’s price from the amountReserved field. On the other hand, in the case of a rollback, we need to increase the value of amountAvailable by the order’s price and decrease the value of amountReserved accordingly (2). Finally, if we receive a new order (3), we perform a reservation if there are sufficient funds on the customer account, or otherwise reject the order.

Aggregator<Long, Order, Reservation> aggregatorService = (id, order, rsv) -> {
   switch (order.getStatus()) {
      case "CONFIRMED" -> // (1)
         rsv.setAmountReserved(rsv.getAmountReserved() 
               - order.getPrice());
      case "ROLLBACK" -> { // (2)
         if (!order.getSource().equals("PAYMENT")) {
            rsv.setAmountAvailable(rsv.getAmountAvailable() 
                  + order.getPrice());
            rsv.setAmountReserved(rsv.getAmountReserved() 
                  - order.getPrice());
         }
      }
      case "NEW" -> { // (3)
         if (order.getPrice() <= rsv.getAmountAvailable()) {
            rsv.setAmountAvailable(rsv.getAmountAvailable() 
                  - order.getPrice());
            rsv.setAmountReserved(rsv.getAmountReserved() 
                  + order.getPrice());
            order.setStatus("ACCEPT");
         } else {
            order.setStatus("REJECT");
         }
         template.send("payment-orders", order.getId(), order);
      }
   }
   LOG.info("{}", rsv);
   return rsv;
};
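The branching above can be exercised without a running Kafka cluster. Below is a minimal plain-Java sketch of the same reservation logic – the classes are simplified stand-ins for the article’s Order and Reservation (the ROLLBACK source check from the original is omitted for brevity):

```java
// Simplified stand-in for the article's Reservation class.
class Rsv {
    int amountAvailable;
    int amountReserved;

    Rsv(int amountAvailable) {
        this.amountAvailable = amountAvailable;
    }
}

class ReservationLogic {
    // Mirrors the CONFIRMED / ROLLBACK / NEW branches of the aggregator;
    // returns the resulting order status.
    static String apply(Rsv rsv, String status, int price) {
        switch (status) {
            case "CONFIRMED" -> rsv.amountReserved -= price;
            case "ROLLBACK" -> {
                rsv.amountAvailable += price;
                rsv.amountReserved -= price;
            }
            case "NEW" -> {
                if (price <= rsv.amountAvailable) {
                    rsv.amountAvailable -= price;
                    rsv.amountReserved += price;
                    return "ACCEPT";
                }
                return "REJECT";
            }
        }
        return status;
    }
}
```

Keeping the aggregation logic in a pure function like this also makes it trivial to unit-test before wiring it into a topology.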

State Store with the Kafka Streams Table

The implementation of the stock-service is pretty similar to the payment-service, with the difference that we count the number of products available in stock instead of the funds available on the customer account. Here’s our Reservation object:

public class Reservation {
   private int itemsAvailable;
   private int itemsReserved;

   public Reservation() {
    
   }

   public Reservation(int itemsAvailable) {
      this.itemsAvailable = itemsAvailable;
   }

   // GETTERS AND SETTERS ...

}

The implementation of the aggregation method is also very similar to the payment-service. However, this time let’s focus on another thing. Once we process a new order, we need to send a response to the stock-orders topic. We use KafkaTemplate for that. In the case of the payment-service we also send a response, but to the payment-orders topic. The send method of the KafkaTemplate does not block the thread. It returns a ListenableFuture object, to which we may add a callback that handles the result of sending the message (1). Finally, let’s log the current state of the Reservation object (2).

Aggregator<Long, Order, Reservation> aggrSrv = (id, order, rsv) -> {
   switch (order.getStatus()) {
      case "CONFIRMED" -> rsv.setItemsReserved(rsv.getItemsReserved() 
            - order.getProductCount());
      case "ROLLBACK" -> {
         if (!order.getSource().equals("STOCK")) {
            rsv.setItemsAvailable(rsv.getItemsAvailable() 
                  + order.getProductCount());
            rsv.setItemsReserved(rsv.getItemsReserved() 
                  - order.getProductCount());
         }
      }
      case "NEW" -> {
         if (order.getProductCount() <= rsv.getItemsAvailable()) {
            rsv.setItemsAvailable(rsv.getItemsAvailable() 
                  - order.getProductCount());
            rsv.setItemsReserved(rsv.getItemsReserved() 
                  + order.getProductCount());
            order.setStatus("ACCEPT");
         } else {
            order.setStatus("REJECT");
         }
         // (1)
         template.send("stock-orders", order.getId(), order)
            .addCallback(result -> LOG.info("Sent: {}",
               result != null ? result.getProducerRecord().value() : null),
               ex -> LOG.error("Send failed", ex));
      }
   }
   LOG.info("{}", rsv); // (2)
   return rsv;
};

After that, we also log the value of the Reservation object (1). In order to do that, we need to convert the KTable into a KStream and then call the peek method. This log line is printed just after Kafka Streams commits the offset in the source topic.

@Bean
public KStream<Long, Order> stream(StreamsBuilder builder) {
   JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
   JsonSerde<Reservation> rsvSerde = new JsonSerde<>(Reservation.class);
   KStream<Long, Order> stream = builder
      .stream("orders", Consumed.with(Serdes.Long(), orderSerde))
      .peek((k, order) -> LOG.info("New: {}", order));

   KeyValueBytesStoreSupplier stockOrderStoreSupplier =
      Stores.persistentKeyValueStore("stock-orders");

   stream.selectKey((k, v) -> v.getProductId())
      .groupByKey(Grouped.with(Serdes.Long(), orderSerde))
      .aggregate(() -> new Reservation(random.nextInt(100)), aggrSrv,
         Materialized.<Long, Reservation>as(stockOrderStoreSupplier)
            .withKeySerde(Serdes.Long())
            .withValueSerde(rsvSerde))
      .toStream()
      .peek((k, trx) -> LOG.info("Commit: {}", trx)); // (1)

   return stream;
}

What will happen if you send a test order? Let’s see the logs. You can see the time difference between processing the message and the offset commit. You won’t have any problems with that as long as your application is running or has been stopped gracefully. But what if you, for example, kill the process using the kill -9 command? After a restart, our application will receive the same messages once again – and since we use KafkaTemplate to send the response to the stock-orders topic, it would send duplicate responses. That’s why we need to commit the offset as soon as possible.

What can we do to avoid such problems? We may override the default value (30000 ms) of the commit.interval.ms Kafka Streams property. If we set it to 0, the offset is committed immediately after processing finishes.

spring.kafka:  
  streams:
    properties:
      commit.interval.ms: 0

Alternatively, we can set the processing.guarantee property to exactly_once. It changes the default value of commit.interval.ms to 100 ms and enables idempotence for the producer. You can read more about it in the Kafka documentation.

spring.kafka:  
  streams:
    properties:
      processing.guarantee: exactly_once

Running Kafka on Upstash

For the purpose of today’s exercise, we will use a serverless Kafka cluster on Upstash. You can create it with a single click. If you would like to test SASL/JAAS authentication for your application, I’ve got good news 🙂 authentication is enabled on that cluster by default. You can find and copy the username and password in the cluster’s main panel.

kafka-streams-transactions-upstash

Now, let’s configure the Kafka connection settings and credentials for the Spring Boot application. Upstash offers a free developer tier of up to 10k messages per day, which is enough for our tests.

spring.kafka:
  bootstrap-servers: topical-gar-11460-us1-kafka.upstash.io:9092
  properties:
    security.protocol: SASL_SSL
    sasl.mechanism: SCRAM-SHA-256
    sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${USERNAME}" password="${PASSWORD}";

With Upstash you can easily display a list of topics. In total, there are 10 topics used in our sample system. Three of them are used directly by the Spring Boot applications, while the rest are used by Kafka Streams to process stateful operations.

After starting the order-service application we can call its REST endpoint to create and send an order to the Kafka topic.

private static final Logger LOG = 
   LoggerFactory.getLogger(OrderController.class);
private AtomicLong id = new AtomicLong();
private KafkaTemplate<Long, Order> template;

@PostMapping
public Order create(@RequestBody Order order) {
   order.setId(id.incrementAndGet());
   template.send("orders", order.getId(), order);
   LOG.info("Sent: {}", order);
   return order;
}

Let’s call the endpoint using the following curl command. You can use any customerId or productId you want.

$ curl -X 'POST' \
  'http://localhost:8080/orders' \
  -H 'Content-Type: application/json' \
  -d '{
    "customerId": 20,
    "productId": 20,
    "productCount": 2,
    "price": 10,
    "status": "NEW"
  }'

All three sample applications use Kafka Streams to process distributed transactions. Once the order is accepted by both stock-service and payment-service you should see the following entry in the order-service logs.

You can easily simulate rejection of transactions with Kafka Streams just by setting e.g. productCount higher than the value generated by the stock-service as available items.

With Upstash UI you can also easily verify the number of messages incoming to the topics. Let’s see the current statistics for the orders topic.

Final Thoughts

In order to fully understand what happens in this example, you should also be familiar with the Kafka Streams threading model. It is worth reading the following article, which explains it clearly. First of all, each stream partition is a totally ordered sequence of data records and maps to a Kafka topic partition. This means that even if we have multiple simultaneous orders related to e.g. the same product, they are all processed sequentially, since they have the same message key (the productId in that case).
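This per-key ordering works because the key-to-partition mapping is deterministic. Kafka’s real default partitioner hashes the serialized key with murmur2; the sketch below uses plain hashCode purely to illustrate the principle that an identical key always lands on the same partition:

```java
// Illustration only: Kafka's default partitioner uses murmur2 on the
// serialized key, not hashCode, but the principle is identical --
// the same key always maps to the same partition.
class PartitionSketch {
    static int partitionFor(long key, int numPartitions) {
        return Math.floorMod(Long.hashCode(key), numPartitions);
    }
}
```

Since all records with one productId go to one partition, and one partition is consumed by one stream task at a time, their processing order is preserved end to end.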

Moreover, by default, there is only a single stream thread that handles all the partitions. You can see this in the logs below. However, there are stream tasks that act as the lowest-level units of parallelism. As a result, stream tasks can be processed independently and in parallel without manual intervention.
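If more parallelism is needed, the thread count can be raised from that default of one with the standard num.stream.threads Kafka Streams property – shown here in the same Spring Boot YAML style as the earlier configuration snippets:

```yaml
spring.kafka:
  streams:
    properties:
      num.stream.threads: 3
```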

I hope this article helps you to better understand Kafka Streams. I just wanted to give you a simple example of how you can use Kafka Streams with Saga transactions in order to simplify your current architecture.

The post Deep Dive into Saga Transactions with Kafka Streams and Spring Boot appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2022/02/07/deep-dive-into-saga-transactions-with-kafka-streams-and-spring-boot/feed/ 8 10587
Distributed Transactions in Microservices with Kafka Streams and Spring Boot https://piotrminkowski.com/2022/01/24/distributed-transactions-in-microservices-with-kafka-streams-and-spring-boot/ https://piotrminkowski.com/2022/01/24/distributed-transactions-in-microservices-with-kafka-streams-and-spring-boot/#comments Mon, 24 Jan 2022 11:11:24 +0000 https://piotrminkowski.com/?p=10501 In this article, you will learn how to use Kafka Streams with Spring Boot. We will rely on the Spring Kafka project. In order to explain well how it works, we are going to implement a saga pattern. The saga pattern is a way to manage distributed transactions across microservices. The key phase of that […]

The post Distributed Transactions in Microservices with Kafka Streams and Spring Boot appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to use Kafka Streams with Spring Boot. We will rely on the Spring Kafka project. In order to explain well how it works, we are going to implement a saga pattern. The saga pattern is a way to manage distributed transactions across microservices. The key phase of that process is to publish an event that triggers a local transaction. Microservices exchange such events through a message broker. It turns out that Kafka Streams may help us here. Let’s see how!

Source Code

If you would like to try it out yourself, you can always take a look at my source code. To do that, clone my GitHub repository. After that, just follow my instructions.

Instead of Spring Kafka, you could just as well use Spring Cloud Stream for Kafka. You can read more about it in this article. Spring Cloud Stream provides several useful features, like DLQ support, JSON serialization by default, and interactive queries.

Architecture

We will create a simple system that consists of three microservices. The order-service sends orders to the Kafka topic called orders. The other two microservices, stock-service and payment-service, listen for the incoming events. After receiving them, they verify whether it is possible to execute the order. For example, if there are insufficient funds on the customer account, the order is rejected. Otherwise, the payment-service accepts the order and sends a response to the payment-orders topic. The stock-service does the same, except that it verifies the number of products in stock and sends its response to the stock-orders topic.

Then, the order-service joins the two streams from the stock-orders and payment-orders topics by the order’s id. If both services accepted the order, it confirms the distributed transaction. On the other hand, if one service accepted the order and the second rejected it, it performs a rollback. In that case, it just generates a new order event and sends it to the orders topic. We may treat the orders topic as a stream of order status changes or as a table with the latest status. Here’s the picture that visualizes our scenario.

kafka-streams-spring-boot-arch

Kafka Streams with Spring Boot

Let’s begin our implementation with the order-service. Surprisingly, there is no Spring Boot starter for Kafka (unless we use Spring Cloud Stream). Therefore, we need to include the spring-kafka dependency. In order to process streams, we also need to include the kafka-streams module directly. Since the order-service exposes some REST endpoints, we also need the Spring Boot Web starter.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
</dependency>

The order-service is the most important microservice in our scenario. It acts as an order gateway and a saga pattern orchestrator. It requires all three topics used in our architecture. In order to create the topics automatically on application startup, we define the following beans:

@Bean
public NewTopic orders() {
   return TopicBuilder.name("orders")
         .partitions(3)
         .compact()
         .build();
}

@Bean
public NewTopic paymentTopic() {
   return TopicBuilder.name("payment-orders")
         .partitions(3)
         .compact()
         .build();
}

@Bean
public NewTopic stockTopic() {
   return TopicBuilder.name("stock-orders")
         .partitions(3)
         .compact()
         .build();
}

Then, let’s define our first Kafka stream. To do that, we need to use the StreamsBuilder bean. The order-service receives events from the payment-service (on the payment-orders topic) and from the stock-service (on the stock-orders topic). Every event contains the id previously set by the order-service. If we join both these streams into a single stream by the order’s id, we will be able to determine the status of our transaction. The algorithm is pretty simple. If both the payment-service and the stock-service accepted the order, the final status of the transaction is CONFIRMED. If both services rejected the order, the final status is REJECTED. The last option is ROLLBACK – when one service accepted the order and the other rejected it. Here’s the described method inside the OrderManageService bean.

@Service
public class OrderManageService {

   public Order confirm(Order orderPayment, Order orderStock) {
      Order o = new Order(orderPayment.getId(),
             orderPayment.getCustomerId(),
             orderPayment.getProductId(),
             orderPayment.getProductCount(),
             orderPayment.getPrice());
      if (orderPayment.getStatus().equals("ACCEPT") &&
                orderStock.getStatus().equals("ACCEPT")) {
         o.setStatus("CONFIRMED");
      } else if (orderPayment.getStatus().equals("REJECT") &&
                orderStock.getStatus().equals("REJECT")) {
         o.setStatus("REJECTED");
      } else if (orderPayment.getStatus().equals("REJECT") ||
                orderStock.getStatus().equals("REJECT")) {
         String source = orderPayment.getStatus().equals("REJECT")
                    ? "PAYMENT" : "STOCK";
         o.setStatus("ROLLBACK");
         o.setSource(source);
      }
      return o;
   }

}
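The decision table implemented by confirm() can also be written as a tiny pure function operating on the two partner statuses (the class and method names below are made up for illustration), which makes it easy to verify without any Kafka infrastructure:

```java
// Stand-alone sketch of the saga status decision: two partner
// statuses (payment, stock) map to the final transaction status.
class SagaDecision {
    static String decide(String paymentStatus, String stockStatus) {
        if (paymentStatus.equals("ACCEPT") && stockStatus.equals("ACCEPT"))
            return "CONFIRMED";
        if (paymentStatus.equals("REJECT") && stockStatus.equals("REJECT"))
            return "REJECTED";
        if (paymentStatus.equals("REJECT") || stockStatus.equals("REJECT"))
            return "ROLLBACK";
        return "UNKNOWN"; // any other combination is unexpected input
    }
}
```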

Finally, the implementation of our stream. We need to define the KStream bean. We join the two streams using the join method of KStream with a 10-second join window. As a result, we set the status of the order and send a new order event to the orders topic – the same topic used for sending new orders.

@Autowired
OrderManageService orderManageService;

@Bean
public KStream<Long, Order> stream(StreamsBuilder builder) {
   JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
   KStream<Long, Order> stream = builder
         .stream("payment-orders", Consumed.with(Serdes.Long(), orderSerde));

   stream.join(
         builder.stream("stock-orders"),
         orderManageService::confirm,
         JoinWindows.of(Duration.ofSeconds(10)),
         StreamJoined.with(Serdes.Long(), orderSerde, orderSerde))
      .peek((k, o) -> LOG.info("Output: {}", o))
      .to("orders");

   return stream;
}

Let’s see it also in the picture.

kafka-streams-spring-boot-join

Configuration for Spring Boot

In Spring Boot, the application name is by default used as the application id (and thus the consumer group) for Kafka Streams. Therefore, we should set it in application.yml. Of course, we also need to set the address of the Kafka bootstrap server. Finally, we have to configure the default key and value serialization for events. This applies to both standard and streams processing.

spring.application.name: orders
spring.kafka:
  bootstrap-servers: 127.0.0.1:56820
  producer:
    key-serializer: org.apache.kafka.common.serialization.LongSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  streams:
    properties:
      default.key.serde: org.apache.kafka.common.serialization.Serdes$LongSerde
      default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
      spring.json.trusted.packages: "*"

Sending and receiving events from Kafka topics

In the previous section, we discussed how to create a new Kafka stream by joining two other streams. Now, let’s see how to process incoming messages, using the payment-service as an example. It listens for incoming orders. When it receives a new order, it performs a reservation on the customer’s account and sends a response with the reservation status to the payment-orders topic. When it receives a transaction confirmation from the order-service, it commits or rolls back the transaction. In order to enable the Kafka listener, we should annotate the main class with @EnableKafka. Additionally, the listening method must be annotated with @KafkaListener. The following method listens for events on the orders topic and runs in the payment consumer group.

@SpringBootApplication
@EnableKafka
public class PaymentApp {

    private static final Logger LOG = LoggerFactory.getLogger(PaymentApp.class);

    public static void main(String[] args) {
        SpringApplication.run(PaymentApp.class, args);
    }

    @Autowired
    OrderManageService orderManageService;

    @KafkaListener(id = "orders", topics = "orders", groupId = "payment")
    public void onEvent(Order o) {
        LOG.info("Received: {}" , o);
        if (o.getStatus().equals("NEW"))
            orderManageService.reserve(o);
        else
            orderManageService.confirm(o);
    }
}

Here’s the implementation of the OrderManageService used in the previous code snippet. If it receives an order in the NEW status, it performs a reservation: it subtracts the order’s price from the amountAvailable field and adds the same value to the amountReserved field. Then it sets the status of the order and sends a response to the payment-orders topic using KafkaTemplate. During the confirmation phase, it doesn’t send any response event. It may perform a rollback – subtracting the order’s price from the amountReserved field and adding it back to the amountAvailable field – or it just “commits” the transaction by subtracting the price from the amountReserved field.

@Service
public class OrderManageService {

   private static final String SOURCE = "payment";
   private static final Logger LOG = LoggerFactory.getLogger(OrderManageService.class);
   private CustomerRepository repository;
   private KafkaTemplate<Long, Order> template;

   public OrderManageService(CustomerRepository repository, KafkaTemplate<Long, Order> template) {
      this.repository = repository;
      this.template = template;
   }

   public void reserve(Order order) {
      Customer customer = repository.findById(order.getCustomerId()).orElseThrow();
      LOG.info("Found: {}", customer);
      if (order.getPrice() <= customer.getAmountAvailable()) {
         order.setStatus("ACCEPT");
         customer.setAmountReserved(customer.getAmountReserved() + order.getPrice());
         customer.setAmountAvailable(customer.getAmountAvailable() - order.getPrice());
      } else {
         order.setStatus("REJECT");
      }
      order.setSource(SOURCE);
      repository.save(customer);
      template.send("payment-orders", order.getId(), order);
      LOG.info("Sent: {}", order);
   }

   public void confirm(Order order) {
      Customer customer = repository.findById(order.getCustomerId()).orElseThrow();
      LOG.info("Found: {}", customer);
      if (order.getStatus().equals("CONFIRMED")) {
         customer.setAmountReserved(customer.getAmountReserved() - order.getPrice());
         repository.save(customer);
      } else if (order.getStatus().equals("ROLLBACK") && !order.getSource().equals(SOURCE)) {
         customer.setAmountReserved(customer.getAmountReserved() - order.getPrice());
         customer.setAmountAvailable(customer.getAmountAvailable() + order.getPrice());
         repository.save(customer);
      }

   }
}
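The reservation keeps the sum of amountAvailable and amountReserved constant: the price simply moves from one pool to the other. A stripped-down sketch of that arithmetic (the Customer class here is a hypothetical stand-in for the entity used in the article; the repository and KafkaTemplate calls are omitted):

```java
// Illustrates only the balance arithmetic of the payment-service reservation.
public class ReservationSketch {

    static class Customer {
        int amountAvailable;
        int amountReserved;

        Customer(int available, int reserved) {
            this.amountAvailable = available;
            this.amountReserved = reserved;
        }
    }

    // Moves the price from the available pool to the reserved pool
    // when funds are sufficient; returns the resulting order status.
    static String reserve(Customer c, int price) {
        if (price < c.amountAvailable) {
            c.amountReserved += price;
            c.amountAvailable -= price;
            return "ACCEPT";
        }
        return "REJECT";
    }

    public static void main(String[] args) {
        Customer c = new Customer(1000, 0);
        String status = reserve(c, 100);
        // available + reserved is still 1000 after the reservation
        System.out.println(status + " " + c.amountAvailable + " " + c.amountReserved);
    }
}
```

Note that a rejected reservation leaves both fields untouched, which is why the REJECT branch in the service only sets the order status.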

Similar logic is implemented on the stock-service side. However, instead of the order's price, it uses the productCount field and performs a reservation for the desired number of ordered products. Here's the implementation of the OrderManageService class in the stock-service.

@Service
public class OrderManageService {

   private static final String SOURCE = "stock";
   private static final Logger LOG = LoggerFactory.getLogger(OrderManageService.class);
   private ProductRepository repository;
   private KafkaTemplate<Long, Order> template;

   public OrderManageService(ProductRepository repository, KafkaTemplate<Long, Order> template) {
      this.repository = repository;
      this.template = template;
   }

   public void reserve(Order order) {
      Product product = repository.findById(order.getProductId()).orElseThrow();
      LOG.info("Found: {}", product);
      if (order.getStatus().equals("NEW")) {
         if (order.getProductCount() < product.getAvailableItems()) {
            product.setReservedItems(product.getReservedItems() + order.getProductCount());
            product.setAvailableItems(product.getAvailableItems() - order.getProductCount());
            order.setStatus("ACCEPT");
            repository.save(product);
         } else {
            order.setStatus("REJECT");
         }
         template.send("stock-orders", order.getId(), order);
         LOG.info("Sent: {}", order);
      }
   }

   public void confirm(Order order) {
      Product product = repository.findById(order.getProductId()).orElseThrow();
      LOG.info("Found: {}", product);
      if (order.getStatus().equals("CONFIRMED")) {
         product.setReservedItems(product.getReservedItems() - order.getProductCount());
         repository.save(product);
      } else if (order.getStatus().equals("ROLLBACK") && !order.getSource().equals(SOURCE)) {
         product.setReservedItems(product.getReservedItems() - order.getProductCount());
         product.setAvailableItems(product.getAvailableItems() + order.getProductCount());
         repository.save(product);
      }
   }

}

Query Kafka Stream with Spring Boot

Now, let’s consider the following scenario. Firstly, the order-service receives a new order (via REST API) and sends it to the Kafka topic. This order is then received by both other microservices. Once they send back a positive response (or negative) the order-service process them as streams and change the status of the order. The order with a new status is emitted to the same topic as before. So, where we are storing the data with the current status of an order? In Kafka topic. Once again we will use Kafka Streams in our Spring Boot application. But this time, we take advantage of KTable. Let’s at the visualization of our scenario.

(Figure: kafka-streams-spring-boot-ktable)

Ok, so let’s define another Kafka Streams bean in the order-service. We are getting the same orders topic as a stream. We will convert it to the Kafka table and materialize it in a persistent store. Thanks to that we will be able to easily query the store from our REST controller.

@Bean
public KTable<Long, Order> table(StreamsBuilder builder) {
   KeyValueBytesStoreSupplier store =
         Stores.persistentKeyValueStore("orders");
   JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
   KStream<Long, Order> stream = builder
         .stream("orders", Consumed.with(Serdes.Long(), orderSerde));
   return stream.toTable(Materialized.<Long, Order>as(store)
         .withKeySerde(Serdes.Long())
         .withValueSerde(orderSerde));
}
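Conceptually, toTable() collapses the changelog stream into a table that keeps only the latest value seen per key, so each new event for an order id overwrites the previous status. A plain-Java analogy of that upsert semantics (this is not Kafka Streams code, just an illustration of the idea):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Analogy for KStream#toTable: a (key, value) changelog collapses
// into a map holding only the latest value for each key.
public class KTableAnalogy {

    static <K, V> Map<K, V> toTable(Iterable<Map.Entry<K, V>> changelog) {
        Map<K, V> table = new LinkedHashMap<>();
        for (Map.Entry<K, V> event : changelog) {
            table.put(event.getKey(), event.getValue()); // the later event wins
        }
        return table;
    }

    public static void main(String[] args) {
        var events = List.of(
                Map.entry(1L, "NEW"),
                Map.entry(2L, "NEW"),
                Map.entry(1L, "CONFIRMED")); // overwrites the first event for key 1
        System.out.println(toTable(events));
    }
}
```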

If we run more than one instance of the order-service on the same machine, it is also important to override the default location of the state store. To do that, we should define the following property with a unique value per instance:

spring.kafka.streams.state-dir: /tmp/kafka-streams/1

Unfortunately, there is no built-in support in Spring Boot for interactive queries of Kafka Streams. However, we can use the auto-configured StreamsBuilderFactoryBean to inject a KafkaStreams instance into the controller. Then we can query the state store under the materialized name. That's, of course, a very trivial sample: we are just getting all orders from the KTable.

@GetMapping
public List<Order> all() {
   List<Order> orders = new ArrayList<>();
   ReadOnlyKeyValueStore<Long, Order> store = kafkaStreamsFactory
         .getKafkaStreams()
         .store(StoreQueryParameters.fromNameAndType(
               "orders",
               QueryableStoreTypes.keyValueStore()));
   // the iterator holds resources, so close it when we are done
   try (KeyValueIterator<Long, Order> it = store.all()) {
      it.forEachRemaining(kv -> orders.add(kv.value));
   }
   return orders;
}
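One caveat with interactive queries: while the Kafka Streams instance is rebalancing, the store is not queryable and the store(...) call throws an InvalidStateStoreException. A generic retry helper like the sketch below (plain Java, no Kafka dependency; the name and the attempt/delay values are arbitrary) could wrap the store lookup in the controller:

```java
import java.util.function.Supplier;

// Retries an action that may fail transiently, e.g. querying a
// Kafka Streams state store that is momentarily rebalancing.
public class RetrySupport {

    public static <T> T withRetries(Supplier<T> action, int attempts, long delayMs) {
        RuntimeException last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return action.get(); // success: return immediately
            } catch (RuntimeException e) {
                last = e; // remember the failure and wait before retrying
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw last;
                }
            }
        }
        throw last; // all attempts failed
    }
}
```

In the controller you would then call something like `withRetries(() -> kafkaStreamsFactory.getKafkaStreams().store(params), 5, 200)`.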

In the same OrderController there is also a method for sending a new order to the Kafka topic.

@PostMapping
public Order create(@RequestBody Order order) {
   order.setId(id.incrementAndGet());
   template.send("orders", order.getId(), order);
   LOG.info("Sent: {}", order);
   return order;
}

Testing Scenario

Before we run our sample microservices, we need to start a local instance of Kafka. Usually, I use Redpanda for that. It is a Kafka API-compatible streaming platform. Compared to Kafka, it is relatively easy to run locally. All you need to do is install the rpk CLI (here are the instructions for macOS). After that, you can create a single-node instance using the following command:

$ rpk container start

After starting, it will print the address of your node. For me, it is 127.0.0.1:56820. You should put that address as the value of the spring.kafka.bootstrap-servers property. You can also display a list of created topics using the following command:

$ rpk topic list --brokers 127.0.0.1:56820

Then, let’s run our microservices. Begin from the order-service since it is creating all the required topics and building the Kafka Streams instance. You can send a single order using the REST endpoint:

$ curl -X 'POST' \
  'http://localhost:8080/orders' \
  -H 'Content-Type: application/json' \
  -d '{
  "customerId": 10,
  "productId": 10,
  "productCount": 5,
  "price": 100,
  "status": "NEW"
}'

There is some test data inserted when payment-service and stock-service start. So, you can set the value of customerId or productId to anything between 1 and 100 and it will work. However, you can also use a method for generating a random stream of data. The following bean is responsible for generating 10,000 random orders:

@Service
public class OrderGeneratorService {

   private static Random RAND = new Random();
   private AtomicLong id = new AtomicLong();
   private Executor executor;
   private KafkaTemplate<Long, Order> template;

   public OrderGeneratorService(Executor executor, KafkaTemplate<Long, Order> template) {
      this.executor = executor;
      this.template = template;
   }

   @Async
   public void generate() {
      for (int i = 0; i < 10000; i++) {
         int x = RAND.nextInt(5) + 1;
         Order o = new Order(id.incrementAndGet(), RAND.nextLong(100) + 1, RAND.nextLong(100) + 1, "NEW");
         o.setPrice(100 * x);
         o.setProductCount(x);
         template.send("orders", o.getId(), o);
      }
   }
}

You can start that process by calling the POST /orders/generate endpoint.

@PostMapping("/generate")
public boolean create() {
   orderGeneratorService.generate();
   return true;
}

No matter whether you decide to send a single order or generate multiple random orders, you can easily query the status of orders using the following endpoint:

$ curl http://localhost:8080/orders

Here’s a structure of topics generated by the application and by Kafka Streams to perform join operation and save the orders KTable as a state store.

The post Distributed Transactions in Microservices with Kafka Streams and Spring Boot appeared first on Piotr's TechBlog.
