Kafka Offset with Spring Boot
https://piotrminkowski.com/2024/03/11/kafka-offset-with-spring-boot/
Mon, 11 Mar 2024 — Piotr's TechBlog (Java, Spring, Kotlin, microservices, Kubernetes, containers)

The post Kafka Offset with Spring Boot appeared first on Piotr's TechBlog.

In this article, you will learn how to manage Kafka consumer offset with Spring Boot and Spring Kafka. An inspiration for preparing this article was the feedback I received after publishing the post describing concurrency with Kafka and Spring Boot. You were asking me questions related not only to concurrency but also to the consumer offset committing process. In the previous article, I focused mostly on showing that the way the app handles Kafka messages may impact the overall performance of our system. I didn't consider things like message duplicates or losing messages on the consumer side. Today, we are going to discuss exactly those topics.

If you are interested in Kafka and Spring Boot, you can find several articles about them on my blog. Apart from the already mentioned post about concurrency, you can read e.g. about Kafka transactions here. To read about microservices with Kafka and Spring Boot, refer to the following article.

Source Code

If you would like to try it out yourself, you can always take a look at my source code. To do that, you need to clone my GitHub repository. After that, just follow my instructions. Let's begin.

Introduction

Before we start, we need to clarify some important things related to committing offsets with Spring Kafka. First of all, by default, Spring Kafka sets the consumer enable.auto.commit property to false. It means that the framework, not Kafka, is responsible for committing an offset. Of course, we can change the default behavior by setting that property to true. By the way, it was the default approach before Spring Kafka 2.3. 
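If you do want to hand committing back to the Kafka client itself, the standard Spring Boot consumer properties re-enable auto-commit. A sketch (the property names are the standard Spring Boot ones; verify the interval against your needs):

```yaml
spring:
  kafka:
    consumer:
      # Let the Kafka client commit offsets itself, in the background
      enable-auto-commit: true
      # How often the client commits (the Kafka client default is 5s)
      auto-commit-interval: 5s
```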

As long as auto-committing stays disabled, we can leverage 7 different commit strategies provided by Spring Kafka. Today, we won't analyze all of them, just the most significant ones. The default strategy is BATCH. To set a different strategy, we need to override the AckMode, e.g. by setting a value of the spring.kafka.listener.ack-mode property in the Spring Boot application properties. However, let's first focus on the BATCH mode.
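For reference, the ContainerProperties.AckMode enum defines the following 7 values (see the Spring Kafka docs for the exact semantics of each):

```yaml
# spring.kafka.listener.ack-mode accepts one of the 7 AckMode values:
#   RECORD           - commit after each processed record
#   BATCH            - commit after the whole batch from a poll is processed (default)
#   TIME             - commit after the configured ack-time has elapsed
#   COUNT            - commit after the configured ack-count records have been processed
#   COUNT_TIME       - commit when either the TIME or the COUNT condition is met
#   MANUAL           - the listener acknowledges; commits are batched like BATCH
#   MANUAL_IMMEDIATE - the listener acknowledges; the commit happens immediately
spring.kafka.listener.ack-mode: BATCH
```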

Sample Spring Boot Kafka Apps

To test offset committing with Spring Kafka, we will create two simple apps: a producer and a consumer. The producer sends a defined number of messages to the topic, while the consumer receives and processes them. Here's the producer @RestController implementation. It allows us to send a defined number of messages to the transactions topic on demand:

@RestController
public class TransactionsController {

   private static final Logger LOG = LoggerFactory
            .getLogger(TransactionsController.class);

   long id = 1;
   long groupId = 1;
   // injected through the constructor
   final KafkaTemplate<Long, Order> kafkaTemplate;

   TransactionsController(KafkaTemplate<Long, Order> kafkaTemplate) {
      this.kafkaTemplate = kafkaTemplate;
   }

   @PostMapping("/transactions")
   public void generateAndSendMessages(@RequestBody InputParameters inputParameters) {
      for (long i = 0; i < inputParameters.getNumberOfMessages(); i++) {
         Order o = new Order(id++, i+1, i+2, 1000, "NEW", groupId);
         CompletableFuture<SendResult<Long, Order>> result =
                 kafkaTemplate.send("transactions", o.getId(), o);
         result.whenComplete((sr, ex) ->
                 LOG.info("Sent({}): {}", sr.getProducerRecord().key(), sr.getProducerRecord().value()));
      }
      groupId++;
   }

}

Here are the producer app configuration properties. We need to set the address of a Kafka broker, and serializer classes for a key (Long) and a value (JSON format).

spring:
  application.name: producer
  kafka:
    bootstrap-servers: ${KAFKA_URL}
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

We can trigger the process of sending messages by calling the POST /transactions endpoint as shown below:

$ curl -X 'POST' 'http://localhost:8080/transactions' \
  -H 'Content-Type: application/json' \
  -d '{"numberOfMessages":10}'

Here's the consumer app listener bean implementation. As you can see, it is very simple: it just receives and prints the messages. We sleep the thread for 10 seconds, just to be able to easily check the offset on the Kafka topic during the test.

@Service
public class Listener {

   private static final Logger LOG = LoggerFactory
          .getLogger(Listener.class);

   @KafkaListener(
          id = "transactions",
          topics = "transactions",
          groupId = "a"
   )
   public void listen(@Payload Order order,
                      @Header(KafkaHeaders.OFFSET) Long offset,
                      @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) throws InterruptedException {
      LOG.info("[partition={},offset={}] Starting: {}", partition, offset, order);
      Thread.sleep(10000L);
      LOG.info("[partition={},offset={}] Finished: {}", partition, offset, order);
   }

}

In order to see what exactly happens in the consumer app, we need to increase the default logging level for Spring Kafka to DEBUG. There are also some other properties related to the serialization and deserialization of messages in the application properties. Here’s the whole application.yml file for the consumer app:

spring:
  application.name: consumer
  output.ansi.enabled: ALWAYS
  kafka:
    bootstrap-servers: ${KAFKA_URL:localhost}:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"

logging.level:
  org.springframework.kafka: debug

Understanding How Spring Kafka Commits the Offset

Then, let's start both our apps by executing the Maven commands visible below. I assume that you already have a Kafka broker running. Once the consumer is connected to the transactions topic, we can send 10 messages by calling the already-mentioned POST /transactions endpoint. After that, we will switch to the consumer app logs, where we can see all the significant information related to offset committing.

# build the whole project
$ mvn clean package

# run the consumer app
$ cd consumer
$ mvn spring-boot:run

# run the producer app
$ cd producer
$ mvn spring-boot:run

So, here are the logs from our test. I highlighted the most important parts. Of course, your results may differ slightly, but the rules are the same. First of all, the consumer receives a batch of messages. In this case, there are 2 messages on one partition, while it consumes, for example, 7 in one step from the next partition. Without detailed logs, you won't even be aware of this behavior, since the message listener still processes one message at a time. However, the offset commit action is performed only after processing all the consumed messages. That's because we have the AckMode set to BATCH.

[Image: spring-kafka-offset-batch]

Of course, it doesn't have any impact on the app… as long as it is running. If a non-graceful restart or a crash occurs between the start of batch processing and the offset commit action, it may cause some problems. Don't get me wrong – it's a standard situation that results in message duplicates on the consumer side. So, now our app consumes 7 messages in a batch. Let's stop it during batch processing as shown below. By the way, with a graceful shutdown, Spring Kafka waits until the last message in the batch is processed. Therefore, I simulated an immediate stop with SIGKILL for testing purposes.

[Image: spring-kafka-offset-batch-kill]

The consumer offset has not been committed. We can verify it by checking the current value of the consumer offset on partition 1. You can compare that value with the values highlighted in the logs above.
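If you prefer the command line over a GUI client, the CLI shipped with Kafka can show the committed offset, log-end offset, and lag per partition (assuming the Kafka scripts are on your PATH; the group id is a, as in the listener above):

```shell
# Describe the consumer group "a": committed offset, log-end offset and lag per partition
$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --describe --group a
```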

Then, let's start our consumer app once again. The app starts reading messages from the latest committed offset on each partition. Consequently, it processes several messages that had already been processed before we killed the consumer instance. As you can see, the consumer app processes the orders with ids 3, 5, 6, 8, and 10 once again. We need to take such situations into account when implementing the business logic. After processing the last message in the batch, Spring Kafka commits the offset.

[Image: spring-kafka-offset-batch-duplicates]

Finally, everything works fine. There is no consumer lag on any partition.
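The duplicate window in this scenario can be reasoned about with simple arithmetic: after a restart, the consumer resumes from the last committed offset, so every offset between that commit point and the last record it actually processed is delivered again. A minimal plain-Java sketch of that reasoning (illustrative only, not the Spring Kafka API):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: models which offsets are re-delivered after a crash
// when the offset commit lags behind processing (e.g. AckMode BATCH).
public class DuplicateWindowDemo {

    // Offsets in [lastCommitted, lastProcessed] were already handled once,
    // but will be delivered again because the commit never happened.
    static List<Long> redelivered(long lastCommitted, long lastProcessed) {
        List<Long> again = new ArrayList<>();
        for (long o = lastCommitted; o <= lastProcessed; o++) {
            again.add(o);
        }
        return again;
    }

    public static void main(String[] args) {
        // The committed offset is 3, but the consumer got as far as offset 6
        // before being killed: offsets 3..6 are processed twice.
        System.out.println(redelivered(3, 6));
    }
}
```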

Using the RECORD Mode to Commit Offset

In the next step, we will compare a similar situation with the AckMode set to RECORD. According to the Spring Kafka docs, the RECORD mode "commits the offset when the listener returns after processing the record". To enable the RECORD mode, we need to set the following in the application.yml file:

spring.kafka.listener.ack-mode: RECORD

Then, we have to restart the consumer app. After that, we can trigger the process of sending messages once again, by calling the POST /transactions endpoint exposed by the producer app:

$ curl -X 'POST' 'http://localhost:8080/transactions' \
  -H 'Content-Type: application/json' \
  -d '{"numberOfMessages":10}'

Let's switch to the logs. As you can see, each time the @KafkaListener method finishes processing a single record, Spring Kafka commits the offset. I guess that some of you assumed that this was the default behavior (not the BATCH mode) 🙂 That approach decreases the potential number of duplicate messages after a restart, but on the other hand, it impacts the overall performance of the consumer.

[Image: spring-kafka-offset-record]

The latest committed consumer offset visible in the logs is 8. So, if we switch to the GUI client, we can verify that the current offset there has the same value.

Graceful Shutdown

Although our app commits the offset each time after processing a single record, during a graceful shutdown Spring Boot waits until the whole batch is processed. As you can see, I initiated the shutdown procedure at 15:12:41, but the container performed the shutdown only after a 30-second timeout. That's because I included 10 seconds of thread sleep in the processing method, which made the total processing time of the message batch exceed 30 seconds.

[Image: spring-kafka-offset-batch-shutdown]
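The 30-second window is Spring Boot's default graceful-shutdown timeout. If you want the container to wait longer for in-flight batches instead, the standard lifecycle property can raise it (a sketch; pick a value that fits your batch processing time):

```yaml
spring:
  lifecycle:
    # Default is 30s; give long-running batches more time to drain on shutdown
    timeout-per-shutdown-phase: 60s
```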

However, we can change that behavior by setting the spring.kafka.listener.immediate-stop property to true. That property decides whether the container stops after the current record is processed, or after all the records from the previous poll have been processed.

spring.kafka.listener.immediate-stop: true

After restarting the consumer app, let's take a look at the logs once again. The Spring container starts the shutdown procedure just after committing the offset of the last processed record.

Spring Kafka Offset and Concurrency

Processing Messages with the Custom Thread Pool

Finally, the last scenario in our article. Let's consider the case when we use a custom thread pool to handle messages received by the @KafkaListener method. To do that, we can define an ExecutorService object. Once the listenAsync() method receives a message, it delegates processing to the Processor bean by calling its process() method through the ExecutorService.

@Service
public class Listener {

   private static final Logger LOG = LoggerFactory
          .getLogger(Listener.class);

   ExecutorService executorService = Executors.newFixedThreadPool(30);

   @Autowired
   private Processor processor;

   @KafkaListener(
          id = "transactions-async",
          topics = "transactions-async",
          groupId = "a"
   )
   public void listenAsync(@Payload Order order,
                     @Header(KafkaHeaders.OFFSET) Long offset,
                     @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
      LOG.info("[partition={},offset={}] Starting Async: {}", partition, offset, order);
      executorService.submit(() -> processor.process(order));
   }
}

In the Processor bean, we sleep the thread for 10 seconds for testing purposes. The process() method doesn't do anything important; it just prints a log line at the start and before finishing.

@Service
public class Processor {

   private static final Logger LOG = LoggerFactory
          .getLogger(Processor.class);

   public void process(Order order) {
      LOG.info("Processing: {}", order.getId());
      try {
         Thread.sleep(10000L);
      } catch (InterruptedException e) {
         throw new RuntimeException(e);
      }
      LOG.info("Finished: {}", order.getId());
   }

}

Let's analyze what happens after sending some messages to such a consumer. This time we are using the transactions-async topic. By default, Spring Kafka commits the offset after processing the whole batch of 4 received messages. However, it happens almost immediately after receiving the messages, because we delegate the further processing to another thread. The asynchronous method finishes processing after 10 seconds. If your app crashes during those 10 seconds, it will result in losing messages. They won't be processed by the new instance of the app, because the offset was already committed before the message processing finished.
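The failure mode can be captured in a few lines of plain Java (illustrative only, not the Spring Kafka API): once an offset is committed, anything below it that the worker threads never finished is gone for good.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Illustrative only: models which messages are lost when offsets are
// committed before asynchronous processing has actually finished.
public class OffsetLossDemo {

    // committedUpTo is the committed offset (the next offset to be read after
    // a restart); processed holds the offsets the worker threads completed
    // before the crash. Committed-but-unprocessed offsets are lost.
    static List<Long> lostOffsets(long committedUpTo, Set<Long> processed) {
        List<Long> lost = new ArrayList<>();
        for (long o = 0; o < committedUpTo; o++) {
            if (!processed.contains(o)) {
                lost.add(o);
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        // The listener committed offsets 0..3 right away, but the worker pool
        // only finished offsets 0 and 1 before the crash: 2 and 3 are lost.
        System.out.println(lostOffsets(4, Set.of(0L, 1L)));
    }
}
```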

Enable Manual Offset Commit

Once again, losing messages this way is a normal situation with a Kafka consumer. However, we can handle such cases slightly differently. Instead of relying on the container-managed offset commit, we can switch to the manual mode. First of all, let's add the following property to the Spring Boot application.yml file:

spring.kafka.listener.ack-mode: MANUAL_IMMEDIATE

Then we need to leverage the Acknowledgment interface to take control over the offset commit process inside the listener. As you can see, we have to add it to the @KafkaListener method parameters. After that, we can pass it to the process() method running in a different thread.

@Service
public class Listener {

   private static final Logger LOG = LoggerFactory
          .getLogger(Listener.class);

   ExecutorService executorService = Executors.newFixedThreadPool(30);

   @Autowired
   private Processor processor;

   @KafkaListener(
          id = "transactions-async",           
          topics = "transactions-async",
          groupId = "a"
   )
   public void listenAsync(@Payload Order order,
                     Acknowledgment acknowledgment,
                     @Header(KafkaHeaders.OFFSET) Long offset,
                     @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
      LOG.info("[partition={},offset={}] Starting Async: {}", partition, offset, order);
      executorService.submit(() -> processor.process(order, acknowledgment));
   }
}

With the acknowledge() method provided by the Acknowledgment interface, we can manually commit the offset at the selected location in the code. Here, we make the commit at the end of the whole method.

@Service
public class Processor {

   private static final Logger LOG = LoggerFactory
          .getLogger(Processor.class);

   public void process(Order order, Acknowledgment acknowledgment) {
      LOG.info("Processing: {}", order.getId());
      try {
         Thread.sleep(10000L);
      } catch (InterruptedException e) {
         throw new RuntimeException(e);
      }
      LOG.info("Finished: {}", order.getId());
      acknowledgment.acknowledge();
   }

}

Let's switch to the consumer app logs once again. As you can see, the offset commit happens almost immediately after processing the message. By the way, the MANUAL AckMode (instead of MANUAL_IMMEDIATE) waits with the commit until all the records in the batch have been processed. Another thing worth mentioning here is the possibility of out-of-order commits. It is disabled by default in a Spring Boot app. To enable it, we need to set the spring.kafka.listener.async-acks property to true. If you want to test such a scenario yourself, you can increase the number of messages sent by the producer with the numberOfMessages field, e.g. to 100. Then verify the consumer lag with and without the async-acks property.
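Both listener settings discussed in this section end up in application.yml. A sketch of the combined configuration:

```yaml
spring:
  kafka:
    listener:
      # The listener (not the container) decides when each offset is committed
      ack-mode: MANUAL_IMMEDIATE
      # Allow acknowledgments to arrive out of order when records are
      # processed on different threads
      async-acks: true
```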

Finally, let’s verify the current committed offset for all the partitions using the GUI client.

Final Thoughts

Kafka consumer offset is a very interesting topic. If you want to understand Kafka, you first need to understand how consumers commit offsets on partitions. In this article, I focused on showing you how to switch between the different acknowledgment modes with Spring Kafka and how each of them impacts your app.

Kafka Tracing with Spring Boot and Open Telemetry
https://piotrminkowski.com/2023/11/15/kafka-tracing-with-spring-boot-and-open-telemetry/
Wed, 15 Nov 2023

The post Kafka Tracing with Spring Boot and Open Telemetry appeared first on Piotr's TechBlog.

In this article, you will learn how to configure tracing for Kafka producer and consumer with Spring Boot and Open Telemetry. We will use the Micrometer library for sending traces and Jaeger for storing and visualizing them. Spring Kafka comes with built-in integration with Micrometer for the KafkaTemplate and listener containers. You will also see how to configure the Spring Kafka observability to add our custom tags to traces.

If you are interested in Kafka and Spring Boot, you may find several articles about them on my blog. To read about concurrency with Kafka and Spring Boot, see the following post. There is also an interesting article about Kafka transactions here.

Source Code

If you would like to try it out yourself, you can always take a look at my source code. To do that, you need to clone my GitHub repository and go to the kafka directory. After that, just follow my instructions. Let's begin.

Dependencies

Let's take a look at the list of required Maven dependencies. It is the same for both of our sample Spring Boot apps. Of course, we need the Spring Boot starter and Spring Kafka for sending and receiving messages. To automatically generate traces related to each message, we include the Spring Boot Actuator and the Micrometer Tracing Open Telemetry bridge. Finally, we need the opentelemetry-exporter-otlp library to export traces outside the app.

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
  </dependency>
  <dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-tracing-bridge-otel</artifactId>
  </dependency>
  <dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-exporter-otlp</artifactId>
  </dependency>
</dependencies>

Spring Boot Kafka Tracing for Producer

Our apps don’t do anything complicated. They are just sending and receiving messages. Here’s the class representing the message exchanged between both apps.

public class Info {

    private Long id;
    private String source;
    private String space;
    private String cluster;
    private String message;

    public Info(Long id, String source, String space, String cluster, 
                String message) {
       this.id = id;
       this.source = source;
       this.space = space;
       this.cluster = cluster;
       this.message = message;
    }

   // GETTERS AND SETTERS
}

Let’s begin with the producer app. It generates and sends one message per second. Here’s the implementation of a @Service bean responsible for producing messages. It injects and uses the KafkaTemplate bean for that.

@Service
public class SenderService {

   private static final Logger LOG = LoggerFactory
      .getLogger(SenderService.class);

   AtomicLong id = new AtomicLong();
   @Autowired
   KafkaTemplate<Long, Info> template;

   @Value("${POD:kafka-producer}")
   private String pod;
   @Value("${NAMESPACE:empty}")
   private String namespace;
   @Value("${CLUSTER:localhost}")
   private String cluster;
   @Value("${TOPIC:info}")
   private String topic;

   @Scheduled(fixedRate = 1000)
   public void send() {
      Info info = new Info(id.incrementAndGet(), 
                           pod, 
                           namespace, 
                           cluster, 
                           "HELLO");
      CompletableFuture<SendResult<Long, Info>> result = template
         .send(topic, info.getId(), info);
      result.whenComplete((sr, ex) ->
                LOG.info("Sent({}): {}", sr.getProducerRecord().key(), 
                         sr.getProducerRecord().value()));
    }

}

Spring Boot provides an auto-configured instance of KafkaTemplate. However, to enable Kafka tracing with Spring Boot we need to customize that instance. Here’s the implementation of the KafkaTemplate bean inside the producer app’s main class. In order to enable tracing, we need to invoke the setObservationEnabled method. By default, the Micrometer module generates some generic tags. We want to add at least the name of the target topic and the Kafka message key. Therefore we are creating our custom implementation of the KafkaTemplateObservationConvention interface. It uses the KafkaRecordSenderContext to retrieve the topic name and the message key from the ProducerRecord object.

@SpringBootApplication
@EnableScheduling
public class KafkaProducer {

   private static final Logger LOG = LoggerFactory
      .getLogger(KafkaProducer.class);

   public static void main(String[] args) {
      SpringApplication.run(KafkaProducer.class, args);
   }

   @Bean
   public NewTopic infoTopic() {
      return TopicBuilder.name("info")
             .partitions(1)
             .replicas(1)
             .build();
   }

   @Bean
   public KafkaTemplate<Long, Info> kafkaTemplate(ProducerFactory<Long, Info> producerFactory) {
      KafkaTemplate<Long, Info> t = new KafkaTemplate<>(producerFactory);
      t.setObservationEnabled(true);
      t.setObservationConvention(new KafkaTemplateObservationConvention() {
         @Override
         public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) {
            return KeyValues.of("topic", context.getDestination(),
                    "id", String.valueOf(context.getRecord().key()));
         }
      });
      return t;
   }

}

We also need to set the address of the Jaeger instance and decide which percentage of spans will be exported. Here’s the application.yml file with the required properties:

spring:
  application.name: kafka-producer
  kafka:
    bootstrap-servers: ${KAFKA_URL:localhost}:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

management:
  tracing:
    enabled: true
    sampling:
      probability: 1.0
  otlp:
    tracing:
      endpoint: http://jaeger:4318/v1/traces

Spring Boot Kafka Tracing for Consumer

Let’s switch to the consumer app. It just receives and prints messages coming to the Kafka topic. Here’s the implementation of the listener @Service. Besides the whole message content, it also prints the message key and a topic partition number.

@Service
public class ListenerService {

   private static final Logger LOG = LoggerFactory
      .getLogger(ListenerService.class);

   @KafkaListener(id = "info", topics = "${app.in.topic}")
   public void onMessage(@Payload Info info,
                         @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Long key,
                         @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
      LOG.info("Received(key={}, partition={}): {}", key, partition, info);
   }

}

In order to generate and export traces on the consumer side, we need to override the ConcurrentKafkaListenerContainerFactory bean. On the listener container factory, we should obtain the ContainerProperties instance and invoke its setObservationEnabled method. Similarly to the producer side, we can optionally provide a custom implementation of the KafkaListenerObservationConvention interface to include additional tags.

@SpringBootApplication
@EnableKafka
public class KafkaConsumer {

   private static final Logger LOG = LoggerFactory
      .getLogger(KafkaConsumer.class);

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumer.class, args);
    }

    @Value("${app.in.topic}")
    private String topic;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setObservationEnabled(true);
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    @Bean
    public NewTopic infoTopic() {
        return TopicBuilder.name(topic)
                .partitions(10)
                .replicas(3)
                .build();
    }

}

Of course, we also need to set a Jaeger address in the application.yml file:

spring:
  application.name: kafka-consumer
  kafka:
    bootstrap-servers: ${KAFKA_URL:localhost}:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"

app.in.topic: ${TOPIC:info}

management:
  tracing:
    enabled: true
    sampling:
      probability: 1.0
  otlp:
    tracing:
      endpoint: http://jaeger:4318/v1/traces

Trying on Docker

Once we finish the implementation we can try out our solution. We will run both Kafka and Jaeger as Docker containers. Firstly, let’s build the project and container images for the producer and consumer apps. Spring Boot provides built-in tools for that. Therefore, we just need to execute the following command:

$ mvn clean package spring-boot:build-image

After that, we can define the docker-compose.yml file with the list of containers. It is possible to dynamically override Spring Boot properties with environment variables named after the property keys. Thanks to that, we can easily change the Kafka and Jaeger addresses for the containers. Here's our docker-compose.yml:

version: "3.8"
services:
  broker:
    image: moeenz/docker-kafka-kraft:latest
    restart: always
    ports:
      - "9092:9092"
    environment:
      - KRAFT_CONTAINER_HOST_NAME=broker
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"
      - "4317:4317"
      - "4318:4318"
  producer:
    image: library/producer:1.0-SNAPSHOT
    links:
      - broker
      - jaeger
    environment:
      MANAGEMENT_OTLP_TRACING_ENDPOINT: http://jaeger:4318/v1/traces
      SPRING_KAFKA_BOOTSTRAP_SERVERS: broker:9092
  consumer:
    image: library/consumer:1.0-SNAPSHOT
    links:
      - broker
      - jaeger
    environment:
      MANAGEMENT_OTLP_TRACING_ENDPOINT: http://jaeger:4318/v1/traces
      SPRING_KAFKA_BOOTSTRAP_SERVERS: broker:9092

Let’s run all the defined containers with the following command:

$ docker compose up

Our apps are running and exchanging messages:

The Jaeger dashboard is available on port 16686. As you can see, there are several traces with the kafka-producer and kafka-consumer spans.

[Image: spring-boot-kafka-tracing-jaeger]

We can go into the details of each entry. The trace generated by the producer app is always correlated with the trace generated by the consumer app for every single message. There are also our two custom tags (id and topic) with the values added by the KafkaTemplate bean.

[Image: spring-boot-kafka-tracing-details]

Running on Kubernetes

Our sample apps are prepared for being deployed on Kubernetes. You can easily do it with the Skaffold CLI. Before that, we need to install Kafka and Jaeger on Kubernetes. I will not get into details about Kafka installation. You can find a detailed description of how to run Kafka on Kubernetes with the Strimzi operator in my article available here. After that, we can proceed to the Jaeger installation. In the first step, we need to add the following Helm repository:

$ helm repo add jaegertracing https://jaegertracing.github.io/helm-charts

By default, the Jaeger Helm chart doesn't expose the OTLP endpoints. To enable them, we need to override some default settings. Here's our jaeger-values.yaml manifest:

collector:
  service:
    otlp:
      grpc:
        name: otlp-grpc
        port: 4317
      http:
        name: otlp-http
        port: 4318

Let’s install Jaeger in the jaeger namespace with the parameters from jaeger-values.yaml:

$ helm install jaeger jaegertracing/jaeger -n jaeger \
    --create-namespace \
    -f jaeger-values.yaml

Once we install Jaeger, we can verify the list of Kubernetes Services. We will use the jaeger-collector service to receive traces from the apps and the jaeger-query service to access the UI dashboard.

$ kubectl get svc -n jaeger
NAME               TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                           AGE
jaeger-agent       ClusterIP   10.96.147.104   <none>        5775/UDP,6831/UDP,6832/UDP,5778/TCP,14271/TCP     14m
jaeger-cassandra   ClusterIP   None            <none>        7000/TCP,7001/TCP,7199/TCP,9042/TCP,9160/TCP      14m
jaeger-collector   ClusterIP   10.96.111.236   <none>        14250/TCP,14268/TCP,4317/TCP,4318/TCP,14269/TCP   14m
jaeger-query       ClusterIP   10.96.88.64     <none>        80/TCP,16685/TCP,16687/TCP                        14m

Finally, we can run our sample Spring Boot apps that connect to Kafka and Jaeger. Here’s the Deployment object for the producer app. It overrides the default Kafka and Jaeger addresses by defining the KAFKA_URL and MANAGEMENT_OTLP_TRACING_ENDPOINT environment variables.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
      - name: producer
        image: piomin/producer
        resources:
          requests:
            memory: 200Mi
            cpu: 100m
        ports:
        - containerPort: 8080
        env:
          - name: MANAGEMENT_OTLP_TRACING_ENDPOINT
            value: http://jaeger-collector.jaeger:4318/v1/traces
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap
          - name: CLUSTER
            value: c1
          - name: TOPIC
            value: test-1
          - name: POD
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
          - name: NAMESPACE
            valueFrom:
              fieldRef:
                fieldPath: metadata.namespace
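
The MANAGEMENT_OTLP_TRACING_ENDPOINT variable works thanks to Spring Boot's relaxed binding, which maps it to the management.otlp.tracing.endpoint property. For reference, here's a sketch of the equivalent application.yml settings (the sampling probability is my assumption for demo purposes, not taken from the repository):

management:
  otlp:
    tracing:
      endpoint: http://jaeger-collector.jaeger:4318/v1/traces
  tracing:
    sampling:
      probability: 1.0 # trace every request; lower this in production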

Here’s a similar Deployment object for the consumer app:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-1
spec:
  selector:
    matchLabels:
      app: consumer-1
  template:
    metadata:
      labels:
        app: consumer-1
    spec:
      containers:
      - name: consumer
        image: piomin/consumer
        resources:
          requests:
            memory: 200Mi
            cpu: 100m
        ports:
        - containerPort: 8080
        env:
          - name: SPRING_APPLICATION_NAME
            value: kafka-consumer-1
          - name: TOPIC
            value: test-1
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap
          - name: MANAGEMENT_OTLP_TRACING_ENDPOINT
            value: http://jaeger-collector.jaeger:4318/v1/traces

Assuming that you are inside the kafka directory in the Git repository, you just need to run the following command to deploy both apps. By the way, I’ll create two deployments of the consumer app (consumer-1 and consumer-2) just for Jaeger visualization purposes.

$ skaffold run -n strimzi --tail

Once you run the apps, you can go to the Jaeger dashboard and verify the list of traces. In order to access the dashboard, we can enable port forwarding for the jaeger-query Service.

$ kubectl port-forward svc/jaeger-query 80:80 -n jaeger

Final Thoughts

Integration between Spring Kafka and Micrometer Tracing is a relatively new feature, available since the 3.0 version. It is possible that it will be improved soon with some new features. Anyway, it currently gives us a simple way to generate and send traces from Kafka producers and consumers.

The post Kafka Tracing with Spring Boot and Open Telemetry appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2023/11/15/kafka-tracing-with-spring-boot-and-open-telemetry/feed/ 11 14669
Apache Kafka on Kubernetes with Strimzi https://piotrminkowski.com/2023/11/06/apache-kafka-on-kubernetes-with-strimzi/ https://piotrminkowski.com/2023/11/06/apache-kafka-on-kubernetes-with-strimzi/#comments Mon, 06 Nov 2023 08:49:30 +0000 https://piotrminkowski.com/?p=14613 In this article, you will learn how to install and manage Apache Kafka on Kubernetes with Strimzi. The Strimzi operator lets us declaratively define and configure Kafka clusters, and several other components like Kafka Connect, Mirror Maker, or Cruise Control. Of course, it’s not the only way to install Kafka on Kubernetes. As an alternative, […]

The post Apache Kafka on Kubernetes with Strimzi appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to install and manage Apache Kafka on Kubernetes with Strimzi. The Strimzi operator lets us declaratively define and configure Kafka clusters, as well as several other components like Kafka Connect, Mirror Maker, or Cruise Control. Of course, it's not the only way to install Kafka on Kubernetes. As an alternative, we can use the Bitnami Helm chart available here. In comparison to that approach, Strimzi simplifies the creation of additional components. We will analyze it using the Cruise Control tool as an example.

You can find many other articles about Apache Kafka on my blog. For example, to read about concurrency with Spring Kafka please refer to the following post. There is also an article about Kafka transactions available here.

Prerequisites

In order to proceed with the exercise, you need to have a Kubernetes cluster. This cluster should have at least three worker nodes since I’m going to show you the approach with Kafka brokers spread across several nodes. We can easily simulate multiple Kubernetes nodes locally with Kind. You need to install the kind CLI tool and start Docker on your laptop. Here’s the Kind configuration manifest containing a definition of a single control plane and 4 worker nodes:

kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker
- role: worker

Then, we need to create the Kubernetes cluster based on the manifest visible above with the following kind command:

$ kind create cluster --name c1 --config cluster.yaml

The name of our Kind cluster is c1. It corresponds to the kind-c1 Kubernetes context, which is automatically set as default after creating the cluster. After that, we can display a list of Kubernetes nodes using the following kubectl command:

$ kubectl get node
NAME               STATUS   ROLES           AGE  VERSION
c1-control-plane   Ready    control-plane   1m   v1.27.3
c1-worker          Ready    <none>          1m   v1.27.3
c1-worker2         Ready    <none>          1m   v1.27.3
c1-worker3         Ready    <none>          1m   v1.27.3
c1-worker4         Ready    <none>          1m   v1.27.3

Source Code

If you would like to try it yourself, you can always take a look at my source code. In order to do that, you need to clone my GitHub repository. After that, go to the kafka directory. There are two Spring Boot apps inside the producer and consumer directories. The required Kubernetes manifests are available inside the k8s directory. You can apply them with kubectl or using the Skaffold CLI tool. The repository is already configured to work with Skaffold and Kind. To proceed with the exercise, just follow my instructions in the next sections.

Architecture

Let’s analyze our main goals in this exercise. Of course, we want to run a Kafka cluster on Kubernetes as simple as possible. There are several requirements for the cluster:

  1. It should automatically expose broker metrics in the Prometheus format. Then we will use Prometheus mechanisms to get the metrics and store them for visualization.
  2. It should consist of at least 3 brokers. Each broker has to run on a different Kubernetes worker node.
  3. Our Kafka cluster needs to work in ZooKeeper-less mode. Therefore, we need to enable the KRaft protocol between the brokers.
  4. Once we scale up the Kafka cluster, we must automatically rebalance it to reassign partition replicas to the new broker. In order to do that, we will use the Cruise Control support in Strimzi.

Here's a diagram that visualizes the described architecture. We will also run two simple Spring Boot apps on Kubernetes that connect to the Kafka cluster and use it to send/receive messages.

kafka-on-kubernetes-arch

1. Install Monitoring Stack on Kubernetes

In the first step, we will install the monitoring stack on our Kubernetes cluster. We are going to use the kube-prometheus-stack Helm chart for that. It provides preconfigured instances of Prometheus and Grafana. It also comes with several CRD objects that allow us to easily customize the monitoring mechanisms according to our needs. Let's add the following Helm repository:

$ helm repo add prometheus-community \
    https://prometheus-community.github.io/helm-charts

Then, we can install the chart in the monitoring namespace. We can leave the default configuration.

$ helm install kube-prometheus-stack \
    prometheus-community/kube-prometheus-stack \
    --version 52.1.0 -n monitoring --create-namespace

2. Install Strimzi Operator on Kubernetes

In the next step, we will install the Strimzi operator on Kubernetes using its Helm chart. The same as before, we need to add the Helm repository:

$ helm repo add strimzi https://strimzi.io/charts

Then, we can proceed to the installation. This time we will override some configuration settings. The Strimzi Helm chart comes with a set of Grafana dashboards that visualize metrics exported by Kafka brokers and other components managed by Strimzi. We place those dashboards inside the monitoring namespace. By default, the Strimzi chart doesn't add the dashboards, so we also need to enable that feature in the values YAML file. That's not all. Because we want to run Kafka in KRaft mode, we need to enable it using feature gates. Enabling the UseKRaft feature gate requires the KafkaNodePools feature gate to be enabled as well. Then, when we deploy a Kafka cluster in KRaft mode, we must also use the KafkaNodePool resources. Here's the full list of overridden Helm chart values:

dashboards:
  enabled: true
  namespace: monitoring
featureGates: +UseKRaft,+KafkaNodePools,+UnidirectionalTopicOperator

Finally, let’s install the operator in the strimzi namespace using the following command:

$ helm install strimzi-kafka-operator strimzi/strimzi-kafka-operator \
    --version 0.38.0 \
    -n strimzi --create-namespace \
    -f strimzi-values.yaml

3. Run Kafka in the KRaft Mode

In the current version of Strimzi, KRaft mode support is still in the alpha phase. This will probably change soon, but for now, we have to deal with some inconveniences. In the previous section, we enabled three feature gates required to run Kafka in KRaft mode. Thanks to that, we can finally define our Kafka cluster. In the first step, we need to create a node pool. This new Strimzi object is responsible for configuring brokers and controllers in the cluster. Controllers are responsible for coordinating operations and maintaining the cluster's state. Fortunately, a single node in the pool can act as a controller and a broker at the same time.

Let's create the KafkaNodePool object for our cluster. As you can see, it defines two roles: broker and controller (1). We can also configure storage for the cluster members (2). One of our goals is to avoid sharing the same Kubernetes node between Kafka brokers. Therefore, we define the podAntiAffinity section (3). Setting the topologyKey to kubernetes.io/hostname indicates that the selected pods are not scheduled on nodes with the same hostname (4).

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: dual-role
  namespace: strimzi
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 3
  roles: # (1)
    - controller
    - broker
  storage: # (2)
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 20Gi
        deleteClaim: false
  template:
    pod:
      affinity:
        podAntiAffinity: # (3)
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: strimzi.io/name
                    operator: In
                    values:
                      - my-cluster-kafka
              topologyKey: "kubernetes.io/hostname" # (4)

Once we create a node pool, we can proceed to the Kafka object creation. We need to enable KRaft mode and node pools for the particular cluster by annotating it with strimzi.io/kraft and strimzi.io/node-pools (1). Sections like storage (2) or zookeeper (5) are not used in KRaft mode but are still required by the CRD. We should also configure the cluster metrics exporter (3) and enable the Cruise Control component (4). Our cluster exposes the API for client connections on port 9092.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: strimzi
  annotations: # (1)
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: '3.6'
    storage: # (2)
      type: persistent-claim
      size: 5Gi
      deleteClaim: true
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    version: 3.6.0
    replicas: 3
    metricsConfig: # (3)
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics
          key: kafka-metrics-config.yml
  entityOperator:
    topicOperator: {}
    userOperator: {}
  cruiseControl: {} # (4)
  # (5)
  zookeeper:
    storage:
      type: persistent-claim
      deleteClaim: true
      size: 2Gi
    replicas: 3

The metricsConfig section in the Kafka object takes a ConfigMap as the configuration source. This ConfigMap contains a single kafka-metrics-config.yml entry with the Prometheus rules definition.

kind: ConfigMap
apiVersion: v1
metadata:
  name: kafka-metrics
  namespace: strimzi
  labels:
    app: strimzi
data:
  kafka-metrics-config.yml: |
    lowercaseOutputName: true
    rules:
    - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), topic=(.+), partition=(.*)><>Value
      name: kafka_server_$1_$2
      type: GAUGE
      labels:
        clientId: "$3"
        topic: "$4"
        partition: "$5"
    - pattern: kafka.server<type=(.+), name=(.+), clientId=(.+), brokerHost=(.+), brokerPort=(.+)><>Value
      name: kafka_server_$1_$2
      type: GAUGE
      labels:
        clientId: "$3"
        broker: "$4:$5"
    - pattern: kafka.server<type=(.+), cipher=(.+), protocol=(.+), listener=(.+), networkProcessor=(.+)><>connections
      name: kafka_server_$1_connections_tls_info
      type: GAUGE
      labels:
        cipher: "$2"
        protocol: "$3"
        listener: "$4"
        networkProcessor: "$5"
    - pattern: kafka.server<type=(.+), clientSoftwareName=(.+), clientSoftwareVersion=(.+), listener=(.+), networkProcessor=(.+)><>connections
      name: kafka_server_$1_connections_software
      type: GAUGE
      labels:
        clientSoftwareName: "$2"
        clientSoftwareVersion: "$3"
        listener: "$4"
        networkProcessor: "$5"
    - pattern: "kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+):"
      name: kafka_server_$1_$4
      type: GAUGE
      labels:
        listener: "$2"
        networkProcessor: "$3"
    - pattern: kafka.server<type=(.+), listener=(.+), networkProcessor=(.+)><>(.+)
      name: kafka_server_$1_$4
      type: GAUGE
      labels:
        listener: "$2"
        networkProcessor: "$3"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>MeanRate
      name: kafka_$1_$2_$3_percent
      type: GAUGE
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*><>Value
      name: kafka_$1_$2_$3_percent
      type: GAUGE
    - pattern: kafka.(\w+)<type=(.+), name=(.+)Percent\w*, (.+)=(.+)><>Value
      name: kafka_$1_$2_$3_percent
      type: GAUGE
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*, (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)PerSec\w*><>Count
      name: kafka_$1_$2_$3_total
      type: COUNTER
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Value
      name: kafka_$1_$2_$3
      type: GAUGE
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
      labels:
        "$4": "$5"
        "$6": "$7"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*), (.+)=(.+)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        "$6": "$7"
        quantile: "0.$8"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
      labels:
        "$4": "$5"
    - pattern: kafka.(\w+)<type=(.+), name=(.+), (.+)=(.*)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        "$4": "$5"
        quantile: "0.$6"
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>Count
      name: kafka_$1_$2_$3_count
      type: COUNTER
    - pattern: kafka.(\w+)<type=(.+), name=(.+)><>(\d+)thPercentile
      name: kafka_$1_$2_$3
      type: GAUGE
      labels:
        quantile: "0.$4"
    - pattern: "kafka.server<type=raft-metrics><>(.+-total|.+-max):"
      name: kafka_server_raftmetrics_$1
      type: COUNTER
    - pattern: "kafka.server<type=raft-metrics><>(.+):"
      name: kafka_server_raftmetrics_$1
      type: GAUGE
    - pattern: "kafka.server<type=raft-channel-metrics><>(.+-total|.+-max):"
      name: kafka_server_raftchannelmetrics_$1
      type: COUNTER
    - pattern: "kafka.server<type=raft-channel-metrics><>(.+):"
      name: kafka_server_raftchannelmetrics_$1
      type: GAUGE
    - pattern: "kafka.server<type=broker-metadata-metrics><>(.+):"
      name: kafka_server_brokermetadatametrics_$1
      type: GAUGE

4. Interacting with Kafka on Kubernetes

Once we apply the KafkaNodePool and Kafka objects to the Kubernetes cluster, Strimzi starts provisioning. As a result, you should see the broker pods, a single pod related to Cruise Control, and a metrics exporter pod. Each Kafka broker pod is running on a different Kubernetes node.

Clients can connect to Kafka using the my-cluster-kafka-bootstrap Service on port 9092:

$ kubectl get svc
NAME                         TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                        AGE
my-cluster-cruise-control    ClusterIP   10.96.108.204   <none>        9090/TCP                                       4m10s
my-cluster-kafka-bootstrap   ClusterIP   10.96.155.136   <none>        9091/TCP,9092/TCP,9093/TCP                     4m59s
my-cluster-kafka-brokers     ClusterIP   None            <none>        9090/TCP,9091/TCP,8443/TCP,9092/TCP,9093/TCP   4m59s
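
Inside the apps, that address has to end up in the Kafka bootstrap configuration. Assuming the standard Spring Kafka property resolved from the KAFKA_URL environment variable, the relevant application.yml fragment could look like this (a sketch; the actual layout in the repository may differ):

spring:
  kafka:
    bootstrap-servers: ${KAFKA_URL:localhost}:9092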

In the next step, we will deploy our two apps for producing and consuming messages. The producer app sends one message per second to the target topic:

@SpringBootApplication
@EnableScheduling
public class KafkaProducer {

   private static final Logger LOG = LoggerFactory
      .getLogger(KafkaProducer.class);

   public static void main(String[] args) {
      SpringApplication.run(KafkaProducer.class, args);
   }

   AtomicLong id = new AtomicLong();
   @Autowired
   KafkaTemplate<Long, Info> template;

   @Value("${POD:kafka-producer}")
   private String pod;
   @Value("${NAMESPACE:empty}")
   private String namespace;
   @Value("${CLUSTER:localhost}")
   private String cluster;
   @Value("${TOPIC:test}")
   private String topic;

   @Scheduled(fixedRate = 1000)
   public void send() {
      Info info = new Info(id.incrementAndGet(), 
                           pod, namespace, cluster, "HELLO");
      CompletableFuture<SendResult<Long, Info>> result = template
         .send(topic, info.getId(), info);
      result.whenComplete((sr, ex) ->
         LOG.info("Sent({}): {}", sr.getProducerRecord().key(), 
         sr.getProducerRecord().value()));
   }
}

Here’s the Deployment manifest for the producer app:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
      - name: producer
        image: piomin/producer
        resources:
          requests:
            memory: 200Mi
            cpu: 100m
        ports:
        - containerPort: 8080
        env:
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap
          - name: CLUSTER
            value: c1
          - name: TOPIC
            value: test-1
          - name: POD
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
          - name: NAMESPACE
            valueFrom:
              fieldRef:
                fieldPath: metadata.namespace

Before running the app we can create the test-1 topic with the Strimzi CRD:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: test-1
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 12
  replicas: 3
  config:
    retention.ms: 7200000
    segment.bytes: 1000000

The consumer app is listening for incoming messages. Here’s the bean responsible for receiving and logging messages:

@SpringBootApplication
@EnableKafka
public class KafkaConsumer {

   private static final Logger LOG = LoggerFactory
      .getLogger(KafkaConsumer.class);

   public static void main(String[] args) {
      SpringApplication.run(KafkaConsumer.class, args);
   }

   @Value("${app.in.topic}")
   private String topic;

   @KafkaListener(id = "info", topics = "${app.in.topic}")
   public void onMessage(@Payload Info info,
      @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Long key,
      @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
      LOG.info("Received(key={}, partition={}): {}", key, partition, info);
   }
}
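
The ${app.in.topic} placeholder in the listener has to be resolved from configuration. Assuming it is backed by the TOPIC environment variable set in the Deployment below, the consumer's application.yml could contain a fragment like this (a sketch, not copied from the repository):

app:
  in:
    topic: ${TOPIC:test}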

Here’s the Deployment manifest for the consumer app:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer
spec:
  selector:
    matchLabels:
      app: consumer
  template:
    metadata:
      labels:
        app: consumer
    spec:
      containers:
      - name: consumer
        image: piomin/consumer
        resources:
          requests:
            memory: 200Mi
            cpu: 100m
        ports:
        - containerPort: 8080
        env:
          - name: TOPIC
            value: test-1
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap

We can run both Spring Boot apps using Skaffold. First, we need to go to the kafka directory in our repository. Then, let's run the following command:

$ skaffold run -n strimzi --tail

Finally, we can verify the logs printed by our apps. As you see, all the messages sent by the producer app are received by the consumer app.

kafka-on-kubernetes-logs

5. Kafka Metrics in Prometheus

Once we install the Strimzi Helm chart with dashboards.enabled=true and dashboards.namespace=monitoring, several Grafana dashboard manifests are placed in the monitoring namespace. Each dashboard is represented as a ConfigMap. Let's display the list of ConfigMaps installed by the Strimzi Helm chart:

$ kubectl get cm -n monitoring | grep strimzi
strimzi-cruise-control                                    1      2m
strimzi-kafka                                             1      2m
strimzi-kafka-bridge                                      1      2m
strimzi-kafka-connect                                     1      2m
strimzi-kafka-exporter                                    1      2m
strimzi-kafka-mirror-maker-2                              1      2m
strimzi-kafka-oauth                                       1      2m
strimzi-kraft                                             1      2m
strimzi-operators                                         1      2m
strimzi-zookeeper                                         1      2m

Since Grafana is also installed in the monitoring namespace, it automatically imports all the dashboards from ConfigMaps annotated with grafana_dashboard. Consequently, after logging into Grafana (admin / prom-operator), we can easily switch between all the Kafka-related dashboards.

The only problem is that Prometheus doesn't scrape the metrics exposed by the Kafka pods. Since we have already configured metrics exporting in the Strimzi Kafka CRD, Kafka pods expose the /metrics endpoint for Prometheus on port 9404.

In order to force Prometheus to scrape metrics from Kafka pods, we need to create a PodMonitor object. We should place it in the monitoring namespace (1) and set the release=kube-prometheus-stack label (2). The PodMonitor object selects all the pods from the strimzi namespace (3) that contain the strimzi.io/kind label with one of the values: Kafka, KafkaConnect, KafkaMirrorMaker, KafkaMirrorMaker2 (4). Also, it queries the /metrics endpoint under the port named tcp-prometheus (5).

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: kafka-resources-metrics
  namespace: monitoring
  labels:
    app: strimzi
    release: kube-prometheus-stack
spec:
  selector:
    matchExpressions:
      - key: "strimzi.io/kind"
        operator: In
        values: ["Kafka", "KafkaConnect", "KafkaMirrorMaker", "KafkaMirrorMaker2"]
  namespaceSelector:
    matchNames:
      - strimzi
  podMetricsEndpoints:
  - path: /metrics
    port: tcp-prometheus
    relabelings:
    - separator: ;
      regex: __meta_kubernetes_pod_label_(strimzi_io_.+)
      replacement: $1
      action: labelmap
    - sourceLabels: [__meta_kubernetes_namespace]
      separator: ;
      regex: (.*)
      targetLabel: namespace
      replacement: $1
      action: replace
    - sourceLabels: [__meta_kubernetes_pod_name]
      separator: ;
      regex: (.*)
      targetLabel: kubernetes_pod_name
      replacement: $1
      action: replace
    - sourceLabels: [__meta_kubernetes_pod_node_name]
      separator: ;
      regex: (.*)
      targetLabel: node_name
      replacement: $1
      action: replace
    - sourceLabels: [__meta_kubernetes_pod_host_ip]
      separator: ;
      regex: (.*)
      targetLabel: node_ip
      replacement: $1
      action: replace

Finally, we can display the Grafana dashboard with Kafka metrics visualization. Let’s choose the dashboard with the “Strimzi Kafka” name. Here’s the general view:

kafka-on-kubernetes-metrics

There are several other diagrams available. For example, we can take a look at the statistics related to the incoming and outgoing messages.

6. Rebalancing Kafka with Cruise Control

Let's analyze a typical Kafka scenario: increasing the number of brokers in the cluster. Before we do it, we will generate more incoming traffic to the test-1 topic. In order to do it, we can use the Grafana k6 tool. The k6 tool provides several extensions for load testing, including the Kafka plugin. Here's the manifest (a ConfigMap with the test script and a Deployment) that runs k6 with the Kafka extension on Kubernetes.

kind: ConfigMap
apiVersion: v1
metadata:
  name: load-test-cm
  namespace: strimzi
data:
  load-test.js: |
    import {
      Writer,
      SchemaRegistry,
      SCHEMA_TYPE_JSON,
    } from "k6/x/kafka";
    const writer = new Writer({
      brokers: ["my-cluster-kafka-bootstrap.strimzi:9092"],
      topic: "test-1",
    });
    const schemaRegistry = new SchemaRegistry();
    export default function () {
      writer.produce({
        messages: [
          {
            value: schemaRegistry.serialize({
              data: {
                id: 1,
                source: "test",
                space: "strimzi",
                cluster: "c1",
                message: "HELLO"
              },
              schemaType: SCHEMA_TYPE_JSON,
            }),
          },
        ],
      });
    }
    
    export function teardown(data) {
      writer.close();
    }
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: k6-test
  namespace: strimzi
spec:
  selector:
    matchLabels:
      app.kubernetes.io/name: k6-test
  template:
    metadata:
      labels:
        app.kubernetes.io/name: k6-test
    spec:
      containers:
        - image: mostafamoradian/xk6-kafka:latest
          name: xk6-kafka
          command:
            - "k6"
            - "run"
            - "--vus"
            - "1"
            - "--duration"
            - "720s"
            - "/tests/load-test.js"
          env:
            - name: KAFKA_URL
              value: my-cluster-kafka-bootstrap
            - name: CLUSTER
              value: c1
            - name: TOPIC
              value: test-1
            - name: POD
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
          volumeMounts:
            - mountPath: /tests
              name: test
      volumes:
        - name: test
          configMap:
            name: load-test-cm

Let’s apply the manifest to the strimzi namespace with the following command:

$ kubectl apply -f k8s/k6.yaml

After that, we can take a look at the k6 Pod logs. As you can see, it generates and sends a lot of messages to the test-1 topic on our Kafka cluster.

Now, let's increase the number of Kafka brokers in our cluster. We can do it by changing the value of the replicas field in the KafkaNodePool object, for example with the kubectl scale command:

$ kubectl scale kafkanodepool dual-role --replicas=4 -n strimzi
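
Equivalently, we can change the replicas value directly in the KafkaNodePool manifest and re-apply it. Only a single field differs from the original definition:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: dual-role
  namespace: strimzi
  labels:
    strimzi.io/cluster: my-cluster
spec:
  replicas: 4 # scaled up from 3
  # the roles, storage and template sections stay the same as before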

After a while, Strimzi will start a new pod with another Kafka broker. Although we have a new member of the Kafka cluster, all the partitions are still distributed only across the three previous brokers. The situation would be different for a new topic. However, the partitions of the existing topics won't be automatically migrated to the new broker instance. Let's verify the current partition structure for the test-1 topic with the kcat CLI (I'm exposing the Kafka API locally with kubectl port-forward):

$ kcat -b localhost:9092 -L -t test-1
Metadata for test-1 (from broker -1: localhost:9092/bootstrap):
 4 brokers:
  broker 0 at my-cluster-dual-role-0.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 1 at my-cluster-dual-role-1.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 2 at my-cluster-dual-role-2.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 3 at my-cluster-dual-role-3.my-cluster-kafka-brokers.strimzi.svc:9092 (controller)
 1 topics:
  topic "test-1" with 12 partitions:
    partition 0, leader 0, replicas: 0,1,2, isrs: 1,0,2
    partition 1, leader 1, replicas: 1,2,0, isrs: 1,0,2
    partition 2, leader 2, replicas: 2,0,1, isrs: 1,0,2
    partition 3, leader 0, replicas: 0,1,2, isrs: 1,0,2
    partition 4, leader 1, replicas: 1,2,0, isrs: 1,0,2
    partition 5, leader 2, replicas: 2,0,1, isrs: 1,0,2
    partition 6, leader 0, replicas: 0,1,2, isrs: 1,0,2
    partition 7, leader 1, replicas: 1,2,0, isrs: 1,0,2
    partition 8, leader 2, replicas: 2,0,1, isrs: 1,0,2
    partition 9, leader 0, replicas: 0,2,1, isrs: 1,0,2
    partition 10, leader 2, replicas: 2,1,0, isrs: 1,0,2
    partition 11, leader 1, replicas: 1,0,2, isrs: 1,0,2

Here comes Cruise Control. Cruise Control makes managing and operating Kafka much easier. For example, it allows us to move partitions across brokers after scaling up the cluster. Let’s see how it works. We have already enabled Cruise Control in the Strimzi Kafka CRD. In order to begin a rebalancing procedure, we should create the KafkaRebalance object. This object is responsible for asking Cruise Control to generate an optimization proposal.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: my-rebalance
  labels:
    strimzi.io/cluster: my-cluster
spec: {}
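
The empty spec tells Cruise Control to use its default set of optimization goals. If we want to narrow the proposal down, the KafkaRebalance CRD also accepts an explicit goals list. Here's a hedged sketch with a few of the goal names documented by Strimzi:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaRebalance
metadata:
  name: my-rebalance-goals
  labels:
    strimzi.io/cluster: my-cluster
spec:
  goals: # a subset of the supported Cruise Control goals
    - RackAwareGoal
    - ReplicaCapacityGoal
    - DiskCapacityGoal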

If the optimization proposal is ready, you will see the ProposalReady value in the Status.Conditions.Type field. I won't get into the details of Cruise Control here. In my case, it suggested moving 58 partition replicas between brokers in the cluster.

Let’s accept the proposal by annotating the KafkaRebalance object with strimzi.io/rebalance=approve:

$ kubectl annotate kafkarebalance my-rebalance \
    strimzi.io/rebalance=approve -n strimzi

Finally, we can run the kcat command on the test-1 topic once again. Now, as you can see, partition replicas are spread across all the brokers.

$ kcat -b localhost:9092 -L -t test-1
Metadata for test-1 (from broker -1: localhost:9092/bootstrap):
 4 brokers:
  broker 0 at my-cluster-dual-role-0.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 1 at my-cluster-dual-role-1.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 2 at my-cluster-dual-role-2.my-cluster-kafka-brokers.strimzi.svc:9092
  broker 3 at my-cluster-dual-role-3.my-cluster-kafka-brokers.strimzi.svc:9092 (controller)
 1 topics:
  topic "test-1" with 12 partitions:
    partition 0, leader 2, replicas: 2,1,3, isrs: 1,2,3
    partition 1, leader 1, replicas: 1,2,0, isrs: 1,0,2
    partition 2, leader 2, replicas: 0,2,1, isrs: 1,0,2
    partition 3, leader 0, replicas: 0,2,3, isrs: 0,2,3
    partition 4, leader 1, replicas: 3,2,1, isrs: 1,2,3
    partition 5, leader 2, replicas: 2,3,0, isrs: 0,2,3
    partition 6, leader 0, replicas: 0,1,2, isrs: 1,0,2
    partition 7, leader 1, replicas: 3,1,0, isrs: 1,0,3
    partition 8, leader 2, replicas: 2,0,1, isrs: 1,0,2
    partition 9, leader 0, replicas: 0,3,1, isrs: 1,0,3
    partition 10, leader 2, replicas: 2,3,0, isrs: 0,2,3
    partition 11, leader 1, replicas: 1,0,3, isrs: 1,0,3

Final Thoughts

Strimzi allows us not only to install and manage Kafka but also the whole ecosystem around it. In this article, I showed how to export metrics to Prometheus and use the Cruise Control tool to rebalance a cluster after scale-up. We also ran Kafka in KRaft mode and then connected two simple Java apps with the cluster through Kubernetes Service.

The post Apache Kafka on Kubernetes with Strimzi appeared first on Piotr's TechBlog.

Concurrency with Kafka and Spring Boot https://piotrminkowski.com/2023/04/30/concurrency-with-kafka-and-spring-boot/ Sun, 30 Apr 2023 11:28:45 +0000
This article will teach you how to configure concurrency for Kafka consumers with Spring Boot and Spring for Kafka. Concurrency in Spring for Kafka is closely related to the Kafka partitions and consumer groups. Each consumer within a consumer group can receive messages from multiple partitions. While a consumer inside a group uses a single thread, the group of consumers utilizes multiple threads to consume messages. Although each consumer is single-threaded, the processing of records can leverage multiple threads. We will analyze how to achieve it with Spring Boot and Spring for Kafka.

The topic described today, concurrency with Kafka and Spring Boot, covers rather basic issues. If you are looking for something more advanced in this area, you can read some of my other articles. For example, you can find information about Kafka Streams and the Spring Cloud Stream project in that article. You can also read more about Kafka transactions with Spring Boot in the following article.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should go to the no-transactions-service directory. After that, you should just follow my instructions. Let’s begin.

Prerequisites

We will use three different tools in the exercise today. Of course, we will create the Spring Boot consumer app using the latest version of Spring Boot 3 and Java 19. In order to run Kafka locally, we will use Redpanda – a platform compatible with Kafka API. You can easily start and manage Redpanda with their CLI tool – rpk. If you want to install rpk on your laptop please follow the installation instructions available here.

Finally, we need a tool for load testing. I'm using the k6 tool and its extension for integration with Kafka. Of course, it is just a suggestion; you can use any other solution you like. With k6 I'm able to generate and send a lot of messages to Kafka quickly. In order to use k6 you need to install it on your laptop. Here are the installation instructions. After that, you need to install the xk6-kafka extension. The following documentation contains a full list of the k6 extensions.

Introduction

For the purpose of this exercise, we will create a simple Spring Boot application that connects to Kafka and receives messages from a single topic. From the business logic perspective, it handles transactions between the accounts and stores inside an in-memory database. Here’s a list of dependencies we need to include in the Maven pom.xml:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>

Then, let's see the configuration settings. Our app connects to the Kafka broker using the address set in the KAFKA_URL environment variable. It expects messages in JSON format. Therefore we need to set JsonDeserializer as the value deserializer. The incoming message is deserialized into the pl.piomin.services.common.model.Order object. To make it work, we need to set the spring.json.value.default.type and spring.json.trusted.packages properties. The k6 tool won't set a header with information about the JSON target type, so we need to disable that feature in Spring for Kafka with the spring.json.use.type.headers property.

spring:
  application.name: no-transactions-service
  kafka:
    bootstrap-servers: ${KAFKA_URL}
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        "[spring.json.value.default.type]": "pl.piomin.services.common.model.Order"
        "[spring.json.trusted.packages]": "pl.piomin.services.common.model"
        "[spring.json.use.type.headers]": false
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

Here’s the class representing incoming messages.

public class Order {

   private Long id;
   private Long sourceAccountId;
   private Long targetAccountId;
   private int amount;
   private String status;

   // GETTERS AND SETTERS...
}

The last thing we need to do is to enable Spring for Kafka and generate some test accounts for making transactions.

@SpringBootApplication
@EnableKafka
public class NoTransactionsService {

   public static void main(String[] args) {
      SpringApplication.run(NoTransactionsService.class, args);
   }

   private static final Logger LOG = LoggerFactory
      .getLogger(NoTransactionsService.class);

   Random r = new Random();

   @Autowired
   AccountRepository repository;

   @PostConstruct
   public void init() {
      for (int i = 0; i < 1000; i++) {
         repository.save(new Account(r.nextInt(1000, 10000)));
      }
   }

}

Running Kafka using Redpanda

Once we successfully installed the rpk CLI we can easily run a single-node Kafka broker by executing the following command:

$ rpk container start

Here’s the result. As you see the address of my broker is localhost:51961. The port number is generated automatically, so yours will probably be different. To simplify the next actions, let’s just set it as the REDPANDA_BROKERS environment variable.

Once we have created a broker, we can create a topic. We will use a topic named transactions in our tests. In the first step, we run tests with a single partition.

$ rpk topic create transactions -p 1
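A quick aside on why the partition count matters: Kafka assigns every record to a partition, by default by hashing the record key, and each partition is consumed by at most one thread within a consumer group. Here's a minimal sketch of that idea — real Kafka hashes the serialized key with murmur2; Object.hashCode() is only a stand-in for illustration:

```java
public class PartitionerSketch {
    // Simplified default partitioner: records with the same key always
    // land on the same partition. Real Kafka hashes the serialized key
    // with murmur2; Object.hashCode() is only a stand-in here.
    static int partitionFor(Object key, int partitions) {
        return Math.floorMod(key.hashCode(), partitions);
    }

    public static void main(String[] args) {
        // With a single partition, every record lands on partition 0,
        // so only one consumer in the group can ever be busy:
        System.out.println(partitionFor(42L, 1));  // 0
        // With more partitions, keys spread out and consumers share the work:
        System.out.println(partitionFor(42L, 10));
        System.out.println(partitionFor(43L, 10));
    }
}
```

This is exactly why the single-partition topic created above caps our consumer group at one active thread, as the first scenario will show.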

Prepare Load Tests for Kafka

Our load test will generate and send orders in JSON format with random values. The k6 tool allows us to write tests in JavaScript. We need to use the k6 Kafka extension library. The address of the Kafka broker is retrieved from the KAFKA_URL environment variable. We are incrementing the order’s id field each time we generate a new message.

import {
  Writer,
  SchemaRegistry,
  SCHEMA_TYPE_JSON,
} from "k6/x/kafka";
import { randomIntBetween } from 'https://jslib.k6.io/k6-utils/1.2.0/index.js';

const writer = new Writer({
  brokers: [`${__ENV.KAFKA_URL}`],
  topic: "transactions",
});

const schemaRegistry = new SchemaRegistry();

export function setup() {
  return { index: 1 };
}

export default function (data) {
  writer.produce({
    messages: [
      {
        value: schemaRegistry.serialize({
          data: {
            id: data.index++,
            sourceAccountId: randomIntBetween(1, 1000),
            targetAccountId: randomIntBetween(1, 1000),
            amount: randomIntBetween(10, 50),
            status: "NEW"
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
      },
    ],
  });
}

export function teardown(data) {
  writer.close();
}

Before running the test we need to set the KAFKA_URL environment variable. Then we can use the k6 run command to generate and send a lot of messages.

$ k6 run load-test.js -u 1 -d 30s 

Scenario 1: Single-partition Topic Listener

Let's start with the defaults. Our topic has just a single partition. We create the @KafkaListener with just the topic and consumer group names. Once the listener receives an incoming message, it invokes the AccountService bean to process the order.

@Inject
AccountService service;
    
@KafkaListener(
   id = "transactions",
   topics = "transactions",
   groupId = "a")
public void listen(Order order) {
   LOG.info("Received: {}", order);
   service.process(order);
}

Our Spring Boot Kafka app is prepared for concurrency. We will lock the Account entity during the transaction with the PESSIMISTIC_WRITE mode.

public interface AccountRepository extends CrudRepository<Account, Long> {

    @Lock(LockModeType.PESSIMISTIC_WRITE)
    Optional<Account> findById(Long id);
}

Here's the implementation of our AccountService bean for handling incoming orders. The process(...) method is @Transactional. In the first step, we find the source (1) and target (2) Account entities. Then we perform a transfer between the accounts if there are sufficient funds in the source account (3). I'm also simulating a delay just for test purposes (4). Finally, we send a response asynchronously to another topic using the KafkaTemplate bean (5).

@Service
public class AccountService {

    private static final Logger LOG = LoggerFactory
            .getLogger(AccountService.class);
    private final Random RAND = new Random();

    KafkaTemplate<Long, Order> kafkaTemplate;
    AccountRepository repository;

    public AccountService(KafkaTemplate<Long, Order> kafkaTemplate, 
                          AccountRepository repository) {
        this.kafkaTemplate = kafkaTemplate;
        this.repository = repository;
    }

    @Transactional
    public void process(Order order) {
        Account accountSource = repository
                .findById(order.getSourceAccountId())
                .orElseThrow(); // (1)

        Account accountTarget = repository
                .findById(order.getTargetAccountId())
                .orElseThrow(); // (2)

        if (accountSource.getBalance() >= order.getAmount()) { // (3)
            accountSource.setBalance(accountSource.getBalance() - order.getAmount());
            repository.save(accountSource);
            accountTarget.setBalance(accountTarget.getBalance() + order.getAmount());
            repository.save(accountTarget);
            order.setStatus("PROCESSED");
        } else {
            order.setStatus("FAILED");
        }

        try {
            Thread.sleep(RAND.nextLong(1, 20)); // (4)
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        LOG.info("Processed: order->{}", new OrderDTO(order, accountSource, accountTarget));

        // (5)
        CompletableFuture<SendResult<Long, Order>> result = kafkaTemplate
           .send("orders", order.getId(), order);
        result.whenComplete((sr, ex) ->
                LOG.debug("Sent(key={},partition={}): {}",
                        sr.getProducerRecord().key(),
                        sr.getProducerRecord().partition(),
                        sr.getProducerRecord().value()));
    }

}

Let’s set the address of the Kafka broker in the KAFKA_URL environment variable and then start the app.

$ export KAFKA_URL=127.0.0.1:51961
$ mvn spring-boot:run

Let’s analyze what happens. Our listener is connecting to the transactions topic. It establishes just a single connection since there is a single partition.

In that case, we have just a single instance of our app running and a single thread responsible for handling messages. Let’s verify the current lag on the partition for our consumer group. As you see, the messages are processed very slowly. At first glance, you may be quite surprised.
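The slow processing is easy to estimate. Assuming the simulated per-record delay of 1–20 ms from the AccountService code (roughly 10 ms on average), a single consumer thread caps out at about 100 messages per second. This is a back-of-the-envelope sketch, not a measurement — the backlog size in the example is purely illustrative:

```java
public class SingleThreadEstimate {
    // Average simulated processing delay per record in milliseconds;
    // Thread.sleep(RAND.nextLong(1, 20)) averages roughly 10 ms.
    static final double AVG_DELAY_MS = 10.0;

    // Maximum records per second for a single consumer thread.
    static double maxPerSecond() {
        return 1000.0 / AVG_DELAY_MS;
    }

    // Seconds needed to drain a backlog of `records` messages.
    static double secondsToDrain(long records) {
        return records / maxPerSecond();
    }

    public static void main(String[] args) {
        System.out.printf("max rate: ~%.0f msg/s%n", maxPerSecond());        // ~100 msg/s
        System.out.printf("25k backlog: ~%.0f s%n", secondsToDrain(25_000)); // ~250 s
    }
}
```

A k6 run that produces tens of thousands of messages in seconds therefore leaves the single-threaded consumer minutes behind — which is the growing lag we observe.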

Scenario 2: Multiple-partitions Topic Listener

Let’s analyze the next scenario. Now, the transactions topic consists of 10 partitions. We won’t change anything in the app code and configuration. We will just remove the previously created topic and create a new one with 10 partitions using the following commands:

$ rpk topic delete transactions
$ rpk topic create transactions -p 10

Once again, we are starting the app using the following Maven command:

$ mvn spring-boot:run

Let's analyze the app logs. As you see, although we have 10 partitions, there is still a single thread listening on all of them.

[image: spring-boot-kafka-concurrency-single-partition]

So far, our situation hasn't changed. The app performance is exactly the same. However, now we can run another instance of our Spring Boot app. Once you do it, take a look at the app logs. The new instance takes over 5 partitions.

In that case, a rebalancing occurs. The first instance of our Spring Boot app holds the 5 remaining partitions. Now the overall performance is twice as good as before.

Of course, we can run more app instances. In that case, we can scale up to 10 instances since there are 10 partitions on the topic.
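The rebalancing behavior above can be sketched with a few lines of arithmetic. This is a simplified model of how partitions spread across consumer instances (the real assignor is more sophisticated, but the counts match what we observe in the logs later on):

```java
import java.util.Arrays;

public class PartitionSpread {
    // Simplified range-style assignment: `partitions` split as evenly
    // as possible across `consumers`; extra consumers stay idle.
    static int[] distribute(int partitions, int consumers) {
        int active = Math.min(partitions, consumers);
        int[] counts = new int[consumers];
        for (int i = 0; i < active; i++) {
            counts[i] = partitions / active + (i < partitions % active ? 1 : 0);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(distribute(10, 1)));  // [10]
        System.out.println(Arrays.toString(distribute(10, 2)));  // [5, 5]
        System.out.println(Arrays.toString(distribute(10, 3)));  // [4, 3, 3]
        // More consumers than partitions: the surplus consumers get nothing.
        System.out.println(Arrays.toString(distribute(10, 12)));
    }
}
```

The distribute(10, 3) case is exactly the 4/3/3 split we will see when running three app instances, and distribute(10, 12) shows why scaling past the partition count only produces idle consumers.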

Scenario 3: Consumer Concurrency with Multiple Partitions

Let's analyze another scenario. Now, we are enabling concurrency at the Kafka listener level. In order to achieve it, we need to set the concurrency field inside the @KafkaListener annotation. This parameter is still related to Kafka partitions, so there is no point in setting the value higher than the number of partitions. In our case, there are 10 partitions – the same as in the previous scenario.

@KafkaListener(
   id = "transactions",
   topics = "transactions",
   groupId = "a",
   concurrency = "10")
public void listen(Order order) {
   LOG.info("Received: {}", order);
   service.process(order);
}

After that, we can start the Spring Boot app. Let’s see what happens. As you see, we have 10 concurrent connections – each bound to a single thread.

[image: spring-boot-kafka-concurrency-multi]

In that case, the app performance for a single instance is around 10 times better than before. There are 10 concurrent threads, which process incoming messages.

[image: spring-boot-kafka-concurrency-processing]

However, if we run our load tests the lag on partitions is still large. Here’s the result after sending ~25k messages in 10 seconds.

[image: spring-boot-kafka-concurrency-lag]

Theoretically, we can scale up the number of instances to improve the overall performance. However, that approach won't change anything. Why? Let's run another instance and take a look at the logs. Now, only 5 threads of each instance are bound to partitions. The other five threads are idle. The overall performance of the system does not change.

Scenario 4: Process in Multiple Threads

Finally, the last scenario. We will create a thread pool with the Java ExecutorService. We can still combine the custom thread pool with the Kafka consumer concurrency feature (through the concurrency parameter), as shown below. Each time the listener receives a new message, it processes it in a separate thread.

@Service
public class NoTransactionsListener {

    private static final Logger LOG = LoggerFactory
            .getLogger(NoTransactionsListener.class);

    AccountService service;
    ExecutorService executorService = Executors.newFixedThreadPool(30);

    public NoTransactionsListener(AccountService service) {
        this.service = service;
    }

    @KafkaListener(
            id = "transactions",
            topics = "transactions",
            groupId = "a",
            concurrency = "3")
    public void listen(Order order) {
        LOG.info("Received: {}", order);
        executorService.submit(() -> service.process(order));
    }

}

One thing should be clarified here. With a custom thread pool at the app level, we lose message ordering within a single partition. The previous model guaranteed ordering, since there was a single thread per partition. For our Spring Boot app it is not important, because we process each message independently.
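If per-key ordering did matter, a common workaround is to shard the thread pool by message key, so that all orders for the same account run sequentially while different keys still run in parallel. Here's a minimal sketch of that idea — it is not part of the article's code, and the pool size of 30 is just an example:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class KeyedExecutor {
    private final ExecutorService[] workers;

    KeyedExecutor(int size) {
        workers = new ExecutorService[size];
        for (int i = 0; i < size; i++) {
            // One single-threaded executor per shard keeps per-shard order.
            workers[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Stable mapping from a message key to a worker index.
    static int indexFor(Object key, int size) {
        return Math.floorMod(key.hashCode(), size);
    }

    // Tasks sharing a key always hit the same single-threaded executor,
    // so they execute in submission order.
    void submit(Object key, Runnable task) {
        workers[indexFor(key, workers.length)].submit(task);
    }

    void shutdown() {
        for (ExecutorService w : workers) {
            w.shutdown();
        }
    }

    public static void main(String[] args) {
        KeyedExecutor pool = new KeyedExecutor(30);
        // Both tasks for key 5 run on the same worker, in order:
        pool.submit(5L, () -> System.out.println("first for key 5"));
        pool.submit(5L, () -> System.out.println("second for key 5"));
        pool.shutdown();
    }
}
```

The trade-off is that a slow key can back up its shard, whereas the shared pool in the article spreads work evenly but gives up ordering.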

Let’s start the app. There are 3 concurrent threads that receive messages from the partitions.

There are 30 threads for processing messages and 3 threads for listening to the partitions. Once the message is received in the consumer thread, it is handled by the worker threads.

[image: spring-boot-kafka-concurrency-multi-threads]

We can run more instances of our Spring Boot Kafka concurrency app. I'll run another two. The first instance grabs 4 partitions, while the other two instances take 3 each.

Now, we can run the load test again. It generated and sent ~85k messages to our Kafka broker (around 2.7k per second).

[image: spring-boot-kafka-concurrency-k6]

Let's verify the lag within the consumer group using the rpk group command. The lag on the partitions is not large. In fact, there are 90 threads across the three app instances that are simultaneously processing the incoming messages. But wait… does it mean that with 90 threads we are able to process 2.7k orders per second? We should also remember the custom delay of between 1 and 20 ms we added earlier (with the Thread.sleep method).
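Let's do the math. Sticking with the rough assumption of ~10 ms of simulated work per record, 90 threads give a theoretical capacity of about 9,000 messages per second, comfortably above the ~2.7k per second produced by k6 — which is why the lag stays small. A rough capacity model (all numbers are the estimates stated in the comments, not measurements):

```java
public class LagEstimate {
    // Records one thread can process per second, given the ~10 ms
    // average simulated delay per record.
    static final double PER_THREAD_RATE = 100.0;

    // Lag accumulated after `seconds` at a given incoming rate
    // and total worker-thread count (zero if capacity keeps up).
    static double lagAfter(double incomingPerSec, int threads, double seconds) {
        double capacity = threads * PER_THREAD_RATE;
        return Math.max(0, incomingPerSec - capacity) * seconds;
    }

    public static void main(String[] args) {
        // ~2.7k msg/s from k6, 3 instances x 30 worker threads = 90 threads:
        System.out.println(lagAfter(2700, 90, 30));  // 0.0 -> consumers keep up
        // A single instance with only 10 threads would fall behind:
        System.out.println(lagAfter(2700, 10, 30));  // 51000.0 records of lag
    }
}
```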

The lag looks really fine, although the app is not able to process all requests without delay. That's because the default ack mode for committing offsets in Spring for Kafka is BATCH. If we change it to RECORD mode, which commits the offset after the listener returns for each processed record, we should get a more precise lag value. In order to override that option, we need to define the ConcurrentKafkaListenerContainerFactory bean as shown below.

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String>
    kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
   ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
   factory.setConsumerFactory(consumerFactory);
   factory.setConcurrency(3);
   factory.getContainerProperties()
      .setAckMode(ContainerProperties.AckMode.RECORD);
   return factory;
}

Let's restart the app and run the load test once again. Now, the lag value is much closer to reality.

Final Thoughts

Concurrency and performance are among the most important things to consider when working with Kafka and Spring Boot. In this article, I explained some of the basics with simple examples. I hope it clarifies some concerns about using the Spring for Kafka project.

The post Concurrency with Kafka and Spring Boot appeared first on Piotr's TechBlog.

Kafka Transactions with Spring Boot https://piotrminkowski.com/2022/10/29/kafka-transactions-with-spring-boot/ Sat, 29 Oct 2022 08:23:21 +0000
In this article, you will learn how to use Kafka transactions with the Spring Kafka project in your Spring Boot app. In order to run the Kafka cluster, we will use Upstash. This article provides a basic introduction to Kafka transactions. If you are looking for more advanced usage and scenarios, you may refer to that article about distributed transactions in microservices. You can also read more about Kafka Streams and the Spring Cloud Stream project in this article.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. After that, you should just follow my instructions. Let’s begin.

Getting Started with Kafka in Spring Boot

I have already created a Kafka cluster on Upstash using a web dashboard. All the connection credentials are generated automatically. You can find and copy them on the main page of your cluster.

Assuming we have a username as the KAFKA_USER variable and a password as the KAFKA_PASS variable we need to provide the following Spring configuration in the application.yml file:

spring:
  application.name: transactions-service
  kafka:
    bootstrap-servers: inviting-camel-5620-eu1-kafka.upstash.io:9092
    properties:
      security.protocol: SASL_SSL
      sasl.mechanism: SCRAM-SHA-256
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${KAFKA_USER}" password="${KAFKA_PASS}";

Here’s a list of required dependencies. Since we exchange JSON messages, we need the Jackson library for serialization or deserialization. Of course, we also need to include Spring Boot starter and Spring Kafka.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
</dependency>

The transactions-service is generating and sending orders. We will create the test topic transactions on the app startup.

@SpringBootApplication
public class TransactionsService {

   public static void main(String[] args) {
      SpringApplication.run(TransactionsService.class, args);
   }

   @Bean
   public NewTopic transactionsTopic() {
      return TopicBuilder.name("transactions")
          .partitions(3)
          .replicas(1)
          .build();
   }

}

Enabling Kafka Transactions in Spring Boot

In Kafka, a producer initiates a transaction by making a request to the transaction coordinator. You can find a detailed description of that process in the following article on the Confluent blog.

With Spring Boot, we just need to set the spring.kafka.producer.transaction-id-prefix property to enable transactions. Spring Boot will do the rest by automatically configuring a KafkaTransactionManager bean and wiring it into the listener container. Here's the part of the configuration responsible for the message producer. We use JsonSerializer to serialize objects into JSON. The transaction id prefix is tx-.

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      transaction-id-prefix: tx-

During our scenario, we will send 10 messages within a single transaction. In order to observe the logs on the consumer side, we set a delay of 1 second between subsequent messages.

@Transactional
public void generateAndSendPackage() 
      throws InterruptedException, TransactionException {
   for (long i = 0; i < 10; i++) {
      Order t = new Order(id++, i+1, i+2, 1000, "NEW");
      ListenableFuture<SendResult<Long, Order>> result =
         kafkaTemplate.send("transactions", t.getId(), t);
      result.addCallback(callback);
      Thread.sleep(1000);
   }
}

Enable Transactions on the Kafka Consumer Side

In the first step, we will just print the incoming messages. We need to annotate the listener method with the @KafkaListener. The target topic is transactions, and the consumer group is a. Also, we have to add the @Transactional annotation to enable transaction support for the listen method.

@Service
public class TransactionsListener {

   private static final Logger LOG = LoggerFactory
          .getLogger(TransactionsListener.class);

   @KafkaListener(
          id = "transactions",
          topics = "transactions",
          groupId = "a",
          concurrency = "3")
   @Transactional
   public void listen(Order order) {
      LOG.info("{}", order);
   }
}

Let's run the producer app first. To do so, go to the transactions-service directory and execute the command mvn spring-boot:run. It is a good idea to enable more detailed logs for Spring Kafka transactions. To do that, add the following lines to the application.yml file:

logging:
  level:
    org.springframework.transaction: trace
    org.springframework.kafka.transaction: debug

After that, let's run the consumer app. In order to do that, go to the accounts-service directory and run the same command as before. You should see the following topic created in the Upstash console:

[image: kafka-transactions-spring-boot-upstash]

The transactions-service app exposes the REST endpoint for sending messages. It just starts that procedure of generating and sending 10 messages within a single transaction I mentioned in the previous section. Let’s call the endpoint:

$ curl -X POST http://localhost:8080/transactions

Let's look at the logs on the producer side. After sending all the messages, it committed the transaction.

[image: kafka-transactions-spring-boot-logs]

Now, let's see how it looks on the consumer side. All the messages are received just after being sent by the producer app. That is not what we expected…

In order to verify what happened, we need to take a look at the consumer app logs. Here's a fragment with the Kafka consumer settings. As you see, by default Spring Kafka sets the transaction isolation level to read_uncommitted.

Deep Dive into Transactions with Spring Kafka

In order to solve the problem with transactions from the previous section, we need to change the default isolation level in the application.yml file. Under spring.kafka.consumer.properties, we have to set the isolation.level property to read_committed as shown below.

spring:
  application.name: accounts-service
  kafka:
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"
        isolation.level: read_committed

After that let’s run the accounts-service app once again.

Now, all the messages have been received after the producer committed the transaction. There are three consumer threads as we set the @KafkaListener concurrency parameter to 3.

[image: kafka-transactions-spring-boot-producer]

In the next step, we will test the rollback of a transaction on the producer side. In order to do that, we will modify the method for generating and sending orders. Now, generateAndSendPackage takes a boolean parameter that indicates whether the transaction should be rolled back.

@Transactional
public void generateAndSendPackage(boolean error)
       throws InterruptedException {
   for (long i = 0; i < 10; i++) {
      Order t = new Order(id++, i+1, i+2, 1000, "NEW");
      ListenableFuture<SendResult<Long, Order>> result =
            kafkaTemplate.send("transactions", t.getId(), t);
      result.addCallback(callback);
      if (error && i > 5)
         throw new RuntimeException();
      Thread.sleep(1000);
   }
}

Here are the logs from our test. After sending the seventh order, the method throws a RuntimeException and Spring rolls back the transaction. As expected, the consumer app does not receive any messages.

It is important to know that, by default, Spring only rolls back on unchecked exceptions. To roll back on checked exceptions as well, we need to specify the rollbackFor attribute on the @Transactional annotation.

The transactional producer sends messages to the Kafka cluster even before committing the transaction. You could see it in the previous section, where the listener continuously received messages when the isolation level was read_uncommitted. Consequently, if we roll back a transaction on the producer side, the messages sent before the rollback still reach the Kafka broker. We can see it e.g. in the Upstash live message view for the transactions topic.
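The read_committed / read_uncommitted behavior can be illustrated with a toy model of a partition log, in which aborted records and transaction control markers are still physically present. This is only a simplification for intuition — a real consumer buffers records until the corresponding marker arrives; it does not scan the log twice — and the sample data is invented for the example:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IsolationLevelDemo {
    // One log entry: either a data record or a transaction control marker.
    record Entry(long txnId, String value, boolean isMarker, boolean committed) {}

    // Sample partition log: txn 1 committed two records, txn 2 aborted one.
    static final List<Entry> SAMPLE = List.of(
            new Entry(1, "order-1", false, false),
            new Entry(1, "order-2", false, false),
            new Entry(1, null, true, true),    // commit marker for txn 1
            new Entry(2, "order-3", false, false),
            new Entry(2, null, true, false));  // abort marker for txn 2

    // read_uncommitted: every data record, in log order.
    static List<String> readUncommitted(List<Entry> log) {
        List<String> out = new ArrayList<>();
        for (Entry e : log) if (!e.isMarker()) out.add(e.value());
        return out;
    }

    // read_committed: only data records whose transaction was committed.
    static List<String> readCommitted(List<Entry> log) {
        Set<Long> committed = new HashSet<>();
        for (Entry e : log) if (e.isMarker() && e.committed()) committed.add(e.txnId());
        List<String> out = new ArrayList<>();
        for (Entry e : log) if (!e.isMarker() && committed.contains(e.txnId())) out.add(e.value());
        return out;
    }

    public static void main(String[] args) {
        System.out.println(readUncommitted(SAMPLE)); // [order-1, order-2, order-3]
        System.out.println(readCommitted(SAMPLE));   // [order-1, order-2]
    }
}
```

The model makes the earlier observation concrete: the aborted record order-3 is in the log either way, but a read_committed reader never surfaces it.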

[image: kafka-transactions-spring-boot-live]

Here's the current value of the offsets for all partitions in the transactions topic for the a consumer group. We made a successful commit after sending the first package of 10 messages, and we rolled back the transaction with the second package. The sum of the group's offsets is 10 in that case. But in fact, it differs from the current latest offset on those partitions, because the aborted records and the transaction control markers still occupy offsets in the log.

To verify it, we can, for example, change a consumer group name for the listener to b and start another instance of the accounts-service.

@KafkaListener(
     id = "transactions",
     topics = "transactions",
     groupId = "b",
     concurrency = "3")
@Transactional
public void listen(Order order) {
   LOG.info("{}", order);
}

Here’s the current value of offsets for the b consumer group.

Of course, the messages have been rolled back. The important thing to understand here is that these operations happen on the Kafka broker side. The transaction coordinator manages the transaction markers and offsets. We can easily verify that the consumer won't receive messages after a rollback even if we set the initial offset to the earliest with the spring.kafka.consumer.auto-offset-reset property.

Add Database

In this section, we will extend our scenario with new functionalities. Our app will store the status of orders in the database. Just for demo purposes, we will use an in-memory database H2. There are two dependencies required in this scenario: H2 and Spring Data JPA.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>

There is the OrderGroup entity, which stores the current status of the package (SENT, CONFIRMED, ROLLBACK), the total number of orders in a single package, and the number of orders processed by the accounts-service.

@Entity
public class OrderGroup {

   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long id;
   private String status;
   private int totalNoOfOrders;
   private int processedNoOfOrders;

   // GETTERS/SETTERS ...
}

In order to manage the entity we use the Spring Data repository pattern:

public interface OrderGroupRepository extends 
   CrudRepository<OrderGroup, Long> {
}

We will also include a database in the accounts-service app. When it processes the incoming orders it performs transfers between the source and target account. It will store the account balance in the database.

@Entity
public class Account {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private int balance;

   // GETTERS/SETTERS ...
}

The same as before, there is a repository bean for managing the Account entity.

public interface AccountRepository extends
   CrudRepository<Account, Long> {
}

We also need to modify the Order message exchanged between the apps. It has to contain the groupId field for processing confirmations.

public class Order {

    private Long id;
    private Long sourceAccountId;
    private Long targetAccountId;
    private int amount;
    private String status;
    private Long groupId;

   // GETTERS/SETTERS ...
}

Here’s the diagram that illustrates our architecture for the described scenario.

kafka-transactions-spring-boot-arch

Handling Transactions Across Multiple Resources

After including Spring Data JPA there are two registered TransactionManager beans with names transactionManager and kafkaTransactionManager. Therefore we need to choose the name of the transaction manager inside the @Transactional annotation. In the first step, we add a new entity to the database. The primary key id is auto-generated in the database and then returned to the object. After that, we get groupId and generate the sequence of orders within that group. Of course, both operations (save to database, sent to Kafka) are part of the same transaction.

@Transactional("kafkaTransactionManager")
public void sendOrderGroup(boolean error) throws InterruptedException {
   OrderGroup og = repository.save(new OrderGroup("SENT", 10, 0));
   generateAndSendPackage(error, og.getId());
}

private void generateAndSendPackage(boolean error, Long groupId)
      throws InterruptedException {
   for (long i = 0; i < 10; i++) {
      Order o = new Order(id++, i+1, i+2, 1000, "NEW", groupId);
      ListenableFuture<SendResult<Long, Order>> result =
         kafkaTemplate.send("transactions", o.getId(), o);
      result.addCallback(callback);
      if (error && i > 5)
         throw new RuntimeException();
      Thread.sleep(1000);
   }
}

The accounts-service app listens for incoming orders. It processes each order in a separate transaction. It checks if there are sufficient funds in the customer account to make a transfer. If there is enough money, it performs the transfer. Finally, it sends a response with the transaction status back to transactions-service on the orders topic.

@KafkaListener(
   id = "transactions",
   topics = "transactions",
   groupId = "a",
   concurrency = "3")
@Transactional("kafkaTransactionManager")
public void listen(Order order) {
   LOG.info("Received: {}", order);
   process(order);
}

private void process(Order order) {
   Account accountSource = repository
      .findById(order.getSourceAccountId())
      .orElseThrow();
   Account accountTarget = repository
      .findById(order.getTargetAccountId())
      .orElseThrow();
   if (accountSource.getBalance() >= order.getAmount()) {
      accountSource.setBalance(accountSource.getBalance() - order.getAmount());
      repository.save(accountSource);
      accountTarget.setBalance(accountTarget.getBalance() + order.getAmount());
      repository.save(accountTarget);
      order.setStatus("PROCESSED");
   } else {
      order.setStatus("FAILED");
   }
   LOG.info("After processing: {}", order);
   kafkaTemplate.send("orders", order.getId(), order);
}

The transactions-service listens for order confirmations on the orders topic. Once it receives a message, it increases the number of processed orders within the order group and stores the current result in the database. We should use the default Spring transaction manager since we don’t send any messages to Kafka.

@KafkaListener(
      id = "orders",
      topics = "orders",
      groupId = "a",
      concurrency = "3")
@Transactional("transactionManager")
public void listen(Order order) {
   LOG.info("{}", order);
   OrderGroup og = repository
      .findById(order.getGroupId())
      .orElseThrow();
   if (order.getStatus().equals("PROCESSED")) {      
      og.setProcessedNoOfOrders(og.getProcessedNoOfOrders() + 1);
      og = repository.save(og);
      LOG.info("Current: {}", og);
   }
}

Don’t forget to lock the OrderGroup record during the transaction. Since we are processing messages concurrently (with 3 threads), the record must stay locked until we update the value of the processedNoOfOrders column:

public interface OrderGroupRepository extends
        CrudRepository<OrderGroup, Long> {

    @Lock(LockModeType.PESSIMISTIC_WRITE)
    Optional<OrderGroup> findById(Long groupId);
}
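To see why the pessimistic lock matters, here’s a plain-Java sketch (no database involved, purely illustrative) of the lost-update anomaly the lock prevents: two concurrent consumers read the same processedNoOfOrders value before either writes it back, so one increment is silently overwritten.

```java
// Deterministic simulation of a lost update: both "threads" read the counter
// before either writes, so the second write overwrites the first increment.
public class LostUpdateDemo {

    public static int withoutLock() {
        int processedNoOfOrders = 0;
        int readByThreadA = processedNoOfOrders; // consumer A reads 0
        int readByThreadB = processedNoOfOrders; // consumer B also reads 0
        processedNoOfOrders = readByThreadA + 1; // A writes 1
        processedNoOfOrders = readByThreadB + 1; // B overwrites with 1 -> one update lost
        return processedNoOfOrders;              // 1 instead of the expected 2
    }

    public static int withLock() {
        int processedNoOfOrders = 0;
        // PESSIMISTIC_WRITE serializes the read-modify-write cycles:
        processedNoOfOrders = processedNoOfOrders + 1; // A: read 0, write 1
        processedNoOfOrders = processedNoOfOrders + 1; // B: read 1, write 2
        return processedNoOfOrders;
    }

    public static void main(String[] args) {
        System.out.println("without lock: " + withoutLock()); // 1
        System.out.println("with lock: " + withLock());       // 2
    }
}
```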

Let’s test a positive scenario. We will generate a group of orders that should be confirmed. To do that let’s call our endpoint POST /transactions:

$ curl -X 'POST' 'http://localhost:8080/transactions' \
  -H 'Content-Type: application/json' \
  -d 'false'

Here are the logs from the accounts-service app:

We can also take a look at the logs generated by the transactions-service app:

Finally, we can verify the current status of our order group by calling the following endpoint:

$ curl -X GET 'http://localhost:8080/transactions'

What happens if we roll back the transaction? Try it by yourself with the following command:

$ curl -X 'POST' 'http://localhost:8080/transactions' \
  -H 'Content-Type: application/json' \
  -d 'true'

Final Thoughts

You can easily handle Kafka transactions with Spring Boot using the Spring Kafka project. You can integrate your app with a database and handle transactions across multiple resources. However, one thing needs to be clarified – Kafka does not support XA transactions. That may result in data inconsistency. Spring does not solve that case; it just performs two transactions in the background. When the @Transactional method exits, Spring Boot commits the database transaction first and then the Kafka transaction. You can reverse that order, so that the Kafka transaction commits first, by configuring the outer method to use the DataSourceTransactionManager and the inner method to use the KafkaTransactionManager.
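The commit ordering can be illustrated with a plain-Java sketch (no Spring involved, names are illustrative): each @Transactional boundary commits when its method exits, so the innermost transaction manager always commits first.

```java
import java.util.ArrayList;
import java.util.List;

// Simulates nested @Transactional boundaries: the body runs first, the
// "commit" fires on method exit, so the inner transaction commits first.
public class CommitOrderDemo {

    private static final List<String> committed = new ArrayList<>();

    public static void inTransaction(String manager, Runnable work) {
        work.run();              // method body executes inside the transaction
        committed.add(manager);  // commit happens when the boundary exits
    }

    public static List<String> kafkaFirst() {
        committed.clear();
        // outer boundary: DataSourceTransactionManager
        inTransaction("database", () ->
            // inner boundary: KafkaTransactionManager
            inTransaction("kafka", () -> { /* send messages here */ }));
        return committed;
    }

    public static void main(String[] args) {
        System.out.println(kafkaFirst()); // [kafka, database]
    }
}
```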

Can we solve that case somehow? Of course. There is, for example, the Debezium project, which allows you to stream database changes into Kafka topics. With that approach, you just commit changes in the database and then configure Debezium to send events with those changes to Kafka. For more details about that tool and the outbox pattern, please refer to the article available here.

The post Kafka Transactions with Spring Boot appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2022/10/29/kafka-transactions-with-spring-boot/feed/ 6 13623
Running Redpanda on Kubernetes https://piotrminkowski.com/2022/09/06/running-redpanda-on-kubernetes/ https://piotrminkowski.com/2022/09/06/running-redpanda-on-kubernetes/#comments Tue, 06 Sep 2022 15:29:14 +0000 https://piotrminkowski.com/?p=13109 In this article, you will learn how to install and manage Redpanda on Kubernetes. It is not the first article related to Redpanda on my blog. You can read more about Redpanda in my earlier post here. I’m describing there how to do a local development of Java apps with Redpanda, Quarkus, and Testcontainers. You […]

The post Running Redpanda on Kubernetes appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to install and manage Redpanda on Kubernetes. It is not the first article related to Redpanda on my blog. You can read more about Redpanda in my earlier post here. I’m describing there how to do a local development of Java apps with Redpanda, Quarkus, and Testcontainers.

You can use Redpanda in local development as an alternative to the standard Apache Kafka. It is a Kafka API-compatible tool but does not use ZooKeeper or JVM.

In this article, I’m going to show that you can also easily run and use Redpanda on Kubernetes. There are some features you will surely find useful. Let’s begin.

Source Code

If you would like to try this exercise yourself, you may always take a look at my source code. In order to do that, you need to clone my GitHub repository and switch to the redpanda branch. You will find sample applications for sending messages to and receiving messages from Kafka in the event-driven directory. After that, just follow my instructions.

Install Cert Manager on Kubernetes

The recommended way to install Redpanda on Kubernetes is through the operator. The Redpanda operator requires the cert-manager to create certificates for TLS communication. So in the first step, we need to install cert-manager. We will use Helm for that. Let’s add the following Helm repository:

$ helm repo add jetstack https://charts.jetstack.io && \
  helm repo update

After that, we can install cert-manager in the cert-manager namespace. In order to create a namespace automatically, we should enable the create-namespace option. By default, the Cert Manager does not install CRDs on Kubernetes. Let’s enable it using the installCRDs Helm parameter:

$ helm install cert-manager \
   --namespace cert-manager --create-namespace \
   --set installCRDs=true \
   --version v1.9.1 jetstack/cert-manager

Here’s the list of the cert-manager pods. Assuming you have a similar result, you may proceed to the next section.

$ kubectl get pod -n cert-manager                                                                
NAME                                      READY   STATUS    RESTARTS   AGE
cert-manager-877fd747c-lhrgd              1/1     Running   0          1m
cert-manager-cainjector-bbdb88874-tmlg9   1/1     Running   0          1m
cert-manager-webhook-5774d5d8f7-flv7s     1/1     Running   0          1m

Install Redpanda Operator using Helm

Before we install the Redpanda operator we need to apply a single Cluster object CRD:

$ kubectl apply -k 'https://github.com/redpanda-data/redpanda/src/go/k8s/config/crd?ref=v22.2.2'

The same as before, we will use Helm in the installation process. Firstly, let’s add the official Redpanda Helm repository:

$ helm repo add redpanda https://charts.vectorized.io && \
  helm repo update

We will install the latest version of the Redpanda operator (22.2) in the redpanda-system namespace.

$ helm install redpanda-operator redpanda/redpanda-operator \
    --namespace redpanda-system \
    --create-namespace \
    --set monitoring.enabled=true \
    --version v22.2.2

After that, we may use the Cluster CRD object to create a single-node Redpanda cluster on Kubernetes. Also, let’s enable developer mode. Redpanda provides a built-in HTTP proxy and schema registry. The name of our cluster is one-node-cluster.

apiVersion: redpanda.vectorized.io/v1alpha1
kind: Cluster
metadata:
  name: one-node-cluster
spec:
  image: vectorized/redpanda
  version: latest
  replicas: 1
  resources:
    requests:
      cpu: 1
      memory: 2Gi
    limits:
      cpu: 1
      memory: 2Gi
  configuration:
    rpcServer:
      port: 33145
    kafkaApi:
    - port: 9092
    pandaproxyApi:
    - port: 8082
    schemaRegistry:
      port: 8081
    adminApi:
    - port: 9644
    developerMode: true

Redpanda will run in the redpanda namespace. Let’s create that namespace first.

$ kubectl create ns redpanda

Then, let’s create the Cluster object in the redpanda namespace.

$ kubectl apply -f redpanda-cluster.yaml -n redpanda

The Redpanda operator creates two Kubernetes Services for the cluster. The first of them, one-node-cluster, is a headless service used for internal communication. We could have enabled external communication, but it is not required in our scenario. Applications use port 9092, which is compatible with the Kafka API. There is also a dedicated service that exposes the schema registry on port 8081.

$ kubectl get svc -n redpanda
NAME                       TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)                      AGE
one-node-cluster           ClusterIP   None           <none>        9644/TCP,9092/TCP,8082/TCP   3m
one-node-cluster-cluster   ClusterIP   10.98.26.202   <none>        8081/TCP                     3m

Install via Helm without operator

We can also use the Redpanda Helm chart, which does not require the installation of CRDs or an operator, but instead creates a cluster according to the configuration in a values.yaml file. This is currently the installation method recommended by Redpanda. In order to use it, clone the Redpanda repository with the Helm chart:

$ git clone https://github.com/redpanda-data/helm-charts.git
$ cd helm-charts/redpanda

Then, you need to install Redpanda using the following Helm command. Since we use a single-node Kubernetes cluster we override the default number of brokers using the statefulset.replicas parameter.

$ helm install redpanda . \
    -n redpanda \
    --create-namespace \
    --set statefulset.replicas=1

In comparison to the previous installation method, you have to install the kube-prometheus-stack separately. For example:

$ helm repo add prometheus-community https://prometheus-community.github.io/helm-charts && \
  helm repo update
$ helm install kube-prometheus-stack prometheus-community/kube-prometheus-stack \
    -n monitoring --create-namespace

Enable Prometheus Metrics

In the previous section, we installed the Prometheus stack using the Redpanda operator. By default, it monitors Kubernetes core components and the Redpanda operator itself. Our goal is to enable monitoring of the newly created Redpanda cluster in the redpanda namespace. To do that, we need to create a PodMonitor object provided by the Prometheus operator. It requires us to set at least the target pod namespace, a label selector, and the metrics endpoints. Redpanda exposes metrics on the admin port (9644) under the /metrics and /public_metrics paths.

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  labels:
    app.kubernetes.io/instance: redpanda-cluster
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/name: redpanda-cluster
    app.kubernetes.io/version: v22.2.2
    helm.sh/chart: redpanda-operator-v22.2.2
    release: redpanda-operator
  name: redpanda-cluster-monitor
  namespace: redpanda-system
spec:
  namespaceSelector:
    matchNames:
      - redpanda
  podMetricsEndpoints:
    - path: /metrics
      port: admin
    - path: /public_metrics
      port: admin
  selector:
    matchLabels:
      app.kubernetes.io/name: redpanda

Once you create the PodMonitor, you will be able to query Redpanda metrics. There are a lot of exported metrics. You can verify them in the Prometheus dashboard. Their names start with the vectorized_ prefix.

redpanda-kubernetes-prometheus

The simplest way to view important metrics is through the Grafana dashboard. Therefore, we are going to create a dashboard there. Fortunately, Redpanda provides an automatic mechanism for generating a Grafana dashboard. There is a dedicated Redpanda CLI (rpk) command to do it. We just need to set the name of the data source (Prometheus) and metrics endpoint URL (public_metrics).

$ kubectl exec pod/one-node-cluster-0 -n redpanda -c redpanda \
    -- rpk generate grafana-dashboard --datasource Prometheus \
    --metrics-endpoint http://localhost:9644/public_metrics \
    > redpanda-dashboard.json

Once we export the dashboard as a JSON file, we may import it into the Grafana dashboard.

Enable Redpanda Console

Redpanda provides a UI dashboard for managing cluster instances called Redpanda Console. However, it is not installed by the operator. In order to run it on Kubernetes, we will use the Helm chart. Firstly, let’s add the required Helm repository:

$ helm repo add redpanda-console https://packages.vectorized.io/public/console/helm/charts/ && \
  helm repo update

We need to override some configuration settings. By default, the Redpanda Console tries to detect both the broker and the schema registry on a localhost address. Since we are running Redpanda on Kubernetes, we need to set the name of the Service: one-node-cluster for the broker and one-node-cluster-cluster for the schema registry. Here’s our values.yaml file:

console:
  config:
    kafka:
      brokers:
        - one-node-cluster:9092
      clientId: redpanda-console
    schemaRegistry:
      enabled: true
      urls: ["http://one-node-cluster-cluster:8081"]
      username: console
      password: redacted

Finally, let’s install the console in the same namespace as the broker.

$ helm install redpanda-console redpanda-console/console \
    --values values.yaml \
    -n redpanda

The Redpanda Console is available under the 8080 port. We can enable port-forward to access it locally.

Integrate Spring Boot with Redpanda

Our instance of Redpanda is ready. Let’s run the sample applications on Kubernetes. They send events to Redpanda and receive them from Redpanda. Our applications are written in Kotlin, built on top of Spring Boot, and use Spring Cloud Stream to integrate with the Kafka-compatible API. They use the Avro format for serializing and deserializing messages. Thanks to the Spring Cloud Schema Registry client, they may also integrate with the schema registry provided by Redpanda.

Here’s the list of required modules for both producer and consumer apps:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-schema-registry-client</artifactId>
</dependency>

If you would like to read more about Spring Cloud support for Kafka and schema registry you can refer to this article on my blog. It describes how to build event-driven architectures with Spring Cloud Stream Kafka and use Avro format in communication between apps.

Our producer app continuously generates and sends messages to the Redpanda topic. It is integrated with the schema registry available at the address provided in the spring.cloud.schemaRegistryClient.endpoint property. To enable that integration, we need to annotate the main class with @EnableSchemaRegistryClient.

@SpringBootApplication
@EnableSchemaRegistryClient
class ProductionApplication {

   var id: Int = 0

   @Value("\${callme.supplier.enabled}")
   val supplierEnabled: Boolean = false

   @Bean
   fun callmeEventSupplier(): Supplier<Message<CallmeEvent>?> = Supplier { createEvent() }

   @Primary
   @Bean
   fun schemaRegistryClient(@Value("\${spring.cloud.schemaRegistryClient.endpoint}") endpoint: String?): SchemaRegistryClient {
      val client = ConfluentSchemaRegistryClient()
      client.setEndpoint(endpoint)
      return client
   }

   private fun createEvent(): Message<CallmeEvent>? {
      return if (supplierEnabled)
          MessageBuilder.withPayload(CallmeEvent(++id, "I'm callme event!", "ping"))
               .setHeader("to_process", true)
               .build()
      else
         null
   }
}

Our app does not contain a lot of code; however, we need to provide some configuration settings. In order to enable serialization with Avro, we need to set the default content type to application/*+avro (1). The target topic on Redpanda is callme-events (2). It consists of 2 partitions (3). We also need to set the name of the bean responsible for generating messages (4). With the spring.cloud.schema.avro.dynamicSchemaGenerationEnabled property we may enable automatic generation of the Avro schema based on the source code (5). Of course, we also need to provide the schema registry address (6) and the Redpanda broker address (7).

spring.application.name=producer-service
spring.cloud.stream.default.contentType=application/*+avro # (1)
spring.cloud.stream.bindings.callmeEventSupplier-out-0.contentType=application/*+avro
spring.cloud.stream.bindings.callmeEventSupplier-out-0.destination=callme-events # (2)
spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionKeyExpression=payload.id
spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionCount=2 # (3)
spring.cloud.stream.source=callmeEventSupplier # (4)

spring.cloud.schema.avro.dynamicSchemaGenerationEnabled=true # (5)
spring.cloud.schemaRegistryClient.endpoint=http://one-node-cluster-cluster:8081/ # (6)
spring.kafka.bootstrap-servers=one-node-cluster:9092 # (7)
spring.main.allow-bean-definition-overriding=true

callme.supplier.enabled=true
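Since we partition by payload.id across 2 partitions (points 2–3 above), it may help to see how Spring Cloud Stream picks a partition by default once the key expression is evaluated: the absolute value of the key’s hashCode modulo the partition count. A minimal sketch, assuming the default partition selector:

```java
// Default-style partition selection: abs(key.hashCode()) % partitionCount.
// For Integer keys, hashCode() is the value itself.
public class PartitionDemo {

    public static int selectPartition(Object key, int partitionCount) {
        return Math.abs(key.hashCode()) % partitionCount;
    }

    public static void main(String[] args) {
        // with partitionCount=2, consecutive integer ids alternate between partitions
        System.out.println(selectPartition(1, 2)); // 1
        System.out.println(selectPartition(2, 2)); // 0
    }
}
```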

Finally, let’s build and deploy our producer app on Kubernetes. We may use Skaffold for that. The app source code is configured to support it. Here’s our Deployment manifest:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
      - name: producer
        image: piomin/producer-service
        ports:
        - containerPort: 8080

Let’s verify a list of running pods in the redpanda namespace:

$ kubectl get pod -n redpanda
NAME                               READY   STATUS    RESTARTS   AGE
one-node-cluster-0                 1/1     Running   0          112m
producer-5b7f5cfcc6-586z2          1/1     Running   0          65m
redpanda-console-dcf446dc8-fzc2t   1/1     Running   0          104m

Monitor Redpanda on Kubernetes with Console and Prometheus

Our producer app is running on Kubernetes. It generates and sends messages. Let’s switch to the Redpanda Console. Here’s the view with the list of topics. As you can see, the callme-events topic has been created:

redpanda-kubernetes-console

If you click on the topic, you will see the details and a list of messages:

Also, let’s verify the message schema available under the Schema Registry menu. You can compare it with the CallmeEvent object in the source code.

redpanda-kubernetes-schema

Then, let’s run our consumer app. It also integrates with the schema registry and receives messages from the callme-events topic.

Thanks to Prometheus and Grafana, we can monitor several parameters related to the Redpanda broker. Here’s the screen from the Grafana dashboard:

Final Thoughts

Redpanda simplifies deployment on Kubernetes in comparison to standard Kafka. Within a single pod, we have a broker, a schema registry, and an HTTP proxy. We can also easily install a UI console to manage Redpanda graphically, and we can customize the Redpanda cluster using the CRD object provided by the operator.

The post Running Redpanda on Kubernetes appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2022/09/06/running-redpanda-on-kubernetes/feed/ 2 13109
ActiveMQ Artemis with Spring Boot on Kubernetes https://piotrminkowski.com/2022/07/26/activemq-artemis-with-spring-boot-on-kubernetes/ https://piotrminkowski.com/2022/07/26/activemq-artemis-with-spring-boot-on-kubernetes/#comments Tue, 26 Jul 2022 09:58:34 +0000 https://piotrminkowski.com/?p=12504 This article will teach you how to run ActiveMQ on Kubernetes and integrate it with your app through Spring Boot. We will deploy a clustered ActiveMQ broker using a dedicated operator. Then we are going to build and run two Spring Boot apps. The first of them is running in multiple instances and receiving messages […]

The post ActiveMQ Artemis with Spring Boot on Kubernetes appeared first on Piotr's TechBlog.

]]>
This article will teach you how to run ActiveMQ on Kubernetes and integrate it with your app through Spring Boot. We will deploy a clustered ActiveMQ broker using a dedicated operator. Then we are going to build and run two Spring Boot apps. The first of them is running in multiple instances and receiving messages from the queue, while the second is sending messages to that queue. In order to test the ActiveMQ cluster, we will use Kind. The consumer app connects to the cluster using several different modes. We will discuss those modes in detail.

You can find a lot of articles about other message brokers, like RabbitMQ or Kafka, on my blog. If you would like to read about RabbitMQ on Kubernetes, please refer to that article. In order to find out more about Kafka and Spring Boot integration, you can read the article about Kafka Streams and Spring Cloud Stream available here. Previously I didn’t write much about ActiveMQ, but it is also a very popular message broker. For example, it supports the latest version of the AMQP protocol, while RabbitMQ is based on its own extension of AMQP 0.9.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that, you need to clone my GitHub repository and go to the messaging directory. There you will find three Spring Boot apps: simple-producer, simple-consumer, and simple-counter. After that, you should just follow my instructions. Let’s begin.

Integrate Spring Boot with ActiveMQ

Let’s begin with the integration between our Spring Boot apps and the ActiveMQ Artemis broker. In fact, ActiveMQ Artemis is the base of a commercial product provided by Red Hat called AMQ Broker. Red Hat actively develops a Spring Boot starter for ActiveMQ and an operator for running it on Kubernetes. In order to access the starter, you need to include the Red Hat Maven repository in your pom.xml file:

<repository>
  <id>red-hat-ga</id>
  <url>https://maven.repository.redhat.com/ga</url>
</repository>

After that, you can include a starter in your Maven pom.xml:

<dependency>
  <groupId>org.amqphub.spring</groupId>
  <artifactId>amqp-10-jms-spring-boot-starter</artifactId>
  <version>2.5.6</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>log4j-over-slf4j</artifactId>
    </exclusion>
  </exclusions>
</dependency>

Then, we just need to enable JMS for our app with the @EnableJms annotation:

@SpringBootApplication
@EnableJms
public class SimpleConsumer {

   public static void main(String[] args) {
      SpringApplication.run(SimpleConsumer.class, args);
   }

}

Our application is very simple. It just receives and prints incoming messages. The method for receiving messages should be annotated with @JmsListener. The destination field contains the name of the target queue.

@Service
public class Listener {

   private static final Logger LOG = LoggerFactory
      .getLogger(Listener.class);

   @JmsListener(destination = "test-1")
   public void processMsg(SimpleMessage message) {
      LOG.info("============= Received: " + message);
   }

}

Here’s the class that represents our message:

public class SimpleMessage implements Serializable {

   private Long id;
   private String source;
   private String content;

   public SimpleMessage() {
   }

   public SimpleMessage(Long id, String source, String content) {
      this.id = id;
      this.source = source;
      this.content = content;
   }

   // ... GETTERS AND SETTERS

   @Override
   public String toString() {
      return "SimpleMessage{" +
              "id=" + id +
              ", source='" + source + '\'' +
              ", content='" + content + '\'' +
              '}';
   }
}

Finally, we need to set the connection configuration. With the AMQP Spring Boot starter it is very simple. We just need to set the amqphub.amqp10jms.remoteUrl property. For now, we are going to rely on an environment variable set at the level of the Kubernetes Deployment.

amqphub.amqp10jms.remoteUrl = ${ARTEMIS_URL}

The producer application is pretty similar. Instead of the annotation for receiving messages, we use Spring’s JmsTemplate to produce and send messages to the target queue. The method for sending messages is exposed as an HTTP POST /producer/send endpoint.

@RestController
@RequestMapping("/producer")
public class ProducerController {

   private static long id = 1;
   private final JmsTemplate jmsTemplate;
   @Value("${DESTINATION}")
   private String destination;

   public ProducerController(JmsTemplate jmsTemplate) {
      this.jmsTemplate = jmsTemplate;
   }

   @PostMapping("/send")
   public SimpleMessage send(@RequestBody SimpleMessage message) {
      if (message.getId() == null) {
          message.setId(id++);
      }
      jmsTemplate.convertAndSend(destination, message);
      return message;
   }
}

Create a Kind cluster with Nginx Ingress

Our example apps are ready. Before deploying them, we need to prepare the local Kubernetes cluster. We will deploy there an ActiveMQ cluster consisting of three brokers. Therefore, our Kubernetes cluster will also consist of three worker nodes. There are also three instances of the consumer app running on Kubernetes. They connect to the ActiveMQ brokers over the AMQP protocol. There is also a single instance of the producer app that sends messages on demand. Here’s a diagram of our architecture.

activemq-spring-boot-kubernetes-arch

In order to run a multi-node Kubernetes cluster locally, we will use Kind. We will test not only communication over the AMQP protocol but also expose the ActiveMQ management console over HTTP. Because ActiveMQ uses headless services to expose the web console, we have to create and configure an Ingress on Kind to access it. Let’s begin.

In the first step, we are going to create a Kind cluster. It consists of a control plane and three workers. The configuration has to be prepared correctly to run the Nginx Ingress Controller. We should add the ingress-ready label to a single worker node and expose ports 80 and 443. Here’s the final version of a Kind config file:

kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
  - role: control-plane
  - role: worker
    kubeadmConfigPatches:
    - |
      kind: JoinConfiguration
      nodeRegistration:
        kubeletExtraArgs:
          node-labels: "ingress-ready=true"
    extraPortMappings:
    - containerPort: 80
      hostPort: 80
      protocol: TCP
    - containerPort: 443
      hostPort: 443
      protocol: TCP  
  - role: worker
  - role: worker

Now, let’s create a Kind cluster by executing the following command:

$ kind create cluster --config kind-config.yaml

If your cluster has been successfully created you should see similar information:

After that, let’s install the Nginx Ingress Controller. It is just a single command:

$ kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/main/deploy/static/provider/kind/deploy.yaml

Let’s verify the installation:

$ kubectl get pod -n ingress-nginx
NAME                                        READY   STATUS      RESTARTS  AGE
ingress-nginx-admission-create-wbbzh        0/1     Completed   0         1m
ingress-nginx-admission-patch-ws2mv         0/1     Completed   0         1m
ingress-nginx-controller-86b6d5756c-rkbmz   1/1     Running     0         1m

Install ActiveMQ Artemis on Kubernetes

Finally, we may proceed to the ActiveMQ Artemis installation. Firstly, let’s install the required CRDs. You may find all the YAML manifests inside the operator repository on GitHub.

$ git clone https://github.com/artemiscloud/activemq-artemis-operator.git
$ cd activemq-artemis-operator

The manifests with CRDs are located in the deploy/crds directory:

$ kubectl create -f ./deploy/crds

After that, we can install the operator:

$ kubectl create -f ./deploy/service_account.yaml
$ kubectl create -f ./deploy/role.yaml
$ kubectl create -f ./deploy/role_binding.yaml
$ kubectl create -f ./deploy/election_role.yaml
$ kubectl create -f ./deploy/election_role_binding.yaml
$ kubectl create -f ./deploy/operator_config.yaml
$ kubectl create -f ./deploy/operator.yaml

In order to create a cluster, we have to create the ActiveMQArtemis object. It contains the number of brokers being part of the cluster (1). We should also define an acceptor to expose the AMQP port outside of every single broker pod (2). Of course, we will also expose the management console (3).

apiVersion: broker.amq.io/v1beta1
kind: ActiveMQArtemis
metadata:
  name: ex-aao
spec:
  deploymentPlan:
    size: 3 # (1)
    image: placeholder
    messageMigration: true
    resources:
      limits:
        cpu: "500m"
        memory: "1024Mi"
      requests:
        cpu: "250m"
        memory: "512Mi"
  acceptors: # (2)
    - name: amqp
      protocols: amqp
      port: 5672
      connectionsAllowed: 5
  console: # (3)
    expose: true

Once the ActiveMQArtemis object is created, the operator starts the deployment process. It creates the StatefulSet object:

$ kubectl get statefulset
NAME        READY   AGE
ex-aao-ss   3/3     1m

It starts all three pods with brokers sequentially:

$ kubectl get pod -l application=ex-aao-app
NAME          READY   STATUS    RESTARTS    AGE
ex-aao-ss-0   1/1     Running   0           5m
ex-aao-ss-1   1/1     Running   0           3m
ex-aao-ss-2   1/1     Running   0           1m

Let’s display a list of Services created by the operator. There is a single Service per broker for exposing the AMQP port (ex-aao-amqp-*) and web console (ex-aao-wsconsj-*):

activemq-spring-boot-kubernetes-services

The operator automatically creates an Ingress object for each web console Service. We will modify them by adding different hosts: let’s say the one.activemq.com domain for the first broker, two.activemq.com for the second broker, etc.

$ kubectl get ing    
NAME                      CLASS    HOSTS                  ADDRESS     PORTS   AGE
ex-aao-wconsj-0-svc-ing   <none>   one.activemq.com       localhost   80      1h
ex-aao-wconsj-1-svc-ing   <none>   two.activemq.com       localhost   80      1h
ex-aao-wconsj-2-svc-ing   <none>   three.activemq.com     localhost   80      1h

After creating the Ingresses, we also have to add the following line to /etc/hosts:

127.0.0.1    one.activemq.com two.activemq.com three.activemq.com

Now, we can access the management console of, for example, the third broker at the URL http://three.activemq.com/console.

activemq-spring-boot-kubernetes-console

Once the broker is ready, we may define a test queue named test-1.

apiVersion: broker.amq.io/v1beta1
kind: ActiveMQArtemisAddress
metadata:
  name: address-1
spec:
  addressName: address-1
  queueName: test-1
  routingType: anycast

Run the Spring Boot app on Kubernetes and connect to ActiveMQ

Now, let’s deploy the consumer app. In the Deployment manifest, we have to set the ActiveMQ cluster connection URL. But wait… how do we connect to it? There are three brokers exposed through three separate Kubernetes Services. Fortunately, the AMQP Spring Boot starter supports such a scenario. We may set the addresses of all three brokers inside the failover section. Let’s try it and see what happens.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: simple-consumer
spec:
  replicas: 3
  selector:
    matchLabels:
      app: simple-consumer
  template:
    metadata:
      labels:
        app: simple-consumer
    spec:
      containers:
      - name: simple-consumer
        image: piomin/simple-consumer
        env:
          - name: ARTEMIS_URL
            value: failover:(amqp://ex-aao-amqp-0-svc:5672,amqp://ex-aao-amqp-1-svc:5672,amqp://ex-aao-amqp-2-svc:5672)
        resources:
          limits:
            memory: 256Mi
            cpu: 500m
          requests:
            memory: 128Mi
            cpu: 250m

The application is prepared to be deployed with Skaffold. If you run the skaffold dev command, you will deploy all three instances of the consumer app and see their logs. What’s the result? All the instances connect to the first URL from the list.

Fortunately, there is a failover option that helps distribute client connections more evenly across multiple remote peers. With the failover.randomize option enabled, the URIs are randomly shuffled before the client attempts to connect to one of them. Let’s replace the ARTEMIS_URL env in the Deployment manifest with the following value:

failover:(amqp://ex-aao-amqp-0-svc:5672,amqp://ex-aao-amqp-1-svc:5672,amqp://ex-aao-amqp-2-svc:5672)?failover.randomize=true

The distribution between broker instances now looks slightly better. Of course, the result is random, so you may get different results.
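Under the hood, the Qpid JMS transport used by the AMQP starter simply shuffles the configured URI list before attempting connections. Here is a rough, framework-free Java sketch of that behavior (the class and method names are purely illustrative, not part of any client API):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Illustrative sketch: with failover.randomize=true, the client works on a
// shuffled copy of the broker URI list, so each app instance is likely to
// pick a different broker as its first connection attempt.
public class FailoverShuffle {

    public static List<String> shuffled(List<String> uris) {
        List<String> copy = new ArrayList<>(uris);
        Collections.shuffle(copy);
        return copy;
    }

    public static void main(String[] args) {
        List<String> uris = List.of(
                "amqp://ex-aao-amqp-0-svc:5672",
                "amqp://ex-aao-amqp-1-svc:5672",
                "amqp://ex-aao-amqp-2-svc:5672");
        // Each client instance tries the head of its own shuffled copy first
        System.out.println(shuffled(uris).get(0));
    }
}
```

Since the shuffle is random, three consumer instances may still occasionally end up on the same broker, which is consistent with the uneven results observed above.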

Another way to distribute the connections is through a dedicated Kubernetes Service. We don’t have to rely on the Services created automatically by the operator. We can create our own Service that load-balances across all available broker pods.

kind: Service
apiVersion: v1
metadata:
  name: ex-aao-amqp-lb
spec:
  ports:
    - name: amqp
      protocol: TCP
      port: 5672
  type: ClusterIP
  selector:
    application: ex-aao-app

Now, we can drop the failover section on the client side and rely fully on Kubernetes mechanisms.

spec:
  containers:
  - name: simple-consumer
    image: piomin/simple-consumer
    env:
      - name: ARTEMIS_URL
        value: amqp://ex-aao-amqp-lb:5672

This time we won’t see anything interesting in the application logs, because all the instances connect to the same URL. Instead, we can verify the distribution between the broker instances using e.g. the management web console. Here’s a list of consumers on the first instance of ActiveMQ:

Below, you will see exactly the same result for the second instance. The consumer app instances have been distributed equally between all available brokers inside the cluster.

Now, we are going to deploy the producer app. We use the same Kubernetes Service for connecting to the ActiveMQ cluster.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: simple-producer
spec:
  replicas: 3
  selector:
    matchLabels:
      app: simple-producer
  template:
    metadata:
      labels:
        app: simple-producer
    spec:
      containers:
        - name: simple-producer
          image: piomin/simple-producer
          env:
            - name: ARTEMIS_URL
              value: amqp://ex-aao-amqp-lb:5672
            - name: DESTINATION
              value: test-1
          ports:
            - containerPort: 8080

Because we have to call the HTTP endpoint let’s create the Service for the producer app:

apiVersion: v1
kind: Service
metadata:
  name: simple-producer
spec:
  type: ClusterIP
  selector:
    app: simple-producer
  ports:
  - port: 8080

Let’s deploy the producer app using Skaffold with port-forwarding enabled:

$ skaffold dev --port-forward

Here’s a list of our Deployments:

In order to send a test message just execute the following command:

$ curl http://localhost:8080/producer/send \
  -d "{\"source\":\"test\",\"content\":\"Hello\"}" \
  -H "Content-Type:application/json"

Advanced configuration

If you need more advanced traffic distribution between brokers inside the cluster, you can achieve it in several ways. For example, we can dynamically override a configuration property at runtime. Here’s a very simple example: on startup, the application calls an external service over HTTP, which returns the number of the next broker instance to connect to.

@Configuration
public class AmqpConfig {

    @PostConstruct
    public void init() {
        RestTemplate t = new RestTemplateBuilder().build();
        int x = t.getForObject("http://simple-counter:8080/counter", Integer.class);
        System.setProperty("amqphub.amqp10jms.remoteUrl",
                "amqp://ex-aao-amqp-" + x + "-svc:5672");
    }

}

Here’s the implementation of the counter app. It just increments a counter and returns it modulo the number of broker instances. Of course, we could create a more advanced implementation and e.g. return the broker instance running on the same Kubernetes node as the app pod.

@SpringBootApplication
@RestController
@RequestMapping("/counter")
public class CounterApp {

   private static int c = 0;

   public static void main(String[] args) {
      SpringApplication.run(CounterApp.class, args);
   }

   @Value("${DIVIDER:0}")
   int divider;

   @GetMapping
   public Integer count() {
      if (divider > 0)
         return c++ % divider;
      else
         return c++;
   }
}
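To see why this produces a round-robin distribution, here is the same selection logic extracted into plain Java, free of the Spring wiring (the class and method names below are my own, purely illustrative):

```java
// Illustrative, framework-free version of the counter logic above:
// successive calls cycle through broker indexes 0, 1, 2, 0, 1, ...
public class BrokerRoundRobin {

    private int counter = 0;
    private final int brokers;

    public BrokerRoundRobin(int brokers) {
        this.brokers = brokers;
    }

    // Index of the next broker instance to hand out
    public int next() {
        return counter++ % brokers;
    }

    // Per-broker Service URL in the form used by AmqpConfig above
    public String nextUrl() {
        return "amqp://ex-aao-amqp-" + next() + "-svc:5672";
    }

    public static void main(String[] args) {
        BrokerRoundRobin rr = new BrokerRoundRobin(3);
        for (int i = 0; i < 6; i++) {
            System.out.println(rr.nextUrl());
        }
    }
}
```

Unlike the static field in CounterApp, a real deployment should also consider what happens when the counter service restarts or is scaled out, since each replica would then keep its own counter.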

Final Thoughts

ActiveMQ is an interesting alternative to RabbitMQ as a message broker. In this article, you learned how to run, manage and integrate ActiveMQ with Spring Boot on Kubernetes. It can be declaratively managed on Kubernetes thanks to ActiveMQ Artemis Operator. You can also easily integrate it with Spring Boot using a dedicated starter. It provides various configuration options and is actively developed by Red Hat and the community.

The post ActiveMQ Artemis with Spring Boot on Kubernetes appeared first on Piotr's TechBlog.

Introduction to ksqlDB on Kubernetes with Spring Boot https://piotrminkowski.com/2022/06/22/introduction-to-ksqldb-on-kubernetes-with-spring-boot/ https://piotrminkowski.com/2022/06/22/introduction-to-ksqldb-on-kubernetes-with-spring-boot/#comments Wed, 22 Jun 2022 14:01:47 +0000 https://piotrminkowski.com/?p=11924 In this article, you will learn how to run ksqlDB on Kubernetes and use it with Spring Boot. You will also see how to run Kafka on Kubernetes based on the Strimzi operator. In order to integrate Spring Boot with the ksqlDB server, we are going to leverage a lightweight Java client provided by ksqlDB. […]

In this article, you will learn how to run ksqlDB on Kubernetes and use it with Spring Boot. You will also see how to run Kafka on Kubernetes based on the Strimzi operator. In order to integrate Spring Boot with the ksqlDB server, we are going to leverage a lightweight Java client provided by ksqlDB. This client supports pull and push queries. It also provides an API for inserting rows and creating tables or streams. You can read more about it in the ksqlDB documentation here.

Our sample Spring Boot application is very simple. We will use Spring Cloud Stream Supplier bean for generating and sending events to the Kafka topic. For more information about Kafka with Spring Cloud Stream please refer to the following article. On the other hand, our application gets data from the Kafka topic using kSQL queries. It also creates KTable on startup.

Let’s take a look at our architecture.

ksqldb-kubernetes-arch

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then go to the transactions-service directory. After that, you should just follow my instructions. Let’s begin.

Prerequisites

We will use several tools. You need to have:

  1. Kubernetes cluster – it may be a single-node, local cluster like Minikube or Kind. Personally, I’m using Kubernetes on the Docker Desktop
  2. kubectl CLI – to interact with the cluster
  3. Helm – we will use it to install the ksqlDB server on Kubernetes. If you don’t have Helm, you will have to install it

Run Kafka on Kubernetes with Strimzi

Of course, we need an instance of Kafka to perform our exercise. There are several ways to run Kafka on Kubernetes. I’ll show you how to do it with the operator-based approach. In the first step, you need to install OLM (Operator Lifecycle Manager) on your cluster. In order to do that, you can just execute the following commands against your current Kubernetes context:

$ curl -L https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.21.2/install.sh -o install.sh
$ chmod +x install.sh
$ ./install.sh v0.21.2

Then, you can proceed to the Strimzi operator installation. That’s just a single command.

$ kubectl create -f https://operatorhub.io/install/stable/strimzi-kafka-operator.yaml

Now, we can create a Kafka cluster on Kubernetes. Let’s begin with a dedicated namespace for our exercise:

$ kubectl create ns kafka

I assume you have a single-node Kubernetes cluster, so we also create a single-node Kafka. Here’s the YAML manifest with Kafka CRD. You can find it in the repository under the path k8s/cluster.yaml.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  entityOperator:
    topicOperator: {}
    userOperator: {}
  kafka:
    config:
      default.replication.factor: 1
      inter.broker.protocol.version: "3.2"
      min.insync.replicas: 1
      offsets.topic.replication.factor: 1
      transaction.state.log.min.isr: 1
      transaction.state.log.replication.factor: 1
    listeners:
      - name: plain
        port: 9092
        tls: false
        type: internal
      - name: tls
        port: 9093
        tls: true
        type: internal
    replicas: 1
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 30Gi
          deleteClaim: true
    version: 3.2.0
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: true

Let’s apply it to Kubernetes in the kafka namespace:

$ kubectl apply -f k8s/cluster.yaml -n kafka

You should see a single instance of Kafka and also a single instance of Zookeeper. If the pods are running, it means you have Kafka on Kubernetes.

$ kubectl get pod -n kafka
NAME                                          READY   STATUS    RESTARTS  AGE
my-cluster-entity-operator-68cc6bc4d9-qs88p   3/3     Running   0         46m
my-cluster-kafka-0                            1/1     Running   0         48m
my-cluster-zookeeper-0                        1/1     Running   0         48m

Kafka is available inside the cluster under the name my-cluster-kafka-bootstrap and port 9092.

$ kubectl get svc -n kafka
NAME                          TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                               AGE
my-cluster-kafka-bootstrap    ClusterIP   10.108.109.255   <none>        9091/TCP,9092/TCP,9093/TCP            47m
my-cluster-kafka-brokers      ClusterIP   None             <none>        9090/TCP,9091/TCP,9092/TCP,9093/TCP   47m
my-cluster-zookeeper-client   ClusterIP   10.102.10.251    <none>        2181/TCP                              47m
my-cluster-zookeeper-nodes    ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP            47m

Run ksqlDB Server on Kubernetes

The ksqlDB Server is a part of the Confluent Platform. Since we are not installing the whole Confluent Platform on Kubernetes, but just an open-source Kafka cluster, we need to install the ksqlDB Server separately. Let’s do it with Helm. There is no “official” Helm chart for the KSQL server. Therefore, we go directly to the Confluent Helm repository on GitHub:

$ git clone https://github.com/confluentinc/cp-helm-charts.git
$ cd cp-helm-charts

In this repository, you can find separate Helm charts for every single Confluent component, including e.g. the control center or the KSQL Server. The chart we need is located under the charts/cp-ksql-server path. We have to override some default settings during installation. First of all, we have to disable the headless mode. In the headless mode, the KSQL Server does not expose the HTTP endpoint and loads queries from an input script, while our Spring Boot app will connect to the server through HTTP. In the next step, we should override the default address of the Kafka cluster and the default version of the KSQL Server, which is still 6.1.0 in the chart. We will use the latest version, 7.1.1. Here’s the helm command you should run on your Kubernetes cluster:

$ helm install cp-ksql-server \
    --set ksql.headless=false \
    --set kafka.bootstrapServers=my-cluster-kafka-bootstrap:9092 \
    --set imageTag=7.1.1 \
  charts/cp-ksql-server -n kafka

Here’s the result:

Let’s verify if KSQL is running on the cluster:

$ kubectl get pod -n kafka | grep ksql
cp-ksql-server-679fc98889-hldfv               2/2     Running   0               2m11s

The HTTP endpoint is available for other applications under the name cp-ksql-server and port 8088:

$ kubectl get svc -n kafka | grep ksql
cp-ksql-server                ClusterIP   10.109.189.36    <none>        8088/TCP,5556/TCP                     3m25s

Now, we have the whole required stack running on our Kubernetes cluster. Therefore, we can proceed to the Spring Boot app implementation.

Integrate Spring Boot with ksqlDB

I didn’t find any out-of-the-box integration between Spring Boot and ksqlDB. Therefore, we will use the ksqldb-api-client directly. In the first step, we need to include the ksqlDB Maven repository and some dependencies:

<dependencies>
        ...

  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-api-client</artifactId>
    <version>0.26.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-udf</artifactId>
    <version>0.26.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-common</artifactId>
    <version>0.26.0</version>
  </dependency>
</dependencies>

<repositories>
  <repository>
    <id>ksqlDB</id>
    <name>ksqlDB</name>
    <url>https://ksqldb-maven.s3.amazonaws.com/maven/</url>
  </repository>
</repositories>

After that, we can define a Spring @Bean returning the ksqlDB Client implementation. Since we will run our application in the same namespace as the KSQL Server, we need to provide the Kubernetes Service name as the host name.

@Configuration
public class KSQLClientProducer {

    @Bean
    Client ksqlClient() {
        ClientOptions options = ClientOptions.create()
                .setHost("cp-ksql-server")
                .setPort(8088);
        return Client.create(options);
    }
}

Our application interacts with the KSQL Server through its HTTP endpoint. It creates a single KTable on startup. To do that, we need to invoke the executeStatement method on the instance of the KSQL Client bean. We are creating a SOURCE table to enable running pull queries on it. The table gets its data from the transactions topic and expects the incoming events in JSON format.

@Component
public class KTableCreateListener implements ApplicationListener<ContextRefreshedEvent> {

   private static final Logger LOG = LoggerFactory.getLogger(KTableCreateListener.class);
   private Client ksqlClient;

   public KTableCreateListener(Client ksqlClient) {
      this.ksqlClient = ksqlClient;
   }

   @Override
   public void onApplicationEvent(ContextRefreshedEvent event) {
      try {
         String sql = """
                 CREATE SOURCE TABLE IF NOT EXISTS transactions_view (
                   id BIGINT PRIMARY KEY,
                   sourceAccountId BIGINT,
                   targetAccountId BIGINT,
                   amount INT
                 ) WITH (
                   kafka_topic='transactions',
                   value_format='JSON'
                 );
                 """;
         ExecuteStatementResult result = ksqlClient.executeStatement(sql).get();
         LOG.info("Result: {}", result.queryId().orElse(null));
      } catch (ExecutionException | InterruptedException e) {
         LOG.error("Error: ", e);
      }
   }
}

After creating the table, we can run some queries on it. These are pretty simple queries: we find all transactions, and all transactions related to a particular account.

@RestController
@RequestMapping("/transactions")
public class TransactionResource {

   private static final Logger LOG = LoggerFactory.getLogger(TransactionResource.class);
   Client ksqlClient;

   public TransactionResource(Client ksqlClient) {
      this.ksqlClient = ksqlClient;
   }

   @GetMapping
   public List<Transaction> getTransactions() throws ExecutionException, InterruptedException {
      StreamedQueryResult sqr = ksqlClient
            .streamQuery("SELECT * FROM transactions_view;")
            .get();
      Row row;
      List<Transaction> l = new ArrayList<>();
      while ((row = sqr.poll()) != null) {
         l.add(mapRowToTransaction(row));
      }
      return l;
   }

   @GetMapping("/target/{accountId}")
   public List<Transaction> getTransactionsByTargetAccountId(@PathVariable("accountId") Long accountId)
            throws ExecutionException, InterruptedException {
      StreamedQueryResult sqr = ksqlClient
            .streamQuery("SELECT * FROM transactions_view WHERE targetAccountId=" + accountId + ";")
            .get();
      Row row;
      List<Transaction> l = new ArrayList<>();
      while ((row = sqr.poll()) != null) {
         l.add(mapRowToTransaction(row));
      }
      return l;
   }

   private Transaction mapRowToTransaction(Row row) {
      Transaction t = new Transaction();
      t.setId(row.getLong("ID"));
      t.setSourceAccountId(row.getLong("SOURCEACCOUNTID"));
      t.setTargetAccountId(row.getLong("TARGETACCOUNTID"));
      t.setAmount(row.getInteger("AMOUNT"));
      return t;
   }

}

Sending events to the topic with Spring Cloud Stream

Finally, we can proceed to the last part of our exercise. We need to generate test data and send it to the Kafka transactions topic. The simplest way to achieve it is with the Spring Cloud Stream Kafka module. Firstly, let’s add the following Maven dependency:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

Then, we may create a producer based on the Spring Supplier bean. The Supplier bean continuously generates and sends new events to the target channel. By default, it repeats the action once per second.

@Configuration
public class KafkaEventProducer {

   private static long transactionId = 0;
   private static final Random r = new Random();

   @Bean
   public Supplier<Message<Transaction>> transactionsSupplier() {
      return () -> {
          Transaction t = new Transaction();
          t.setId(++transactionId);
          t.setSourceAccountId(r.nextLong(1, 100));
          t.setTargetAccountId(r.nextLong(1, 100));
          t.setAmount(r.nextInt(1, 10000));
          Message<Transaction> o = MessageBuilder
                .withPayload(t)
                .setHeader(KafkaHeaders.MESSAGE_KEY, new TransactionKey(t.getId()))
                .build();
          return o;
      };
   }
}
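Stripped of the Spring Cloud Stream wiring, the data produced by the Supplier can be sketched in plain Java as follows (the Transaction record below is a stand-in for the application’s own class; the bounded Random methods require Java 17):

```java
import java.util.Random;

// Framework-free sketch of what transactionsSupplier() emits once per second:
// sequential ids, random account ids in [1, 100) and amounts in [1, 10000).
public class TransactionGenerator {

    public record Transaction(long id, long sourceAccountId,
                              long targetAccountId, int amount) {}

    private long transactionId = 0;
    private final Random r = new Random();

    public Transaction next() {
        return new Transaction(
                ++transactionId,
                r.nextLong(1, 100),
                r.nextLong(1, 100),
                r.nextInt(1, 10000));
    }

    public static void main(String[] args) {
        TransactionGenerator g = new TransactionGenerator();
        for (int i = 0; i < 3; i++) {
            System.out.println(g.next());
        }
    }
}
```

Note that in the real producer the id field is also wrapped into the Kafka message key (TransactionKey), which determines the partition each event lands on.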

Of course, we also need to provide the address of our Kafka cluster and the name of a target topic for the channel. The address of Kafka is injected at the deployment phase.

spring.kafka.bootstrap-servers = ${KAFKA_URL}
spring.cloud.stream.bindings.transactionsSupplier-out-0.destination = transactions

Finally, let’s deploy our Spring Boot on Kubernetes. Here’s the YAML manifest containing Kubernetes Deployment and Service definitions:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: transactions
spec:
  selector:
    matchLabels:
      app: transactions
  template:
    metadata:
      labels:
        app: transactions
    spec:
      containers:
      - name: transactions
        image: piomin/transactions-service
        env:
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap:9092
        ports:
        - containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
  name: transactions
spec:
  type: ClusterIP
  selector:
    app: transactions
  ports:
    - port: 8080

Let’s deploy the app in the kafka namespace:

$ kubectl apply -f k8s/deployment.yaml -n kafka

Testing ksqlDB on Kubernetes

Once the app is deployed on Kubernetes, let’s enable port-forward to test it on the local port:

$ kubectl port-forward service/transactions 8080:8080

Now, we can test our two HTTP endpoints. Let’s start with the endpoint for searching all transactions:

$ curl http://localhost:8080/transactions

Then, you can call the endpoint for searching all transactions related to the targetAccountId, e.g.:

$ curl http://localhost:8080/transactions/target/10

Final Thoughts

In this article, I wanted to show how you can start with ksqlDB on Kubernetes. We used such frameworks as Spring Boot and Spring Cloud Stream to interact with Kafka and ksqlDB. You could see how to run the Kafka cluster on Kubernetes using the Strimzi operator or how to deploy KSQL Server directly from the Helm repository.

The post Introduction to ksqlDB on Kubernetes with Spring Boot appeared first on Piotr's TechBlog.

Local Development with Redpanda, Quarkus and Testcontainers https://piotrminkowski.com/2022/04/20/local-development-with-redpanda-quarkus-and-testcontainers/ https://piotrminkowski.com/2022/04/20/local-development-with-redpanda-quarkus-and-testcontainers/#comments Wed, 20 Apr 2022 08:13:36 +0000 https://piotrminkowski.com/?p=11098 In this article, you will learn how to speed up your local development with Redpanda and Quarkus. The main goal is to show that you can replace Apache KafkaⓇ with Redpanda without any changes in the source code. Instead, you will get a fast way to run your existing Kafka applications without Zookeeper and JVM. […]

The post Local Development with Redpanda, Quarkus and Testcontainers appeared first on Piotr's TechBlog.

In this article, you will learn how to speed up your local development with Redpanda and Quarkus. The main goal is to show that you can replace Apache Kafka with Redpanda without any changes in the source code. Instead, you will get a fast way to run your existing Kafka applications without Zookeeper and JVM. You will also see how Quarkus uses Redpanda as a local instance for development. Finally, we are going to run all containers in the Testcontainers Cloud.

For the current exercise, we use the same examples as described in one of my previous articles about Quarkus and Kafka Streams. Just to remind you: we are building a simplified version of the stock market platform. The stock-service application receives and handles incoming orders. There are two types of orders: purchase (BUY) and sale (SELL). While the stock-service consumes Kafka streams, the order-service generates and sends events to the orders.buy and orders.sell topics. Here’s the diagram with our architecture. As you see, the stock-service also uses PostgreSQL as a database.

quarkus-redpanda-arch

Source Code

If you would like to try this exercise yourself, you may always take a look at my source code. In order to do that, you need to clone my GitHub repository. Then switch to the dev branch. After that, you should just follow my instructions. Let’s begin.

Install Redpanda

This step is not required. However, it is worth installing Redpanda since it provides a useful CLI called Redpanda Keeper (rpk) to manage a cluster. To install Redpanda on macOS just run the following command:

$ brew install redpanda-data/tap/redpanda

Now, we can easily create and run a new cluster. For development purposes, we only need a single-node Redpanda cluster. In order to run it, you need to have Docker on your laptop.

$ rpk container start

Before proceeding to the next steps, let’s just remove the current cluster. Quarkus will create everything for us automatically.

$ rpk container purge

Quarkus with Kafka and Postgres

Let’s begin with the stock-service. It consumes streams from Kafka topics and connects to the PostgreSQL database, as I mentioned before. So, the first step is to include the following dependencies:

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-kafka-streams</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-hibernate-orm-panache</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-jdbc-postgresql</artifactId>
</dependency>

Now, we may proceed to the implementation. The topology for all the streams is provided inside the following method:

@Produces
public Topology buildTopology() {
   ...
}

There are a few different streams defined there. But let’s just take a look at the fragment of the topology responsible for creating transactions from incoming orders:

final String ORDERS_BUY_TOPIC = "orders.buy";
final String ORDERS_SELL_TOPIC = "orders.sell";
final String TRANSACTIONS_TOPIC = "transactions";

// ... other streams

KStream<Long, Order> orders = builder.stream(
   ORDERS_SELL_TOPIC,
   Consumed.with(Serdes.Long(), orderSerde));

// Merge the buy and sell order streams and record every incoming order
builder.stream(ORDERS_BUY_TOPIC, Consumed.with(Serdes.Long(), orderSerde))
   .merge(orders)
   .peek((k, v) -> {
      log.infof("New: %s", v);
      logic.add(v);
   });

// Join buy and sell orders by product id within a 10-second window
// and publish the resulting transactions to the output topic
builder.stream(ORDERS_BUY_TOPIC, Consumed.with(Serdes.Long(), orderSerde))
   .selectKey((k, v) -> v.getProductId())
   .join(orders.selectKey((k, v) -> v.getProductId()),
      this::execute,
      JoinWindows.of(Duration.ofSeconds(10)),
      StreamJoined.with(Serdes.Integer(), orderSerde, orderSerde))
   .filterNot((k, v) -> v == null)
   .map((k, v) -> new KeyValue<>(v.getId(), v))
   .peek((k, v) -> log.infof("Done -> %s", v))
   .to(TRANSACTIONS_TOPIC, Produced.with(Serdes.Long(), transactionSerde));

The whole implementation is more advanced. For the details, you may refer to the article I mentioned in the introduction. Now, let’s imagine we are still developing our stock market app. Firstly, we would have to run PostgreSQL and a local Kafka cluster (we use Redpanda, which is easy to run locally). After that, we would typically provide the addresses of both the database and the broker in application.properties. But thanks to a feature called Quarkus Dev Services, the only things we need to configure now are the names of the topics consumed by Kafka Streams and the application id. Both of these are required by Kafka Streams.

Now, the most important thing: you just need to start the Quarkus app. Nothing more. DO NOT run any external tools by yourself and DO NOT provide any addresses for them in the configuration settings. Just add the two lines you see below:

quarkus.kafka-streams.application-id = stock
quarkus.kafka-streams.topics = orders.buy,orders.sell

Run Quarkus in dev mode with Redpanda

Before you run the Quarkus app, make sure you have Docker running on your laptop. When you do, the only thing you need is to start both test apps. Let’s begin with the stock-service since it receives orders generated by the order-service. Go to the stock-service directory and run the following command:

$ cd stock-service
$ mvn quarkus:dev

If you see the following logs, it means that everything went well. Our application started in 13 seconds. During this time, Quarkus also started Kafka and PostgreSQL on Docker, and built the Kafka Streams topology. Everything in 13 seconds, with a single command and without any additional configuration. Nice, right? Let’s check out what happened in the background:

Firstly, let’s find the following line of logs beginning with the sentence “Dev Services for Kafka started”. It perfectly describes the feature of Quarkus called Dev Services. Our Kafka instance has been started as a Docker container and is available under a dynamically generated port. The application connects to it. All other Quarkus apps you would run now will share the same instance of a broker. You can disable that feature by setting the property quarkus.kafka.devservices.shared to false.

It may be a little surprising, but Quarkus Dev Services for Kafka uses Redpanda to run a broker. Of course, Redpanda is a Kafka-compatible solution. Since it starts in ~one second and does not require Zookeeper, it is a great choice for local development.

In order to run tools like brokers or databases on Docker, Quarkus uses Testcontainers. If you are interested in more details about Quarkus Dev Services for Kafka, read the following documentation. For now, let’s display a list of running containers using the docker ps command. There are containers with Redpanda, PostgreSQL, and the Testcontainers helper.

quarkus-redpanda-containers

Manage Kafka Streams with Redpanda and Quarkus

Let’s verify how everything works on the application side. After running the application, we can take advantage of another useful Quarkus feature called Dev UI. Our UI console is available under the address http://localhost:8080/q/dev/. After accessing it, you can display a topology of Kafka Streams by clicking the button inside the Apache Kafka Streams tile.

Here you will see a summary of the available streams. For me, it is 12 topics and 15 state stores. You may also see a visualization of the Kafka Streams topology. The following picture shows a fragment of it. You can download the full image by clicking the green download button visible on the right side of the screen.

quarkus-redpanda-dev

Now, let’s use the Redpanda CLI to display a list of created topics. In my case, Redpanda is available locally under the port 55001. All the topics are automatically created by Quarkus during application startup. We need to define the names of the topics used in communication between both our test apps: orders.buy, orders.sell and transactions. They are configured and created by the order-service. The stock-service creates all the other topics visible below, which back the Kafka Streams state stores and repartitioning.

$ rpk topic list --brokers localhost:55001
NAME                                                    PARTITIONS  REPLICAS
orders.buy                                              1           1
orders.sell                                             1           1
stock-KSTREAM-JOINOTHER-0000000016-store-changelog      1           1
stock-KSTREAM-JOINOTHER-0000000043-store-changelog      1           1
stock-KSTREAM-JOINOTHER-0000000065-store-changelog      1           1
stock-KSTREAM-JOINTHIS-0000000015-store-changelog       1           1
stock-KSTREAM-JOINTHIS-0000000042-store-changelog       1           1
stock-KSTREAM-JOINTHIS-0000000064-store-changelog       1           1
stock-KSTREAM-KEY-SELECT-0000000005-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000006-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000032-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000033-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000054-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000055-repartition         1           1
stock-transactions-all-summary-changelog                1           1
stock-transactions-all-summary-repartition              1           1
stock-transactions-per-product-summary-30s-changelog    1           1
stock-transactions-per-product-summary-30s-repartition  1           1
stock-transactions-per-product-summary-changelog        1           1
stock-transactions-per-product-summary-repartition      1           1
transactions                                            1           1

In order to do a full test, we also need to run the order-service. It generates orders continuously and sends them to the orders.buy or orders.sell topics. Let’s do that.

Send messages to Redpanda with Quarkus

Before we run order-service, let’s see some implementation details. On the producer side, we need to include a single dependency responsible for integration with a Kafka broker:

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

Our application generates and sends random orders to the orders.buy or orders.sell topics. There are two methods for that, each of them dedicated to a single topic. Let’s just see the method for generating BUY orders. We need to annotate it with @Outgoing and set the channel name (orders-buy). Our method generates a single order every 500 milliseconds.

@Outgoing("orders-buy")
public Multi<Record<Long, Order>> buyOrdersGenerator() {
   return Multi.createFrom().ticks().every(Duration.ofMillis(500))
      .map(tick -> {
         Integer productId = random.nextInt(10) + 1;
         int price = prices.get(productId) + random.nextInt(200);
         Order o = new Order(
            incrementOrderId(),
            random.nextInt(1000) + 1,
            productId,
            100 * (random.nextInt(5) + 1),
            LocalDateTime.now(),
            OrderType.BUY,
            price);
         log.infof("Sent: %s", o);
         return Record.of(o.getId(), o);
      });
}

After that, we need to map the channel name into a target topic name. Another required operation is to set the serializer for the message key and value.

mp.messaging.outgoing.orders-buy.connector = smallrye-kafka
mp.messaging.outgoing.orders-buy.topic = orders.buy
mp.messaging.outgoing.orders-buy.key.serializer = org.apache.kafka.common.serialization.LongSerializer
mp.messaging.outgoing.orders-buy.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Finally, go to the order-service directory and run the application.

$ cd order-service
$ mvn quarkus:dev

Once you start order-service, it will create topics and start sending orders. It uses the same instance of Redpanda as stock-service. You can run the docker ps command once again to verify it.

Now, just make a simple change in the stock-service to reload the application. It will also reload the Kafka Streams topology. After that, it starts receiving orders from the topics created by the order-service. Finally, it creates transactions from incoming orders and stores them in the transactions topic.

Use Testcontainers Cloud

In our development process, we need a locally installed Docker ecosystem. But what if we don’t have it? That’s where Testcontainers Cloud comes in. Testcontainers Cloud is a developer-first SaaS platform for modern integration testing with real databases, message brokers, cloud services, or any other component of application infrastructure. To simplify: we will do the same thing as before, but our Redpanda and PostgreSQL instances will run on the remote Testcontainers Cloud platform instead of the local Docker.

What do you need to do to enable Testcontainers Cloud? Firstly, download the agent from the following site. You also need to be a beta tester to obtain an authorization token. Finally, just run the agent and stop your local Docker daemon. You should see the Testcontainers icon among the running apps, with information about the connection to the cloud.

quarkus-redpanda-testcontainers

Docker should not run locally.

The same as before, just run both applications with the quarkus:dev command. Your Redpanda broker is running on Testcontainers Cloud but, thanks to the agent, you may access it over localhost.

Once again you can verify a list of topics using the following command for the new broker:

$ rpk topic list --brokers localhost:59779

Final Thoughts

In this article, I focused on showing you how new and exciting technologies like Quarkus, Redpanda, and Testcontainers can work together. Local development is one of the use cases, but you may as well use them to write integration tests.

The post Local Development with Redpanda, Quarkus and Testcontainers appeared first on Piotr's TechBlog.

Deep Dive into Saga Transactions with Kafka Streams and Spring Boot https://piotrminkowski.com/2022/02/07/deep-dive-into-saga-transactions-with-kafka-streams-and-spring-boot/ https://piotrminkowski.com/2022/02/07/deep-dive-into-saga-transactions-with-kafka-streams-and-spring-boot/#comments Mon, 07 Feb 2022 14:38:20 +0000 https://piotrminkowski.com/?p=10587 In this article, you will learn how to use Kafka Streams and Spring Boot to perform transactions according to the Saga pattern. To be honest, I was quite surprised by a great deal of attention to my last article about Kafka. I got some questions about streams, transactions, and support for Kafka in Spring Boot. […]

In this article, you will learn how to use Kafka Streams and Spring Boot to perform transactions according to the Saga pattern. To be honest, I was quite surprised by the great deal of attention my last article about Kafka received. I got some questions about streams, transactions, and support for Kafka in Spring Boot. In this article, I’ll try to answer a few of them. I will also show how you can easily set up a cloud-managed Kafka on Upstash.

Introduction

First of all, let’s recap the approach described in the previous article. We used Kafka Streams to process order transactions on the order-service side. To handle orders coming to the stock-service and payment-service we used a standard Spring @KafkaListener. There are also two databases – a separate one for each of those services. The stock-service stores the number of available products and updates it after receiving an order. The same goes for the payment-service: it updates the customer’s account on every single order. Both applications receive orders from a Kafka topic and send responses to other topics, but to simplify the diagram below we will skip that part. We treat the Kafka orders topic both as a stream of events and as a table with the latest status of each order.

kafka-streams-transactions-old-arch

What may go wrong with that approach? In fact, we have two data sources here. We use Kafka as the order store, while SQL databases (in my case H2, but you can use any other) store the stock and payment data. Once we send an order with a reservation to the Kafka topic, we also need to update a database. Since Kafka does not support XA transactions spanning both systems – the same as many other brokers, including e.g. RabbitMQ – this may result in data inconsistency.

The question is: what can we do about that? One of the possible options is an approach called Change Data Capture (CDC) with the outbox pattern. CDC identifies and tracks changes to data in a database. Then it may emit those changes as events and send them, for example, to a Kafka topic. I won’t go into the details of that process. If you are interested, you may read this article written by Gunnar Morling.

Architecture with Kafka Streams

The approach I will describe today is fully based on Kafka Streams. We won’t use any SQL databases. When the order-service sends a new order, its id is the message key. With Kafka Streams, we may change a message key in the stream, which results in creating new topics and repartitioning. With new message keys, we may perform calculations just for a specific customerId or productId. The result of such a calculation may be saved in a persistent state store. Kafka Streams automatically creates and manages such state stores when you call stateful operations like count() or aggregate(). We will aggregate the orders related to a particular customer or product. Here’s the illustration of our architecture.

kafka-streams-transactions-arch

Now, let’s consider a scenario for the payment-service in detail. On the incoming stream of orders the payment-service calls the selectKey() operation, which changes the key from the order’s id into the order’s customerId. Then it groups all the orders by the new key and invokes the aggregate() operation. In the aggregate() method it calculates the available amount and reserved amount based on the order’s price and status (whether it is a new order or a confirmation order). If there are sufficient funds on the customer account it sends the ACCEPT order to the payment-orders topic. Otherwise, it sends the REJECT order. Then the order-service processes the responses by joining the streams from payment-orders and stock-orders by the order’s id. As a result, it sends a confirmation or a rollback order.
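The accept/reject decision described above boils down to simple arithmetic on the reservation fields. The standalone sketch below (plain Java, no Kafka; the class and field names mirror the article’s Reservation, but the helper method is my own illustrative addition) shows the idea:

```java
// Simplified model of the payment-service decision logic described above.
// This is an illustrative sketch, not the article's actual aggregator.
class Reservation {
    int amountAvailable;
    int amountReserved;

    Reservation(int amountAvailable) {
        this.amountAvailable = amountAvailable;
    }
}

public class PaymentDecision {
    // Accepts the order and moves funds from "available" to "reserved"
    // when the price fits; otherwise rejects it and leaves funds untouched.
    static String decide(Reservation rsv, int price) {
        if (price <= rsv.amountAvailable) {
            rsv.amountAvailable -= price;
            rsv.amountReserved += price;
            return "ACCEPT";
        }
        return "REJECT";
    }

    public static void main(String[] args) {
        Reservation rsv = new Reservation(100);
        System.out.println(decide(rsv, 60)); // prints ACCEPT (40 left available)
        System.out.println(decide(rsv, 60)); // prints REJECT (60 > 40)
    }
}
```

In the real application this logic runs inside the aggregator passed to aggregate(), with the Reservation state persisted in a Kafka Streams state store rather than a local variable.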

kafka-streams-transactions-details

Finally, let’s proceed to the implementation!

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then switch to the streams-full branch. After that, you should just follow my instructions.

Aggregation with Kafka Streams

Let’s begin with the payment-service. The KStream implementation is not complicated here. In the first step (1), we invoke the selectKey() method and take the customerId value of the Order object as a new key. Then we call the groupByKey() method (2) to receive a KGroupedStream as a result. Once we have a KGroupedStream, we may invoke one of the calculation methods. In this case we need to use aggregate(), since our calculation is a little more advanced than a simple count (3). The last two steps are just for printing the value after the calculation.

@Bean
public KStream<Long, Order> stream(StreamsBuilder builder) {
   JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
   JsonSerde<Reservation> rsvSerde = new JsonSerde<>(Reservation.class);
   KStream<Long, Order> stream = builder
      .stream("orders", Consumed.with(Serdes.Long(), orderSerde))
      .peek((k, order) -> LOG.info("New: {}", order));

   KeyValueBytesStoreSupplier customerOrderStoreSupplier =
      Stores.persistentKeyValueStore("customer-orders");

   stream.selectKey((k, v) -> v.getCustomerId()) // (1)
      .groupByKey(Grouped.with(Serdes.Long(), orderSerde)) // (2)
      .aggregate(
         () -> new Reservation(random.nextInt(1000)),
         aggregatorService,
         Materialized.<Long, Reservation>as(customerOrderStoreSupplier)
            .withKeySerde(Serdes.Long())
            .withValueSerde(rsvSerde)) // (3)
      .toStream()
      .peek((k, trx) -> LOG.info("Commit: {}", trx));

   return stream;
}

However, the most important part of the code visible above is the class called inside the aggregate() method. The aggregate() method takes three input arguments. The first of them indicates the starting value of our compute object. That object represents the current state of the customer’s account and has two fields: amountAvailable and amountReserved. To clarify, we use that object instead of an entity that stores the available and reserved amounts on the customer account. Each customer is represented by the customerId (key) and the Reservation object (value) in the Kafka KTable. Just for test purposes, we generate the starting value of amountAvailable as a random number between 0 and 1000.

public class Reservation {
   private int amountAvailable;
   private int amountReserved;

   public Reservation() {
   
   }

   public Reservation(int amountAvailable) {
      this.amountAvailable = amountAvailable;
   }

   // GETTERS AND SETTERS ...

}

Ok, let’s take a look at our aggregation method. It needs to implement the Kafka Streams Aggregator interface and its apply() method. It may handle three types of orders. The first of them is a confirmation of the order (1). It confirms the distributed transaction, so we just need to cancel the reservation by subtracting the order’s price from the amountReserved field. On the other hand, in the case of a rollback (2), we need to increase the value of amountAvailable by the order’s price and decrease amountReserved accordingly. Finally, if we receive a new order (3), we perform a reservation if there are sufficient funds on the customer account, or otherwise reject the order.

Aggregator<Long, Order, Reservation> aggregatorService = (id, order, rsv) -> {
   switch (order.getStatus()) {
      case "CONFIRMED" -> // (1)
         rsv.setAmountReserved(rsv.getAmountReserved() 
               - order.getPrice());
      case "ROLLBACK" -> { // (2)
         if (!order.getSource().equals("PAYMENT")) {
            rsv.setAmountAvailable(rsv.getAmountAvailable() 
                  + order.getPrice());
            rsv.setAmountReserved(rsv.getAmountReserved() 
                  - order.getPrice());
         }
      }
      case "NEW" -> { // (3)
         if (order.getPrice() <= rsv.getAmountAvailable()) {
            rsv.setAmountAvailable(rsv.getAmountAvailable() 
                  - order.getPrice());
            rsv.setAmountReserved(rsv.getAmountReserved() 
                  + order.getPrice());
            order.setStatus("ACCEPT");
         } else {
            order.setStatus("REJECT");
         }
         template.send("payment-orders", order.getId(), order);
      }
   }
   LOG.info("{}", rsv);
   return rsv;
};

State Store with the Kafka Streams Table

The implementation of the stock-service is pretty similar to the payment-service. The difference is that we count the number of available products in stock instead of the available funds on the customer account. Here’s our Reservation object:

public class Reservation {
   private int itemsAvailable;
   private int itemsReserved;

   public Reservation() {
    
   }

   public Reservation(int itemsAvailable) {
      this.itemsAvailable = itemsAvailable;
   }

   // GETTERS AND SETTERS ...

}

The implementation of the aggregation method is also very similar to the payment-service. However, this time, let’s focus on another thing. Once we process a new order, we need to send a response to the stock-orders topic. We use KafkaTemplate for that. In the case of the payment-service we also send a response, but to the payment-orders topic. The send() method of KafkaTemplate does not block the thread; it returns a ListenableFuture. We may add a callback to it and log the result after the message has been sent (1). Finally, let’s log the current state of the Reservation object (2).

Aggregator<Long, Order, Reservation> aggrSrv = (id, order, rsv) -> {
   switch (order.getStatus()) {
      case "CONFIRMED" -> rsv.setItemsReserved(rsv.getItemsReserved() 
            - order.getProductCount());
      case "ROLLBACK" -> {
         if (!order.getSource().equals("STOCK")) {
            rsv.setItemsAvailable(rsv.getItemsAvailable() 
                  + order.getProductCount());
            rsv.setItemsReserved(rsv.getItemsReserved() 
                  - order.getProductCount());
         }
      }
      case "NEW" -> {
         if (order.getProductCount() <= rsv.getItemsAvailable()) {
            rsv.setItemsAvailable(rsv.getItemsAvailable() 
                  - order.getProductCount());
            rsv.setItemsReserved(rsv.getItemsReserved() 
                  + order.getProductCount());
            order.setStatus("ACCEPT");
         } else {
            order.setStatus("REJECT");
         }
         // (1)
         template.send("stock-orders", order.getId(), order)
            .addCallback(result -> LOG.info("Sent: {}", 
               result != null ? result.getProducerRecord().value() : null),
               ex -> {});
      }
   }
   LOG.info("{}", rsv); // (2)
   return rsv;
};

After that, we also log the value of the Reservation object (1). In order to do that, we need to convert the KTable into a KStream and then call the peek() method. This log is printed just after Kafka Streams commits the offset on the source topic.

@Bean
public KStream<Long, Order> stream(StreamsBuilder builder) {
   JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
   JsonSerde<Reservation> rsvSerde = new JsonSerde<>(Reservation.class);
   KStream<Long, Order> stream = builder
      .stream("orders", Consumed.with(Serdes.Long(), orderSerde))
      .peek((k, order) -> LOG.info("New: {}", order));

   KeyValueBytesStoreSupplier stockOrderStoreSupplier =
      Stores.persistentKeyValueStore("stock-orders");

   stream.selectKey((k, v) -> v.getProductId())
      .groupByKey(Grouped.with(Serdes.Long(), orderSerde))
      .aggregate(() -> new Reservation(random.nextInt(100)), aggrSrv,
         Materialized.<Long, Reservation>as(stockOrderStoreSupplier)
            .withKeySerde(Serdes.Long())
            .withValueSerde(rsvSerde))
      .toStream()
      .peek((k, trx) -> LOG.info("Commit: {}", trx)); // (1)

   return stream;
}

What will happen if you send a test order? Let’s see the logs. You can see the difference in time between processing the message and committing the offset. You won’t have any problems with that as long as your application keeps running or is stopped gracefully. But what if you, for example, kill the process using the kill -9 command? After a restart, our application will receive the same messages once again. Since we use KafkaTemplate to send the response to the stock-orders topic, those responses would be sent twice – so we should commit the offset as soon as possible.

What can we do to avoid such problems? We may override the default value (30000 ms) of the commit.interval.ms Kafka Streams property. If you set it to 0, Kafka Streams commits immediately after processing finishes.

spring.kafka:  
  streams:
    properties:
      commit.interval.ms: 0

On the other hand, we can also set the processing.guarantee property to exactly_once. It also changes the default value of commit.interval.ms to 100 ms and enables idempotence for the producer. You can read more about it in the Kafka documentation.

spring.kafka:  
  streams:
    properties:
      processing.guarantee: exactly_once
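Even with a short commit interval, a crash between processing and the offset commit can still replay messages, which means the responses sent with KafkaTemplate may be produced twice. One simple defensive pattern on the consumer side is to remember the ids of already-processed orders. The in-memory sketch below is purely illustrative (the class name is mine, and a production version would need persistent, bounded storage):

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative duplicate guard: remembers processed order ids so a
// redelivered message after a crash-and-restart is applied only once.
public class ProcessedOrders {
    private final Set<Long> seen = new HashSet<>();

    // Returns true only the first time a given order id is offered;
    // subsequent calls with the same id return false.
    public boolean markProcessed(long orderId) {
        return seen.add(orderId);
    }
}
```

A listener would call markProcessed(order.getId()) at the start of processing and skip the business logic when it returns false.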

Running Kafka on Upstash

For the purpose of today’s exercise, we will use a serverless Kafka cluster on Upstash. You can create it with a single click. If you would like to test JAAS authentication for your application, I’ve got good news 🙂 Authentication on that cluster is enabled by default. You can find and copy the username and password from the cluster’s main panel.

kafka-streams-transactions-upstash

Now, let’s configure the Kafka connection settings and credentials for the Spring Boot application. There is a free developer tier on Upstash with up to 10k messages per day. It will be enough for our tests.

spring.kafka:
  bootstrap-servers: topical-gar-11460-us1-kafka.upstash.io:9092
  properties:
    security.protocol: SASL_SSL
    sasl.mechanism: SCRAM-SHA-256
    sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${USERNAME}" password="${PASSWORD}";

With Upstash you can easily display a list of topics. In total, there are 10 topics used in our sample system. Three of them are used directly by the Spring Boot applications, while the rest are used by Kafka Streams to handle stateful operations.

After starting the order-service application we can call its REST endpoint to create and send an order to the Kafka topic.

private static final Logger LOG = 
   LoggerFactory.getLogger(OrderController.class);
private AtomicLong id = new AtomicLong();
private KafkaTemplate<Long, Order> template;

@PostMapping
public Order create(@RequestBody Order order) {
   order.setId(id.incrementAndGet());
   template.send("orders", order.getId(), order);
   LOG.info("Sent: {}", order);
   return order;
}

Let’s call the endpoint using the following curl command. You can use any customerId or productId you want.

$ curl -X 'POST' \
  'http://localhost:8080/orders' \
  -H 'Content-Type: application/json' \
  -d '{
    "customerId": 20,
    "productId": 20,
    "productCount": 2,
    "price": 10,
    "status": "NEW"
  }'

All three sample applications use Kafka Streams to process distributed transactions. Once the order is accepted by both stock-service and payment-service you should see the following entry in the order-service logs.

You can easily simulate rejection of transactions with Kafka Streams just by setting e.g. a productCount higher than the value generated by the stock-service as the number of available items.

With Upstash UI you can also easily verify the number of messages incoming to the topics. Let’s see the current statistics for the orders topic.

Final Thoughts

In order to fully understand what happens in this example, you should also be familiar with the Kafka Streams threading model. It is worth reading the following article, which explains it in a clear manner. First of all, each stream partition is a totally ordered sequence of data records and maps to a Kafka topic partition. This means that even if we have multiple simultaneous orders related to e.g. the same product, they are all processed sequentially, since they share the same message key (productId in this case).

Moreover, by default, there is only a single stream thread that handles all the partitions. You can see this in the logs below. However, there are stream tasks that act as the lowest-level units of parallelism. As a result, stream tasks can be processed independently and in parallel without manual intervention.
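If a single stream thread becomes a bottleneck, the thread count can be raised with the standard Kafka Streams num.stream.threads property. In the Spring Boot configuration style used earlier in this article that would look like this (the value 4 is just an example):

```yaml
spring.kafka:
  streams:
    properties:
      num.stream.threads: 4
```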

I hope this article helps you to better understand Kafka Streams. I just wanted to give you a simple example of how you can use Kafka Streams with Saga transactions in order to simplify your current architecture.

The post Deep Dive into Saga Transactions with Kafka Streams and Spring Boot appeared first on Piotr's TechBlog.
