Concurrency with Kafka and Spring Boot

Piotr's TechBlog, Sun, 30 Apr 2023
https://piotrminkowski.com/2023/04/30/concurrency-with-kafka-and-spring-boot/

This article will teach you how to configure concurrency for Kafka consumers with Spring Boot and Spring for Kafka. Concurrency in Spring for Kafka is closely related to the Kafka partitions and consumer groups. Each consumer within a consumer group can receive messages from multiple partitions. While a consumer inside a group uses a single thread, the group of consumers utilizes multiple threads to consume messages. Although each consumer is single-threaded, the processing of records can leverage multiple threads. We will analyze how to achieve it with Spring Boot and Spring for Kafka.

Today’s topic, concurrency with Kafka and Spring Boot, covers fairly basic issues. If you are looking for something more advanced in this area, you can read some of my other articles. One of them covers Kafka Streams and the Spring Cloud Stream project, and another covers Kafka transactions with Spring Boot.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should go to the no-transactions-service directory. After that, you should just follow my instructions. Let’s begin.

Prerequisites

We will use three different tools in the exercise today. Of course, we will create the Spring Boot consumer app using the latest version of Spring Boot 3 and Java 19. In order to run Kafka locally, we will use Redpanda – a platform compatible with Kafka API. You can easily start and manage Redpanda with their CLI tool – rpk. If you want to install rpk on your laptop please follow the installation instructions available here.

Finally, we need a tool for load testing. I’m using the k6 tool and its extension for integration with Kafka. Of course, it is just a proposition, you can use any other solution you like. With k6 I’m able to generate and send a lot of messages to Kafka quickly. In order to use k6 you need to install it on your laptop. Here are the installation instructions. After that, you need to install the xk6-kafka extension. In the following documentation, you have a full list of the k6 extensions.

Introduction

For the purpose of this exercise, we will create a simple Spring Boot application that connects to Kafka and receives messages from a single topic. From the business logic perspective, it handles transactions between the accounts and stores inside an in-memory database. Here’s a list of dependencies we need to include in the Maven pom.xml:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>

Then, let’s see the configuration settings. Our app connects to the Kafka broker using the address set in the KAFKA_URL environment variable. It expects messages in JSON format, so we need to set JsonDeserializer as the value deserializer. Each incoming message is deserialized to a pl.piomin.services.common.model.Order object. To make it work, we need to set the spring.json.value.default.type and spring.json.trusted.packages properties. The k6 tool won’t set a header containing the JSON target type, so we need to disable that feature in Spring for Kafka with the spring.json.use.type.headers property.

spring:
  application.name: no-transactions-service
  kafka:
    bootstrap-servers: ${KAFKA_URL}
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        "[spring.json.value.default.type]": "pl.piomin.services.common.model.Order"
        "[spring.json.trusted.packages]": "pl.piomin.services.common.model"
        "[spring.json.use.type.headers]": false
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

Here’s the class representing incoming messages.

public class Order {

   private Long id;
   private Long sourceAccountId;
   private Long targetAccountId;
   private int amount;
   private String status;

   // GETTERS AND SETTERS...
}

The last thing we need to do is to enable Spring for Kafka and generate some test accounts for making transactions.

@SpringBootApplication
@EnableKafka
public class NoTransactionsService {

   public static void main(String[] args) {
      SpringApplication.run(NoTransactionsService.class, args);
   }

   private static final Logger LOG = LoggerFactory
      .getLogger(NoTransactionsService.class);

   Random r = new Random();

   @Autowired
   AccountRepository repository;

   @PostConstruct
   public void init() {
      for (int i = 0; i < 1000; i++) {
         repository.save(new Account(r.nextInt(1000, 10000)));
      }
   }

}

Running Kafka using Redpanda

Once we successfully installed the rpk CLI we can easily run a single-node Kafka broker by executing the following command:

$ rpk container start

Here’s the result. As you can see, the address of my broker is localhost:51961. The port number is generated automatically, so yours will probably be different. To simplify the next steps, let’s set it as the REDPANDA_BROKERS environment variable.

Once the broker is running, we can create a topic. We will use a topic named transactions in our tests. In the first step, we run the tests with a single partition.

$ rpk topic create transactions -p 1

Prepare Load Tests for Kafka

Our load test will generate and send orders in JSON format with random values. The k6 tool allows us to write tests in JavaScript. We need to use the k6 Kafka extension library. The address of the Kafka broker is retrieved from the KAFKA_URL environment variable. We are incrementing the order’s id field each time we generate a new message.

import {
  Writer,
  SchemaRegistry,
  SCHEMA_TYPE_JSON,
} from "k6/x/kafka";
import { randomIntBetween } from 'https://jslib.k6.io/k6-utils/1.2.0/index.js';

const writer = new Writer({
  brokers: [`${__ENV.KAFKA_URL}`],
  topic: "transactions",
});

const schemaRegistry = new SchemaRegistry();

export function setup() {
  return { index: 1 };
}

export default function (data) {
  writer.produce({
    messages: [
      {
        value: schemaRegistry.serialize({
          data: {
            id: data.index++,
            sourceAccountId: randomIntBetween(1, 1000),
            targetAccountId: randomIntBetween(1, 1000),
            amount: randomIntBetween(10, 50),
            status: "NEW"
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
      },
    ],
  });
}

export function teardown(data) {
  writer.close();
}

Before running the test we need to set the KAFKA_URL environment variable. Then we can use the k6 run command to generate and send a lot of messages.

$ k6 run load-test.js -u 1 -d 30s 

Scenario 1: Single-partition Topic Listener

Let’s start with the defaults. Our topic has just a single partition. We are creating the @KafkaListener just with the topic and consumer group names. Once the listener receives an incoming message it invokes the AccountService bean to process the order.

@Inject
AccountService service;
    
@KafkaListener(
   id = "transactions",
   topics = "transactions",
   groupId = "a")
public void listen(Order order) {
   LOG.info("Received: {}", order);
   service.process(order);
}

Our Spring Boot Kafka app is prepared for concurrency. We will lock the Account entity during the transaction with the PESSIMISTIC_WRITE mode.

public interface AccountRepository extends CrudRepository<Account, Long> {

    @Lock(LockModeType.PESSIMISTIC_WRITE)
    Optional<Account> findById(Long id);
}

Here’s the implementation of our AccountService bean for handling incoming orders. The process(...) method is @Transactional. In the first step, we find the source (1) and target (2) Account entities. Then we perform a transfer between the accounts if there are sufficient funds in the source account (3). I’m also simulating a delay just for test purposes (4). Finally, we send a response asynchronously to another topic using the KafkaTemplate bean (5).

@Service
public class AccountService {

    private static final Logger LOG = LoggerFactory
            .getLogger(AccountService.class);
    private final Random RAND = new Random();

    KafkaTemplate<Long, Order> kafkaTemplate;
    AccountRepository repository;

    public AccountService(KafkaTemplate<Long, Order> kafkaTemplate, 
                          AccountRepository repository) {
        this.kafkaTemplate = kafkaTemplate;
        this.repository = repository;
    }

    @Transactional
    public void process(Order order) {
        Account accountSource = repository
                .findById(order.getSourceAccountId())
                .orElseThrow(); // (1)

        Account accountTarget = repository
                .findById(order.getTargetAccountId())
                .orElseThrow(); // (2)

        if (accountSource.getBalance() >= order.getAmount()) { // (3)
            accountSource.setBalance(accountSource.getBalance() - order.getAmount());
            repository.save(accountSource);
            accountTarget.setBalance(accountTarget.getBalance() + order.getAmount());
            repository.save(accountTarget);
            order.setStatus("PROCESSED");
        } else {
            order.setStatus("FAILED");
        }

        try {
            Thread.sleep(RAND.nextLong(1, 20)); // (4)
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        LOG.info("Processed: order->{}", new OrderDTO(order, accountSource, accountTarget));

        // (5)
        CompletableFuture<SendResult<Long, Order>> result = kafkaTemplate
           .send("orders", order.getId(), order);
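        result.whenComplete((sr, ex) -> {
            if (ex != null) { // sr is null when the send fails
                LOG.error("Failed to send order {}", order.getId(), ex);
                return;
            }
            // Arguments follow the placeholder order: key first, then partition
            LOG.debug("Sent(key={},partition={}): {}",
                    sr.getProducerRecord().key(),
                    sr.getRecordMetadata().partition(),
                    sr.getProducerRecord().value());
        });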
    }

}

Let’s set the address of the Kafka broker in the KAFKA_URL environment variable and then start the app.

$ export KAFKA_URL=127.0.0.1:51961
$ mvn spring-boot:run

Let’s analyze what happens. Our listener is connecting to the transactions topic. It establishes just a single connection since there is a single partition.

In that case, we have just a single instance of our app running and a single thread responsible for handling messages. Let’s verify the current lag on the partition for our consumer group. As you see, the messages are processed very slowly. At first glance, you may be quite surprised.
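Why is a single consumer thread so slow? A rough estimate helps here. The sketch below is not part of the sample app; the class and method names are hypothetical. It assumes that the simulated delay from Thread.sleep(RAND.nextLong(1, 20)) dominates the processing time and ignores database access and locking, so it gives only an upper bound.

```java
// Hypothetical helper, not part of the sample repository.
class ThroughputSketch {

    // Upper bound on orders handled per second when each order blocks
    // one thread for avgDelayMs milliseconds on average.
    static double maxOrdersPerSecond(int threads, double avgDelayMs) {
        return threads * (1000.0 / avgDelayMs);
    }

    public static void main(String[] args) {
        // nextLong(1, 20) yields 1..19 ms, so the mean delay is 10 ms.
        double avgDelayMs = (1 + 19) / 2.0;
        System.out.println(maxOrdersPerSecond(1, avgDelayMs));
    }
}
```

Under these assumptions, one thread tops out at roughly 100 orders per second, which is far below what the load test can produce. That is why the lag grows so quickly with the default settings.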

Scenario 2: Multiple-partitions Topic Listener

Let’s analyze the next scenario. Now, the transactions topic consists of 10 partitions. We won’t change anything in the app code and configuration. We will just remove the previously created topic and create a new one with 10 partitions using the following commands:

$ rpk topic delete transactions
$ rpk topic create transactions -p 10

Once again, we are starting the app using the following Maven command:

$ mvn spring-boot:run

Let’s analyze the app logs. As you see, although we have 10 partitions there is still a single thread listening on them.

[Image: spring-boot-kafka-concurrency-single-partition]

So, our situation hasn’t changed. The app performance is exactly the same. However, now we can run another instance of our Spring Boot app. Once you do it, you can take a look at the app logs. The new instance of the app takes 5 partitions.

In that case, a rebalancing occurs. The first instance of our Spring Boot app keeps the other 5 partitions. Now the overall performance is twice as good as before.

Of course, we can run more app instances. In that case, we can scale up to 10 instances since there are 10 partitions on the topic.
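The arithmetic behind that limit can be sketched in a few lines. This is only an illustration with hypothetical names, not code from the repository. It assumes partitions are spread as evenly as possible across the consumers of one group. Which consumer actually gets the extra partition depends on the configured assignor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the sample repository.
class PartitionSplitSketch {

    // Spreads partitions as evenly as possible over the consumers of one
    // group; the first (partitions % consumers) consumers get one extra.
    static List<Integer> partitionsPerConsumer(int partitions, int consumers) {
        List<Integer> counts = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            int extra = i < partitions % consumers ? 1 : 0;
            counts.add(partitions / consumers + extra);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(partitionsPerConsumer(10, 2));  // two consumers
        System.out.println(partitionsPerConsumer(10, 12)); // more consumers than partitions
    }
}
```

With 10 partitions and 2 consumers, each gets 5. With 12 consumers, two of them receive nothing, which is why running more than 10 instances brings no benefit.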

Scenario 3: Consumer Concurrency with Multiple Partitions

Let’s analyze another scenario. Now, we are enabling concurrency at the Kafka listener level. To achieve it, we need to set the concurrency field inside the @KafkaListener annotation. This parameter is still related to Kafka partitions, so it makes no sense to set a value higher than the number of partitions. In our case, there are 10 partitions, the same as in the previous scenario.

@KafkaListener(
   id = "transactions",
   topics = "transactions",
   groupId = "a",
   concurrency = "10")
public void listen(Order order) {
   LOG.info("Received: {}", order);
   service.process(order);
}

After that, we can start the Spring Boot app. Let’s see what happens. As you see, we have 10 concurrent connections – each bound to a single thread.

[Image: spring-boot-kafka-concurrency-multi]

In that case, the app performance for a single instance is around 10 times better than before. There are 10 concurrent threads, which process incoming messages.

[Image: spring-boot-kafka-concurrency-processing]

However, if we run our load tests the lag on partitions is still large. Here’s the result after sending ~25k messages in 10 seconds.

[Image: spring-boot-kafka-concurrency-lag]

Theoretically, we can scale up the number of instances to improve the overall performance. However, that approach won’t change anything. Why? Let’s run a second instance and take a look at the logs. After the rebalance, only 5 threads of the first instance are still bound to partitions. Its five other threads are idle. The overall performance of the system has not changed.
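The reason is that a partition is read by at most one consumer within a group, so consumer threads beyond the partition count have nothing to do. Here is a minimal sketch of that calculation. The names are hypothetical, and how the idle threads are spread across instances depends on the assignor.

```java
// Hypothetical helper, not part of the sample repository.
class IdleConsumerSketch {

    // Total consumer threads in the group across all app instances.
    static int totalConsumers(int instances, int concurrency) {
        return instances * concurrency;
    }

    // Threads left without a partition, since each partition is read
    // by at most one consumer in the group.
    static int idleConsumers(int partitions, int instances, int concurrency) {
        return Math.max(0, totalConsumers(instances, concurrency) - partitions);
    }

    public static void main(String[] args) {
        System.out.println(idleConsumers(10, 1, 10)); // one instance
        System.out.println(idleConsumers(10, 2, 10)); // two instances
    }
}
```

For 10 partitions, one instance with concurrency 10 leaves no thread idle, while two such instances leave 10 of the 20 threads idle.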

Scenario 4: Process in Multiple Threads

Finally, the last scenario. We will create a thread pool with the Java ExecutorService. We can still combine the custom thread pool with the Kafka consumer concurrency feature (through the concurrency parameter), as shown below. Each time the listener receives a new message, it hands it over to a worker thread for processing.

@Service
public class NoTransactionsListener {

    private static final Logger LOG = LoggerFactory
            .getLogger(NoTransactionsListener.class);

    AccountService service;
    ExecutorService executorService = Executors.newFixedThreadPool(30);

    public NoTransactionsListener(AccountService service) {
        this.service = service;
    }

    @KafkaListener(
            id = "transactions",
            topics = "transactions",
            groupId = "a",
            concurrency = "3")
    public void listen(Order order) {
        LOG.info("Received: {}", order);
        executorService.submit(() -> service.process(order));
    }

}

In that case, one thing should be clarified. With the custom thread pool at the app level, we are losing message ordering within the single partition. The previous model guaranteed ordering, since we have a thread per partition. For our Spring Boot app, it is not important, because we are just processing messages independently.
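If ordering did matter, one common alternative is to route all records that share a key to the same single-threaded executor. The sample app does not do this. The sketch below uses only the JDK; the KeyedExecutorSketch class and its methods are hypothetical and serve just to illustrate the idea.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the sample repository.
class KeyedExecutorSketch {

    private final List<ExecutorService> lanes = new ArrayList<>();

    KeyedExecutorSketch(int laneCount) {
        for (int i = 0; i < laneCount; i++) {
            // One thread per lane, so tasks in a lane run in submission order.
            lanes.add(Executors.newSingleThreadExecutor());
        }
    }

    // Tasks sharing a key always land in the same lane.
    void submit(long key, Runnable task) {
        int lane = Math.floorMod(Long.hashCode(key), lanes.size());
        lanes.get(lane).submit(task);
    }

    // Stops accepting tasks and waits for the queued ones to finish.
    void shutdownAndWait() {
        lanes.forEach(ExecutorService::shutdown);
        try {
            for (ExecutorService lane : lanes) {
                lane.awaitTermination(10, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In the listener, the call would look something like keyedExecutor.submit(order.getSourceAccountId(), () -> service.process(order)), assuming the source account is the key whose order you want to preserve. The price is that a hot key can only ever use one thread.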

Let’s start the app. There are 3 concurrent threads that receive messages from the partitions.

There are 30 threads for processing messages and 3 threads for listening to the partitions. Once the message is received in the consumer thread, it is handled by the worker threads.

[Image: spring-boot-kafka-concurrency-multi-threads]
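One consequence of this hand-off is worth spelling out: the listener method returns as soon as the order is submitted to the pool, not when processing finishes. The following JDK-only demo imitates that behavior. It is a hypothetical sketch, not code from the repository. A task that blocks on a latch stands in for service.process(order), and the method reports how many records are still waiting in the pool’s queue after the "listener" has returned for all of them.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo, not part of the sample repository.
class HandOffBacklogSketch {

    // Submits `records` slow tasks to a pool of `workers` threads and returns
    // how many are still waiting in the pool's queue right after the hand-off.
    static int queuedAfterHandOff(int workers, int records) {
        ThreadPoolExecutor pool =
                (ThreadPoolExecutor) Executors.newFixedThreadPool(workers);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < records; i++) {
            // Stand-in for service.process(order): blocks until released.
            pool.submit(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        // The "listener" has already returned for every record at this point.
        int queued = pool.getQueue().size();
        release.countDown(); // let the workers finish
        pool.shutdown();
        return queued;
    }

    public static void main(String[] args) {
        System.out.println(queuedAfterHandOff(2, 10));
    }
}
```

With 2 workers and 10 records, 8 records are still queued although every listener call has returned. From the consumer’s point of view those records are already handled, so the reported lag can look better than the real backlog inside the app.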

We can run other instances of our Spring Boot Kafka concurrency app. I’ll run another two. The first instance grabs 4 partitions, while the other two take 3 each.

Now, we can run the load test again. It generated and sent ~85k messages to our Kafka broker (around 2.7k per second).

[Image: spring-boot-kafka-concurrency-k6]

Let’s verify the lag within the consumer group using the rpk group command. The lag on partitions is not large. In fact, there are 90 threads across the three app instances processing the incoming messages simultaneously. But wait… does it mean that with 90 threads we are able to process 2.7k orders per second? We should also remember the custom delay of between 1 and 20 ms that we added before (with the Thread.sleep method).

The lag looks really fine, although the app is not able to process all requests without a delay. That’s because the default ack mode for committing offsets in Spring Kafka is BATCH. If we change it to RECORD mode, which commits the offset when the listener returns after processing the record, we should get a more precise lag value. To override that option, we need to define the ConcurrentKafkaListenerContainerFactory bean as shown below.

@Bean
ConcurrentKafkaListenerContainerFactory<Long, Order>
    kafkaListenerContainerFactory(ConsumerFactory<Long, Order> consumerFactory) {
   // Key and value types match the deserializers configured for the consumer
   ConcurrentKafkaListenerContainerFactory<Long, Order> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
   factory.setConsumerFactory(consumerFactory);
   factory.setConcurrency(3);
   factory.getContainerProperties()
      .setAckMode(ContainerProperties.AckMode.RECORD);
   return factory;
}

Let’s restart the app and run the load test once again. Now, the lag value is much closer to reality.

Final Thoughts

Concurrency and performance are among the most important things to consider when working with Kafka and Spring Boot. In this article, I wanted to explain some basics with simple examples. I hope it clarifies some concerns about using the Spring Kafka project.

Running Redpanda on Kubernetes

Piotr's TechBlog, Tue, 06 Sep 2022
https://piotrminkowski.com/2022/09/06/running-redpanda-on-kubernetes/

In this article, you will learn how to install and manage Redpanda on Kubernetes. It is not the first article related to Redpanda on my blog. You can read more about Redpanda in my earlier post, where I describe local development of Java apps with Redpanda, Quarkus, and Testcontainers.

You can use Redpanda in local development as an alternative to the standard Apache Kafka. It is a Kafka API-compatible tool but does not use ZooKeeper or JVM.

In this article, I’m going to show that you can also easily run and use Redpanda on Kubernetes. There are some interesting features that will surely interest you. Let’s begin.

Source Code

If you would like to try this exercise yourself, you may always take a look at my source code. In order to do that, you need to clone my GitHub repository. Then switch to the redpanda branch. You will find sample applications for sending and receiving messages to Kafka in the event-driven directory. After that, just follow my instructions.

Install Cert Manager on Kubernetes

The recommended way to install Redpanda on Kubernetes is through the operator. The Redpanda operator requires the cert-manager to create certificates for TLS communication. So in the first step, we need to install cert-manager. We will use Helm for that. Let’s add the following Helm repository:

$ helm repo add jetstack https://charts.jetstack.io && \
  helm repo update

After that, we can install cert-manager in the cert-manager namespace. In order to create a namespace automatically, we should enable the create-namespace option. By default, the Cert Manager does not install CRDs on Kubernetes. Let’s enable it using the installCRDs Helm parameter:

$ helm install cert-manager \
   --namespace cert-manager --create-namespace \
   --set installCRDs=true \
   --version v1.9.1 jetstack/cert-manager

Here’s the list of the cert-manager pods. Assuming you have a similar result, you may proceed to the next section.

$ kubectl get pod -n cert-manager                                                                
NAME                                      READY   STATUS    RESTARTS   AGE
cert-manager-877fd747c-lhrgd              1/1     Running   0          1m
cert-manager-cainjector-bbdb88874-tmlg9   1/1     Running   0          1m
cert-manager-webhook-5774d5d8f7-flv7s     1/1     Running   0          1m

Install Redpanda Operator using Helm

Before we install the Redpanda operator we need to apply a single Cluster object CRD:

$ kubectl apply -k 'https://github.com/redpanda-data/redpanda/src/go/k8s/config/crd?ref=v22.2.2'

The same as before, we will use Helm in the installation process. Firstly, let’s add the official Redpanda Helm repository:

$ helm repo add redpanda https://charts.vectorized.io && \
  helm repo update

We will install the latest version of the Redpanda operator (22.2) in the redpanda-system namespace.

$ helm install redpanda-operator redpanda/redpanda-operator \
    --namespace redpanda-system \
    --create-namespace \
    --set monitoring.enabled=true \
    --version v22.2.2

After that, we may use the Cluster CRD object to create a single-node Redpanda cluster on Kubernetes. Also, let’s enable developer mode. Redpanda provides a built-in HTTP proxy and schema registry. The name of our cluster is one-node-cluster.

apiVersion: redpanda.vectorized.io/v1alpha1
kind: Cluster
metadata:
  name: one-node-cluster
spec:
  image: vectorized/redpanda
  version: latest
  replicas: 1
  resources:
    requests:
      cpu: 1
      memory: 2Gi
    limits:
      cpu: 1
      memory: 2Gi
  configuration:
    rpcServer:
      port: 33145
    kafkaApi:
    - port: 9092
    pandaproxyApi:
    - port: 8082
    schemaRegistry:
      port: 8081
    adminApi:
    - port: 9644
    developerMode: true

Redpanda will run in the redpanda namespace. Let’s create that namespace first.

$ kubectl create ns redpanda

Then, let’s create the Cluster object in the redpanda namespace.

$ kubectl apply -f redpanda-cluster.yaml -n redpanda

The Redpanda operator creates two Kubernetes Services for the cluster. The first of them, one-node-cluster, is a headless service used for internal communication. We could have enabled external communication, but it is not required in our scenario. Applications use port 9092, which is compatible with the Kafka API. There is also a dedicated service that exposes the schema registry on port 8081.

$ kubectl get svc -n redpanda
NAME                       TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)                      AGE
one-node-cluster           ClusterIP   None           <none>        9644/TCP,9092/TCP,8082/TCP   3m
one-node-cluster-cluster   ClusterIP   10.98.26.202   <none>        8081/TCP                     3m

Install via Helm without operator

We can also use the Redpanda Helm chart, which does not require the installation of CRDs or an operator, but instead creates a cluster according to the configuration in a values.yaml file. This is the recommended way to install Redpanda on Kubernetes. In order to use it, clone the Redpanda repository with the Helm chart:

$ git clone https://github.com/redpanda-data/helm-charts.git
$ cd helm-charts/redpanda

Then, you need to install Redpanda using the following Helm command. Since we use a single-node Kubernetes cluster we override the default number of brokers using the statefulset.replicas parameter.

$ helm install redpanda . \
    -n redpanda \
    --create-namespace \
    --set statefulset.replicas=1

In comparison to the previous installation method, you would have to install the kube-prometheus-stack separately.

$ helm repo add prometheus-community https://prometheus-community.github.io/helm-charts && \
  helm repo update

Enable Prometheus Metrics

In the previous section, we installed the Prometheus stack using the Redpanda operator. By default, it monitors Kubernetes core components and the Redpanda operator. Our goal is to enable monitoring of the Redpanda cluster we have just created in the redpanda namespace. To do that, we need to create the PodMonitor object provided by the Prometheus operator. It requires us to set at least the target pod namespace, the label selector, and the metrics endpoints. Redpanda exposes metrics on the admin port (9644) under the /metrics and /public_metrics paths.

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  labels:
    app.kubernetes.io/instance: redpanda-cluster
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/name: redpanda-cluster
    app.kubernetes.io/version: v22.2.2
    helm.sh/chart: redpanda-operator-v22.2.2
    release: redpanda-operator
  name: redpanda-cluster-monitor
  namespace: redpanda-system
spec:
  namespaceSelector:
    matchNames:
      - redpanda
  podMetricsEndpoints:
    - path: /metrics
      port: admin
    - path: /public_metrics
      port: admin
  selector:
    matchLabels:
      app.kubernetes.io/name: redpanda

Once you create the PodMonitor, you will be able to query Redpanda metrics. There are a lot of exported metrics. You can verify them in the Prometheus dashboard. Their names start with the vectorized_ prefix.

[Image: redpanda-kubernetes-prometheus]

The simplest way to view important metrics is through the Grafana dashboard. Therefore, we are going to create a dashboard there. Fortunately, Redpanda provides an automatic mechanism for generating a Grafana dashboard. There is a dedicated Redpanda CLI (rpk) command to do it. We just need to set the name of the data source (Prometheus) and metrics endpoint URL (public_metrics).

$ kubectl exec pod/one-node-cluster-0 -n redpanda -c redpanda \
    -- rpk generate grafana-dashboard --datasource Prometheus \
    --metrics-endpoint http://localhost:9644/public_metrics \
    > redpanda-dashboard.json

Once we export the dashboard as a JSON file, we can import it into Grafana.

Enable Redpanda Console

Redpanda provides a UI dashboard for managing cluster instances called Redpanda Console. However, it is not installed by the operator. In order to run it on Kubernetes, we will use the Helm chart. Firstly, let’s add the required Helm repository:

$ helm repo add redpanda-console https://packages.vectorized.io/public/console/helm/charts/ && \
  helm repo update

We need to override some configuration settings. By default, the Redpanda Console tries to detect both broker and schema registry on a localhost address. Since we are running Redpanda on Kubernetes we need to set the name of the service one-node-cluster for the broker and one-node-cluster-cluster for the schema registry. Here’s our values.yaml file:

console:
  config:
    kafka:
      brokers:
        - one-node-cluster:9092
      clientId: redpanda-console
    schemaRegistry:
      enabled: true
      urls: ["http://one-node-cluster-cluster:8081"]
      username: console
      password: redacted

Finally, let’s install the console in the same namespace as the broker.

$ helm install redpanda-console redpanda-console/console \
    --values values.yaml \
    -n redpanda

The Redpanda Console is available on port 8080. We can use port forwarding to access it locally.

Integrate Spring Boot with Redpanda

Our instance of Redpanda is ready. Let’s run sample applications on Kubernetes. They are sending events to Redpanda and receiving them from Redpanda. Our applications are written in Kotlin and built on top of Spring Boot and use Spring Cloud Stream to integrate with the Kafka-compatible API. They are using the Avro format for serializing and deserializing messages. Thanks to the Spring Cloud Schema Registry client, they may also integrate with the schema registry provided by Redpanda.

Here’s the list of required modules for both producer and consumer apps:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-schema-registry-client</artifactId>
</dependency>

If you would like to read more about Spring Cloud support for Kafka and schema registry you can refer to this article on my blog. It describes how to build event-driven architectures with Spring Cloud Stream Kafka and use Avro format in communication between apps.

Our producer app continuously generates and sends messages to the Redpanda topic. It is integrated with the schema registry available under the address provided in the spring.cloud.schemaRegistryClient.endpoint property. To enable that integration, we need to annotate the main class with the @EnableSchemaRegistryClient.

@SpringBootApplication
@EnableSchemaRegistryClient
class ProductionApplication {

   var id: Int = 0

   @Value("\${callme.supplier.enabled}")
   val supplierEnabled: Boolean = false

   @Bean
   fun callmeEventSupplier(): Supplier<Message<CallmeEvent>?> = Supplier { createEvent() }

   @Primary
   @Bean
   fun schemaRegistryClient(@Value("\${spring.cloud.schemaRegistryClient.endpoint}") endpoint: String?): SchemaRegistryClient {
      val client = ConfluentSchemaRegistryClient()
      client.setEndpoint(endpoint)
      return client
   }

   private fun createEvent(): Message<CallmeEvent>? {
      return if (supplierEnabled)
          MessageBuilder.withPayload(CallmeEvent(++id, "I'm callme event!", "ping"))
               .setHeader("to_process", true)
               .build()
      else
         null
   }
}

Our app does not contain a lot of code. However, we need to provide some configuration settings. In order to enable serialization with Avro, we need to set the default content type to application/*+avro (1). The target topic on Redpanda is callme-events (2). It consists of 2 partitions (3). We also need to set the name of the bean responsible for generating messages (4). With the property spring.cloud.schema.avro.dynamicSchemaGenerationEnabled we may enable automatic generation of the Avro schema based on the source code (5). Of course, we also need to provide the schema registry address (6) and the Redpanda broker address (7).

spring.application.name=producer-service
spring.cloud.stream.default.contentType=application/*+avro # (1)
spring.cloud.stream.bindings.callmeEventSupplier-out-0.contentType=application/*+avro
spring.cloud.stream.bindings.callmeEventSupplier-out-0.destination=callme-events # (2)
spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionKeyExpression=payload.id
spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionCount=2 # (3)
spring.cloud.stream.source=callmeEventSupplier # (4)

spring.cloud.schema.avro.dynamicSchemaGenerationEnabled=true # (5)
spring.cloud.schemaRegistryClient.endpoint=http://one-node-cluster-cluster:8081/ # (6)
spring.kafka.bootstrap-servers=one-node-cluster:9092 # (7)
spring.main.allow-bean-definition-overriding=true

callme.supplier.enabled=true
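
With partitionKeyExpression=payload.id and partitionCount=2, each event is routed to a partition derived from its id. The following is only a rough sketch of that idea, assuming the default behavior boils down to "hash code of the key modulo the number of partitions". The PartitionSketch class and the selectPartition method are made-up names for illustration and are not part of Spring Cloud Stream.

```java
// Illustrative only: models "hash of the key modulo partition count".
// PartitionSketch and selectPartition are hypothetical names (an assumption,
// not the Spring Cloud Stream API).
class PartitionSketch {

    // Maps a partition key to a partition index in [0, partitionCount).
    static int selectPartition(Object key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        int partitionCount = 2; // same value as producer.partitionCount
        for (int id = 1; id <= 4; id++) {
            // Integer keys hash to their own value, so ids alternate between partitions
            System.out.println("id=" + id + " -> partition " + selectPartition(id, partitionCount));
        }
    }
}
```

Since the id grows by one with every event, consecutive events should alternate between the two partitions under this assumption.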

Finally, let’s build and deploy our producer app on Kubernetes. We may use Skaffold for that. The app source code is configured to support it. Here’s our Deployment manifest:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
      - name: producer
        image: piomin/producer-service
        ports:
        - containerPort: 8080

Let’s verify a list of running pods in the redpanda namespace:

$ kubectl get pod -n redpanda
NAME                               READY   STATUS    RESTARTS   AGE
one-node-cluster-0                 1/1     Running   0          112m
producer-5b7f5cfcc6-586z2          1/1     Running   0          65m
redpanda-console-dcf446dc8-fzc2t   1/1     Running   0          104m

Monitor Redpanda on Kubernetes with Console and Prometheus

Our producer app is running on Kubernetes. It generates and sends messages. Let’s switch to the Redpanda Console. Here’s the view with a list of topics. As you see, the topic callme-events has been created:

redpanda-kubernetes-console

If you click on the topic, you will see the details and a list of messages:

Also, let’s verify the message schema available under the Schema Registry menu. You can compare it with the CallmeEvent object in the source code.

redpanda-kubernetes-schema

Then, let’s run our consumer app. It also integrates with the schema registry and receives messages from the callme-events topic.

Thanks to Prometheus and Grafana, we can monitor several parameters related to the Redpanda broker. Here’s the screen from the Grafana dashboard:

Final Thoughts

Redpanda simplifies deployment on Kubernetes in comparison to the standard Kafka. Within the single pod, we have a broker, a schema registry, and an HTTP proxy. We can also easily install a UI console to manage Redpanda graphically. We can easily customize the Redpanda cluster using the CRD object provided by the operator.

The post Running Redpanda on Kubernetes appeared first on Piotr's TechBlog.

Local Development with Redpanda, Quarkus and Testcontainers
https://piotrminkowski.com/2022/04/20/local-development-with-redpanda-quarkus-and-testcontainers/
Wed, 20 Apr 2022 08:13:36 +0000

In this article, you will learn how to speed up your local development with Redpanda and Quarkus. The main goal is to show that you can replace Apache Kafka with Redpanda without any changes in the source code. Instead, you will get a fast way to run your existing Kafka applications without Zookeeper and JVM. You will also see how Quarkus uses Redpanda as a local instance for development. Finally, we are going to run all containers in the Testcontainers Cloud.

For the current exercise, we use the same examples as described in one of my previous articles about Quarkus and Kafka Streams. Just to remind you: we are building a simplified version of the stock market platform. The stock-service application receives and handles incoming orders. There are two types of orders: purchase (BUY) and sale (SELL). While the stock-service consumes Kafka streams, the order-service generates and sends events to the orders.buy and orders.sell topics. Here’s the diagram with our architecture. As you see, the stock-service also uses PostgreSQL as a database.

quarkus-redpanda-arch

Source Code

If you would like to try this exercise yourself, you may always take a look at my source code. In order to do that, you need to clone my GitHub repository. Then switch to the dev branch. After that, you should just follow my instructions. Let’s begin.

Install Redpanda

This step is not required. However, it is worth installing Redpanda since it provides a useful CLI called Redpanda Keeper (rpk) to manage a cluster. To install Redpanda on macOS just run the following command:

$ brew install redpanda-data/tap/redpanda

Now, we can easily create and run a new cluster. For development purposes, we only need a single-node Redpanda cluster. To run it, you need to have Docker on your laptop.

$ rpk container start

Before proceeding to the next steps, let’s remove the current cluster. Quarkus will create everything for us automatically.

$ rpk container purge

Quarkus with Kafka and Postgres

Let’s begin with the stock-service. It consumes streams from Kafka topics and connects to the PostgreSQL database, as I mentioned before. So, the first step is to include the following dependencies:

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-kafka-streams</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-hibernate-orm-panache</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-jdbc-postgresql</artifactId>
</dependency>

Now, we may proceed to the implementation. The topology for all the streams is provided inside the following method:

@Produces
public Topology buildTopology() {
   ...
}

Several streams are defined there. But let’s just take a look at the fragment of the topology responsible for creating transactions from incoming orders:

final String ORDERS_BUY_TOPIC = "orders.buy";
final String ORDERS_SELL_TOPIC = "orders.sell";
final String TRANSACTIONS_TOPIC = "transactions";

// ... other streams

KStream<Long, Order> orders = builder.stream(
   ORDERS_SELL_TOPIC,
   Consumed.with(Serdes.Long(), orderSerde));

builder.stream(ORDERS_BUY_TOPIC, Consumed.with(Serdes.Long(), orderSerde))
   .merge(orders)
   .peek((k, v) -> {
      log.infof("New: %s", v);
      logic.add(v);
   });

builder.stream(ORDERS_BUY_TOPIC, Consumed.with(Serdes.Long(), orderSerde))
   .selectKey((k, v) -> v.getProductId())
   .join(orders.selectKey((k, v) -> v.getProductId()),
      this::execute,
      JoinWindows.of(Duration.ofSeconds(10)),
      StreamJoined.with(Serdes.Integer(), orderSerde, orderSerde))
   .filterNot((k, v) -> v == null)
   .map((k, v) -> new KeyValue<>(v.getId(), v))
   .peek((k, v) -> log.infof("Done -> %s", v))
   .to(TRANSACTIONS_TOPIC, Produced.with(Serdes.Long(), transactionSerde));
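
To make the join semantics more concrete, here is a small plain-Java sketch, independent of Kafka Streams, of what a 10-second windowed join does conceptually: a BUY and a SELL order are paired only if they share the same key (the product id) and their timestamps are at most 10 seconds apart. The Event class and the join method are hypothetical helpers introduced just for this illustration, and the sketch ignores state stores, repartitioning, and everything else the real library handles.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Conceptual model of a windowed stream-stream join; not the Kafka Streams API.
class WindowedJoinSketch {

    // Hypothetical event: a key (product id), an event timestamp and a label.
    static final class Event {
        final int key;
        final long timestampMs;
        final String label;

        Event(int key, long timestampMs, String label) {
            this.key = key;
            this.timestampMs = timestampMs;
            this.label = label;
        }
    }

    // Pairs events with equal keys whose timestamps differ by at most windowMs.
    static List<String> join(List<Event> left, List<Event> right, long windowMs) {
        List<String> joined = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key == r.key;
                boolean inWindow = Math.abs(l.timestampMs - r.timestampMs) <= windowMs;
                if (sameKey && inWindow) {
                    joined.add(l.label + "+" + r.label);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Event> buys = Arrays.asList(
            new Event(1, 0L, "buy-1"),
            new Event(2, 1_000L, "buy-2"));
        List<Event> sells = Arrays.asList(
            new Event(1, 4_000L, "sell-1"),    // same product, 4 s apart: joined
            new Event(2, 20_000L, "sell-2"));  // same product, 19 s apart: skipped
        System.out.println(join(buys, sells, 10_000L)); // prints [buy-1+sell-1]
    }
}
```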

The whole implementation is more advanced. For the details, you may refer to the article I mentioned in the introduction. Now, let’s imagine we are still developing our stock market app. Normally, we would first run PostgreSQL and a local Kafka cluster (here Redpanda, which is easy to run locally) and then provide the addresses of both the database and the broker in application.properties. But thanks to a feature called Quarkus Dev Services, the only things we need to configure now are the names of the topics consumed by Kafka Streams and the application id. Both of these are required by Kafka Streams.

Now, the most important thing: you just need to start the Quarkus app. Nothing more. DO NOT run any external tools by yourself and DO NOT provide any addresses for them in the configuration settings. Just add the two lines you see below:

quarkus.kafka-streams.application-id = stock
quarkus.kafka-streams.topics = orders.buy,orders.sell

Run Quarkus in dev mode with Redpanda

Before you run the Quarkus app, make sure you have Docker running on your laptop. When you do, the only thing you need is to start both test apps. Let’s begin with the stock-service since it receives orders generated by the order-service. Go to the stock-service directory and run the following command:

$ cd stock-service
$ mvn quarkus:dev

If you see the following logs, it means that everything went well. Our application has been started in 13 seconds. During this time, Quarkus also started Kafka, PostgreSQL on Docker, and built Kafka Streams. Everything in 13 seconds with a single command and without any additional configuration. Nice, right? Let’s check out what happened in the background:

Firstly, let’s find the following line of logs beginning with the sentence “Dev Services for Kafka started”. It perfectly describes the feature of Quarkus called Dev Services. Our Kafka instance has been started as a Docker container and is available under a dynamically generated port. The application connects to it. All other Quarkus apps you would run now will share the same instance of a broker. You can disable that feature by setting the property quarkus.kafka.devservices.shared to false.

It may be a little surprising, but Quarkus Dev Services for Kafka uses Redpanda to run a broker. Of course, Redpanda is a Kafka-compatible solution. Since it starts in ~one second and does not require Zookeeper, it is a great choice for local development.

In order to run tools like brokers or databases on Docker, Quarkus uses Testcontainers. If you are interested in more details about Quarkus Dev Services for Kafka, read the following documentation. For now, let’s display a list of running containers using the docker ps command. There are containers for Redpanda, PostgreSQL, and Testcontainers.

quarkus-redpanda-containers

Manage Kafka Streams with Redpanda and Quarkus

Let’s verify how everything works on the application side. After running the application, we can take advantage of another useful Quarkus feature called Dev UI. Our UI console is available under the address http://localhost:8080/q/dev/. After accessing it, you can display a topology of Kafka Streams by clicking the button inside the Apache Kafka Streams tile.

Here you will see a summary of available streams. For me, it is 12 topics and 15 state stores. You may also see a visualization of Kafka Streams’ topology. The following picture shows the fragment of topology. You can download the full image by clicking the green download button, visible on the right side of the screen.

quarkus-redpanda-dev

Now, let’s use the Redpanda CLI to display a list of created topics. In my case, Redpanda is available under the port 55001 locally. All the topics are automatically created by Quarkus during application startup. We need to define the names of topics used in communication between both our test apps. Those topics are: orders.buy, orders.sell and transactions. They are configured and created by the order-service. The stock-service is creating all other topics visible below, which are responsible for handling streams.

$ rpk topic list --brokers localhost:55001
NAME                                                    PARTITIONS  REPLICAS
orders.buy                                              1           1
orders.sell                                             1           1
stock-KSTREAM-JOINOTHER-0000000016-store-changelog      1           1
stock-KSTREAM-JOINOTHER-0000000043-store-changelog      1           1
stock-KSTREAM-JOINOTHER-0000000065-store-changelog      1           1
stock-KSTREAM-JOINTHIS-0000000015-store-changelog       1           1
stock-KSTREAM-JOINTHIS-0000000042-store-changelog       1           1
stock-KSTREAM-JOINTHIS-0000000064-store-changelog       1           1
stock-KSTREAM-KEY-SELECT-0000000005-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000006-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000032-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000033-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000054-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000055-repartition         1           1
stock-transactions-all-summary-changelog                1           1
stock-transactions-all-summary-repartition              1           1
stock-transactions-per-product-summary-30s-changelog    1           1
stock-transactions-per-product-summary-30s-repartition  1           1
stock-transactions-per-product-summary-changelog        1           1
stock-transactions-per-product-summary-repartition      1           1
transactions                                            1           1

In order to do a full test, we also need to run the order-service. It continuously generates orders and sends them to the orders.buy or orders.sell topics. Let’s do that.

Send messages to Redpanda with Quarkus

Before we run order-service, let’s see some implementation details. On the producer side, we need to include a single dependency responsible for integration with a Kafka broker:

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

Our application generates and sends random orders to the orders.buy or orders.sell topics. There are two methods for that, each of them dedicated to a single topic. Let’s just see the method for generating BUY orders. We need to annotate it with @Outgoing and set the channel name (orders-buy). Our method generates one order every 500 milliseconds.

@Outgoing("orders-buy")
public Multi<Record<Long, Order>> buyOrdersGenerator() {
   // Emit a tick every 500 ms and turn each tick into a random BUY order
   return Multi.createFrom().ticks().every(Duration.ofMillis(500))
      .map(tick -> {
         Integer productId = random.nextInt(10) + 1;
         int price = prices.get(productId) + random.nextInt(200);
         Order o = new Order(
            incrementOrderId(),
            random.nextInt(1000) + 1,
            productId,
            100 * (random.nextInt(5) + 1),
            LocalDateTime.now(),
            OrderType.BUY,
            price);
         log.infof("Sent: %s", o);
         // The order id becomes the Kafka record key
         return Record.of(o.getId(), o);
      });
}

After that, we need to map the channel name into a target topic name. Another required operation is to set the serializer for the message key and value.

mp.messaging.outgoing.orders-buy.connector = smallrye-kafka
mp.messaging.outgoing.orders-buy.topic = orders.buy
mp.messaging.outgoing.orders-buy.key.serializer = org.apache.kafka.common.serialization.LongSerializer
mp.messaging.outgoing.orders-buy.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Finally, go to the order-service directory and run the application.

$ cd order-service
$ mvn quarkus:dev

Once you start order-service, it will create topics and start sending orders. It uses the same instance of Redpanda as stock-service. You can run the docker ps command once again to verify it.

Now, just make a simple change in stock-service to reload the application. It will also reload the Kafka Streams topology. After that, it starts receiving orders from the topics created by the order-service. Finally, it will create transactions from incoming orders and store them in the transactions topic.

Use Testcontainers Cloud

In our development process, we need to have a locally installed Docker ecosystem. But what if we don’t have it? That’s where Testcontainers Cloud comes in. Testcontainers Cloud is the developer-first SaaS platform for modern integration testing with real databases, message brokers, cloud services, or any other component of application infrastructure. To simplify, we will do the same thing as before, but our instances of Redpanda and PostgreSQL will not run on the local Docker, but on the remote Testcontainers platform.

What do you need to do to enable Testcontainers Cloud? Firstly, download the agent from the following site. You also need to be a beta tester to obtain an authorization token. Finally, just run the agent and kill your local Docker daemon. You should see the Testcontainers icon in the running apps with information about the connection to the cloud.

quarkus-redpanda-testcontainers

Docker should not run locally.

The same as before, just run both applications with the quarkus:dev command. Your Redpanda broker is running on the Testcontainers Cloud but, thanks to the agent, you may access it over localhost.

Once again you can verify a list of topics using the following command for the new broker:

$ rpk topic list --brokers localhost:59779

Final Thoughts

In this article, I focused on showing you how new and exciting technologies like Quarkus, Redpanda, and Testcontainers can work together. Local development is one of the use cases, but you may as well use them to write integration tests.

The post Local Development with Redpanda, Quarkus and Testcontainers appeared first on Piotr's TechBlog.

Distributed Transactions in Microservices with Kafka Streams and Spring Boot
https://piotrminkowski.com/2022/01/24/distributed-transactions-in-microservices-with-kafka-streams-and-spring-boot/
Mon, 24 Jan 2022 11:11:24 +0000

In this article, you will learn how to use Kafka Streams with Spring Boot. We will rely on the Spring Kafka project. In order to explain well how it works, we are going to implement a saga pattern. The saga pattern is a way to manage distributed transactions across microservices. The key phase of that process is to publish an event that triggers local transactions. Microservices exchange such events through a message broker. It turns out that Kafka Streams may help us here. Let’s see how!

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. After that, you should just follow my instructions.

Instead of Spring Kafka, you could as well use Spring Cloud Stream for Kafka. You can read more about it in this article. Spring Cloud Stream provides several useful features like DLQ support, serialization to JSON by default, or interactive queries.

Architecture

We will create a simple system that consists of three microservices. The order-service sends orders to the Kafka topic called orders. The two other microservices, stock-service and payment-service, listen for the incoming events. After receiving them, they verify whether it is possible to execute the order. For example, if there are insufficient funds on the customer account, the order is rejected. Otherwise, the payment-service accepts the order. In either case, it sends a response to the payment-orders topic. The same applies to the stock-service, except that it verifies the number of products in stock and sends a response to the stock-orders topic.

Then, the order-service joins two streams from the stock-orders and payment-orders topics by the order’s id. If both orders were accepted, it confirms a distributed transaction. On the other hand, if one order has been accepted and the second rejected, it performs a rollback. In that case, it just generates a new order event and sends it to the orders topic. We may treat the orders topic as a stream of the order’s status changes or just like a table with the last status. Here’s the picture that visualizes our scenario.

kafka-streams-spring-boot-arch

Kafka Streams with Spring Boot

Let’s begin our implementation with the order-service. Surprisingly, there is no Spring Boot starter for Kafka (unless we use Spring Cloud Stream). Therefore, we need to include the spring-kafka dependency. In order to process streams, we also need to include the kafka-streams module directly. Since the order-service exposes some REST endpoints, it is required to add the Spring Boot Web starter.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
</dependency>

The order-service is the most important microservice in our scenario. It acts as an order gateway and a saga pattern orchestrator. It requires all three topics used in our architecture. In order to automatically create topics on application startup, we need to define the following beans:

@Bean
public NewTopic orders() {
   return TopicBuilder.name("orders")
         .partitions(3)
         .compact()
         .build();
}

@Bean
public NewTopic paymentTopic() {
   return TopicBuilder.name("payment-orders")
         .partitions(3)
         .compact()
         .build();
}

@Bean
public NewTopic stockTopic() {
   return TopicBuilder.name("stock-orders")
         .partitions(3)
         .compact()
         .build();
}

Then, let’s define our first Kafka stream. To do that we need to use the StreamsBuilder bean. The order-service receives events from the payment-service (in the payment-orders topic) and from the stock-service (in the stock-orders topic). Every single event contains the id previously set by the order-service. If we join both these streams into a single stream by the order’s id, we will be able to determine the status of our transaction. The algorithm is pretty simple. If both payment-service and stock-service accepted the order, the final status of the transaction is CONFIRMED. If both services rejected the order, the final status is REJECTED. The last option is ROLLBACK, when one service accepted the order and the other rejected it. Here’s the described method inside the OrderManageService bean.

@Service
public class OrderManageService {

   public Order confirm(Order orderPayment, Order orderStock) {
      Order o = new Order(orderPayment.getId(),
             orderPayment.getCustomerId(),
             orderPayment.getProductId(),
             orderPayment.getProductCount(),
             orderPayment.getPrice());
      if (orderPayment.getStatus().equals("ACCEPT") &&
                orderStock.getStatus().equals("ACCEPT")) {
         o.setStatus("CONFIRMED");
      } else if (orderPayment.getStatus().equals("REJECT") &&
                orderStock.getStatus().equals("REJECT")) {
         o.setStatus("REJECTED");
      } else if (orderPayment.getStatus().equals("REJECT") ||
                orderStock.getStatus().equals("REJECT")) {
         String source = orderPayment.getStatus().equals("REJECT")
                    ? "PAYMENT" : "STOCK";
         o.setStatus("ROLLBACK");
         o.setSource(source);
      }
      return o;
   }

}
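
The decision table implemented by confirm can be checked in isolation. The following sketch reduces it to plain strings, assuming each partial status is either ACCEPT or REJECT. The SagaDecision class and the resolve method are hypothetical names that only mirror the branching shown above, without the Order object.

```java
// Hypothetical, simplified mirror of the branching in OrderManageService.confirm.
// Assumption: every status is either "ACCEPT" or "REJECT".
class SagaDecision {

    // Returns the final order status for the two partial responses.
    static String resolve(String paymentStatus, String stockStatus) {
        boolean paymentOk = "ACCEPT".equals(paymentStatus);
        boolean stockOk = "ACCEPT".equals(stockStatus);
        if (paymentOk && stockOk) {
            return "CONFIRMED";   // both services reserved successfully
        } else if (!paymentOk && !stockOk) {
            return "REJECTED";    // nothing was reserved, nothing to undo
        } else {
            return "ROLLBACK";    // exactly one reservation must be compensated
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("ACCEPT", "ACCEPT")); // CONFIRMED
        System.out.println(resolve("REJECT", "REJECT")); // REJECTED
        System.out.println(resolve("ACCEPT", "REJECT")); // ROLLBACK
        System.out.println(resolve("REJECT", "ACCEPT")); // ROLLBACK
    }
}
```

In the ROLLBACK case the original method additionally records which service rejected the order (the source field), so that the other service knows it has to release its reservation.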

Finally, the implementation of our stream. We need to define the KStream bean. We are joining two streams using the join method of KStream. The joining window is 10 seconds. As a result, we are setting the status of the order and sending a new order to the orders topic. We use the same topic as for sending new orders.

@Autowired
OrderManageService orderManageService;

@Bean
public KStream<Long, Order> stream(StreamsBuilder builder) {
   JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
   KStream<Long, Order> stream = builder
         .stream("payment-orders", Consumed.with(Serdes.Long(), orderSerde));

   stream.join(
         builder.stream("stock-orders"),
         orderManageService::confirm,
         JoinWindows.of(Duration.ofSeconds(10)),
         StreamJoined.with(Serdes.Long(), orderSerde, orderSerde))
      .peek((k, o) -> LOG.info("Output: {}", o))
      .to("orders");

   return stream;
}

Let’s see it also in the picture.

kafka-streams-spring-boot-join

Configuration for Spring Boot

In Spring Boot the name of the application is by default the name of the consumer group for Kafka Streams. Therefore, we should set it in application.yml. Of course, we also need to set the address of the Kafka bootstrap server. Finally, we have to configure the default key and value serialization for events, for both the standard producer and streams processing.

spring.application.name: orders
spring.kafka:
  bootstrap-servers: 127.0.0.1:56820
  producer:
    key-serializer: org.apache.kafka.common.serialization.LongSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  streams:
    properties:
      default.key.serde: org.apache.kafka.common.serialization.Serdes$LongSerde
      default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
      spring.json.trusted.packages: "*"

Sending and receiving events from Kafka topics

In the previous section, we discussed how to create a new Kafka stream as a result of joining two other streams. Now, let’s see how to process incoming messages. Let’s take the payment-service as an example. It listens for the incoming orders. If it receives a new order, it performs a reservation on the customer’s account and sends a response with a reservation status to the payment-orders topic. If it receives confirmation of the transaction from the order-service, it commits the transaction or rolls it back. In order to enable the Kafka listener, we should annotate the main class with @EnableKafka. Additionally, the listening method must be annotated with @KafkaListener. The following method listens for events on the orders topic and runs in the payment consumer group.

@SpringBootApplication
@EnableKafka
public class PaymentApp {

    private static final Logger LOG = LoggerFactory.getLogger(PaymentApp.class);

    public static void main(String[] args) {
        SpringApplication.run(PaymentApp.class, args);
    }

    @Autowired
    OrderManageService orderManageService;

    @KafkaListener(id = "orders", topics = "orders", groupId = "payment")
    public void onEvent(Order o) {
        LOG.info("Received: {}" , o);
        if (o.getStatus().equals("NEW"))
            orderManageService.reserve(o);
        else
            orderManageService.confirm(o);
    }
}

Here’s the implementation of the OrderManageService used in the previous code snippet. If it receives the order in the NEW status, it performs a reservation. During the reservation, it subtracts the order’s price from the amountAvailable field and adds the same value to the amountReserved field. Then it sets the status of the order and sends a response to the payment-orders topic using KafkaTemplate. During the confirmation phase, it doesn’t send any response event. If the order is rolled back because another service rejected it, the service subtracts the order’s price from the amountReserved field and adds it back to the amountAvailable field. If the order is confirmed, it just “commits” the transaction by subtracting the price from the amountReserved field.

@Service
public class OrderManageService {

   private static final String SOURCE = "payment";
   private static final Logger LOG = LoggerFactory.getLogger(OrderManageService.class);
   private CustomerRepository repository;
   private KafkaTemplate<Long, Order> template;

   public OrderManageService(CustomerRepository repository, KafkaTemplate<Long, Order> template) {
      this.repository = repository;
      this.template = template;
   }

   public void reserve(Order order) {
      Customer customer = repository.findById(order.getCustomerId()).orElseThrow();
      LOG.info("Found: {}", customer);
      if (order.getPrice() < customer.getAmountAvailable()) {
         order.setStatus("ACCEPT");
         customer.setAmountReserved(customer.getAmountReserved() + order.getPrice());
         customer.setAmountAvailable(customer.getAmountAvailable() - order.getPrice());
      } else {
         order.setStatus("REJECT");
      }
      order.setSource(SOURCE);
      repository.save(customer);
      template.send("payment-orders", order.getId(), order);
      LOG.info("Sent: {}", order);
   }

   public void confirm(Order order) {
      Customer customer = repository.findById(order.getCustomerId()).orElseThrow();
      LOG.info("Found: {}", customer);
      if (order.getStatus().equals("CONFIRMED")) {
         customer.setAmountReserved(customer.getAmountReserved() - order.getPrice());
         repository.save(customer);
      } else if (order.getStatus().equals("ROLLBACK") && !order.getSource().equals(SOURCE)) {
         customer.setAmountReserved(customer.getAmountReserved() - order.getPrice());
         customer.setAmountAvailable(customer.getAmountAvailable() + order.getPrice());
         repository.save(customer);
      }

   }
}
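
A worked example may help to follow the bookkeeping. The sketch below is a plain-Java model of the three operations. The Account class is a hypothetical stand-in for the Customer entity, and it deliberately leaves out the repository, the KafkaTemplate, and the check of the order’s source.

```java
// Hypothetical stand-in for the Customer entity; no persistence, no Kafka.
class Account {
    int amountAvailable;
    int amountReserved;

    Account(int amountAvailable) {
        this.amountAvailable = amountAvailable;
    }

    // Reservation: move funds from "available" to "reserved" if there is enough.
    boolean reserve(int price) {
        if (price < amountAvailable) {
            amountAvailable -= price;
            amountReserved += price;
            return true;  // corresponds to the ACCEPT status
        }
        return false;     // corresponds to the REJECT status
    }

    // Commit: the reserved funds are finally spent.
    void commit(int price) {
        amountReserved -= price;
    }

    // Rollback: the reserved funds return to the available balance.
    void rollback(int price) {
        amountReserved -= price;
        amountAvailable += price;
    }

    public static void main(String[] args) {
        Account a = new Account(1000);
        a.reserve(200);
        System.out.println(a.amountAvailable + "/" + a.amountReserved); // 800/200
        a.commit(200);
        System.out.println(a.amountAvailable + "/" + a.amountReserved); // 800/0
        a.reserve(300);
        a.rollback(300);
        System.out.println(a.amountAvailable + "/" + a.amountReserved); // 800/0
    }
}
```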

A similar logic is implemented on the stock-service side. However, instead of the order’s price, it uses the field productCount and performs reservation for the desired number of ordered products. Here’s the implementation of the OrderManageService class in the stock-service.

@Service
public class OrderManageService {

   private static final String SOURCE = "stock";
   private static final Logger LOG = LoggerFactory.getLogger(OrderManageService.class);
   private ProductRepository repository;
   private KafkaTemplate<Long, Order> template;

   public OrderManageService(ProductRepository repository, KafkaTemplate<Long, Order> template) {
      this.repository = repository;
      this.template = template;
   }

   public void reserve(Order order) {
      Product product = repository.findById(order.getProductId()).orElseThrow();
      LOG.info("Found: {}", product);
      if (order.getStatus().equals("NEW")) {
         if (order.getProductCount() < product.getAvailableItems()) {
            product.setReservedItems(product.getReservedItems() + order.getProductCount());
            product.setAvailableItems(product.getAvailableItems() - order.getProductCount());
            order.setStatus("ACCEPT");
            repository.save(product);
         } else {
            order.setStatus("REJECT");
         }
         template.send("stock-orders", order.getId(), order);
         LOG.info("Sent: {}", order);
      }
   }

   public void confirm(Order order) {
      Product product = repository.findById(order.getProductId()).orElseThrow();
      LOG.info("Found: {}", product);
      if (order.getStatus().equals("CONFIRMED")) {
         product.setReservedItems(product.getReservedItems() - order.getProductCount());
         repository.save(product);
      } else if (order.getStatus().equals("ROLLBACK") && !order.getSource().equals(SOURCE)) {
         product.setReservedItems(product.getReservedItems() - order.getProductCount());
         product.setAvailableItems(product.getAvailableItems() + order.getProductCount());
         repository.save(product);
      }
   }

}

Query Kafka Stream with Spring Boot

Now, let’s consider the following scenario. Firstly, the order-service receives a new order (via REST API) and sends it to the Kafka topic. This order is then received by both other microservices. Once they send back a positive (or negative) response, the order-service processes them as streams and changes the status of the order. The order with a new status is emitted to the same topic as before. So, where are we storing the data with the current status of an order? In the Kafka topic. Once again we will use Kafka Streams in our Spring Boot application. But this time, we take advantage of KTable. Let’s look at the visualization of our scenario.

kafka-streams-spring-boot-ktable

Ok, so let’s define another Kafka Streams bean in the order-service. We are getting the same orders topic as a stream. We will convert it to the Kafka table and materialize it in a persistent store. Thanks to that we will be able to easily query the store from our REST controller.

@Bean
public KTable<Long, Order> table(StreamsBuilder builder) {
   KeyValueBytesStoreSupplier store =
         Stores.persistentKeyValueStore("orders");
   JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
   KStream<Long, Order> stream = builder
         .stream("orders", Consumed.with(Serdes.Long(), orderSerde));
   return stream.toTable(Materialized.<Long, Order>as(store)
         .withKeySerde(Serdes.Long())
         .withValueSerde(orderSerde));
}

If we run more than one instance of the order-service on the same machine, it is also important to override the default location of the state store. To do that, we should define the following property with a unique value for every instance:

spring.kafka.streams.state-dir: /tmp/kafka-streams/1

Unfortunately, there is no built-in support in Spring Boot for interactive queries of Kafka Streams. However, we can use the auto-configured StreamsBuilderFactoryBean to obtain the KafkaStreams instance in the controller. Then we can query the state store under the “materialized” name. This is, of course, a very trivial sample: we are just getting all orders from the KTable.

@GetMapping
public List<Order> all() {
   List<Order> orders = new ArrayList<>();
   ReadOnlyKeyValueStore<Long, Order> store = kafkaStreamsFactory
         .getKafkaStreams()
         .store(StoreQueryParameters.fromNameAndType(
                "orders",
                QueryableStoreTypes.keyValueStore()));
   // KeyValueIterator is Closeable; try-with-resources releases it after iteration
   try (KeyValueIterator<Long, Order> it = store.all()) {
      it.forEachRemaining(kv -> orders.add(kv.value));
   }
   return orders;
}

In the same OrderController there is also a method for sending a new order to the Kafka topic.

@PostMapping
public Order create(@RequestBody Order order) {
   order.setId(id.incrementAndGet());
   template.send("orders", order.getId(), order);
   LOG.info("Sent: {}", order);
   return order;
}

Testing Scenario

Before we run our sample microservices, we need to start the local instance of Kafka. Usually, I’m using Redpanda for that. It is a Kafka API compatible streaming platform. In comparison to Kafka, it is relatively easy to run it locally. All you need to do is to install the rpk CLI locally (here is the instruction for macOS). After that, you can create a single-node instance using the following command:

$ rpk container start

After running, it will print the address of your node. For me, it is 127.0.0.1:56820. You should put that address as a value of the spring.kafka.bootstrap-servers property. You can also display a list of created topics using the following command:

$ rpk topic list --brokers 127.0.0.1:56820

Then, let’s run our microservices. Begin from the order-service since it is creating all the required topics and building the Kafka Streams instance. You can send a single order using the REST endpoint:

$ curl -X 'POST' \
  'http://localhost:8080/orders' \
  -H 'Content-Type: application/json' \
  -d '{
  "customerId": 10,
  "productId": 10,
  "productCount": 5,
  "price": 100,
  "status": "NEW"
}'

There is some test data inserted when payment-service and stock-service start. So, you can set the value of customerId or productId between 1 and 100 and it will work for you. Alternatively, you can use a method that generates a random stream of data. The following bean is responsible for generating 10000 random orders:

@Service
public class OrderGeneratorService {

   private static Random RAND = new Random();
   private AtomicLong id = new AtomicLong();
   private Executor executor;
   private KafkaTemplate<Long, Order> template;

   public OrderGeneratorService(Executor executor, KafkaTemplate<Long, Order> template) {
      this.executor = executor;
      this.template = template;
   }

   @Async
   public void generate() {
      for (int i = 0; i < 10000; i++) {
         int x = RAND.nextInt(5) + 1;
         Order o = new Order(id.incrementAndGet(), RAND.nextLong(100) + 1, RAND.nextLong(100) + 1, "NEW");
         o.setPrice(100 * x);
         o.setProductCount(x);
         template.send("orders", o.getId(), o);
      }
   }
}

You can start that process by calling the endpoint POST /orders/generate.

@PostMapping("/generate")
public boolean create() {
   orderGeneratorService.generate();
   return true;
}

No matter whether you decide to send a single order or generate multiple random orders, you can easily query the status of orders using the following endpoint:

$ curl http://localhost:8080/orders

Here’s the structure of topics generated by the application and by Kafka Streams to perform the join operation and save the orders KTable as a state store.

The post Distributed Transactions in Microservices with Kafka Streams and Spring Boot appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2022/01/24/distributed-transactions-in-microservices-with-kafka-streams-and-spring-boot/feed/ 26 10501
Kafka Streams with Quarkus https://piotrminkowski.com/2021/11/24/kafka-streams-with-quarkus/ https://piotrminkowski.com/2021/11/24/kafka-streams-with-quarkus/#comments Wed, 24 Nov 2021 08:24:53 +0000 https://piotrminkowski.com/?p=10234 In this article, you will learn how to use Kafka Streams with Quarkus. The same as in my previous article we will create a simple application that simulates the stock market. But this time, we are going to use Quarkus instead of Spring Cloud. If you would like to figure out what is a streaming […]

The post Kafka Streams with Quarkus appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to use Kafka Streams with Quarkus. As in my previous article, we will create a simple application that simulates the stock market. But this time, we are going to use Quarkus instead of Spring Cloud. If you would like to figure out what a streaming platform is and how it differs from a traditional message broker, this article is for you. Moreover, we will study useful improvements related to Apache Kafka provided by Quarkus.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. After that, you should just follow my instructions. Let’s begin.

Architecture

In our case, there are two incoming streams of events. Both of them represent incoming orders. These orders are generated by the order-service application. It sends buy orders to the orders.buy topic and sell orders to the orders.sell topic. Then, the stock-service application receives and handles incoming events. In the first step, it needs to change the key of each message from the orderId to the productId. That’s because it has to join orders from different topics related to the same product in order to execute transactions. Finally, the transaction price is the average of the sell and buy prices.

quarkus-kafka-streams-arch

We are building a simplified version of the stock market platform. Each buy order contains a maximum price at which a customer is expecting to buy a product. On the other hand, each sell order contains a minimum price at which a customer is ready to sell a product. If the sell order price is not greater than the buy order price for a particular product, we perform a transaction.

Each order is valid for 10 seconds. After that time the stock-service application will not handle such an order since it is considered as expired. Each order contains a number of products for a transaction. For example, we may sell 100 for 10 or buy 200 for 11. Therefore, an order may be fully or partially realized. The stock-service application tries to join partially realized orders to other new or partially realized orders. You can see the visualization of that process in the picture below.

quarkus-kafka-streams-app
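The matching rule described above can be sketched in plain Java, without any Kafka dependency. This is only an illustration under stated assumptions: the SimpleOrder class and the methods canTrade, transactionPrice and tradedCount are hypothetical names introduced here and do not come from the sample repository.

```java
// Plain-Java sketch of the matching rule; no Kafka involved.
// All names below are hypothetical and used only for illustration.
final class OrderMatchingSketch {

    // Simplified order holding only the fields the rule needs.
    static final class SimpleOrder {
        final int price;        // maximum price for a buy order, minimum price for a sell order
        final int productCount; // number of products the customer wants to trade

        SimpleOrder(int price, int productCount) {
            this.price = price;
            this.productCount = productCount;
        }
    }

    // A trade is possible when the seller's minimum does not exceed the buyer's maximum.
    static boolean canTrade(SimpleOrder buy, SimpleOrder sell) {
        return sell.price <= buy.price;
    }

    // The transaction price is the average of both prices (integer division).
    static int transactionPrice(SimpleOrder buy, SimpleOrder sell) {
        return (buy.price + sell.price) / 2;
    }

    // Only the smaller of the two counts can be traded; the remainder stays open.
    static int tradedCount(SimpleOrder buy, SimpleOrder sell) {
        return Math.min(buy.productCount, sell.productCount);
    }

    public static void main(String[] args) {
        SimpleOrder buy = new SimpleOrder(11, 200);  // buy 200 for 11
        SimpleOrder sell = new SimpleOrder(10, 100); // sell 100 for 10
        System.out.println(canTrade(buy, sell));
        System.out.println(transactionPrice(buy, sell));
        System.out.println(tradedCount(buy, sell));
    }
}
```

With the two orders from the text (sell 100 for 10, buy 200 for 11), the sell order would be fully realized while the buy order would remain partially realized with 100 products still open.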

Run Apache Kafka locally

Before we jump to the implementation, we need to run a local instance of Apache Kafka. If you don’t want to install it on your laptop, the best way to run it is with Redpanda. Redpanda is a Kafka API compatible streaming platform. In comparison to Kafka, it is relatively easy to run it locally. Normally, you would have to install Redpanda on your laptop and then create a cluster using their CLI. But with Quarkus you don’t need to do that! The only requirement is to have Docker installed. Thanks to the Quarkus Kafka extension and a feature called Dev Services, Quarkus automatically starts a Kafka broker in dev mode and when running tests. Moreover, the application is configured automatically.

The only thing you need to do in order to enable that feature is NOT to provide any Kafka address in configuration properties. Dev Services uses Testcontainers to run Kafka, so if you have Docker or any other environment supporting Testcontainers running, you get a containerized instance of Kafka out-of-the-box. One more important thing: start the order-service application first. It automatically creates all the required topics in Kafka. Then run the stock-service application. It uses the Quarkus Kafka Streams extension and verifies that the required topics exist. Let’s visualize it.

quarkus-kafka-streams-run

Send events to Kafka with Quarkus

There are several ways to send events to Kafka with Quarkus. Because we need to send key/value pair we will use the io.smallrye.reactive.messaging.kafka.Record object for that. Quarkus is able to generate and send data continuously. In the fragment of code visible below, we send a single Order event per 500 ms. Each Order contains a random productId, price and productCount.

@Outgoing("orders-buy")
public Multi<Record<Long, Order>> buyOrdersGenerator() {
   return Multi.createFrom().ticks().every(Duration.ofMillis(500))
      .map(order -> {
         Integer productId = random.nextInt(10) + 1;
         int price = prices.get(productId) + random.nextInt(200);
         Order o = new Order(
             incrementOrderId(),
             random.nextInt(1000) + 1,
             productId,
             100 * (random.nextInt(5) + 1),
             LocalDateTime.now(),
             OrderType.BUY,
             price);
         log.infof("Sent: %s", o);
         return Record.of(o.getId(), o);
   });
}

@Outgoing("orders-sell")
public Multi<Record<Long, Order>> sellOrdersGenerator() {
   return Multi.createFrom().ticks().every(Duration.ofMillis(500))
      .map(order -> {
         Integer productId = random.nextInt(10) + 1;
         int price = prices.get(productId) + random.nextInt(200);
         Order o = new Order(
             incrementOrderId(),
             random.nextInt(1000) + 1,
             productId,
             100 * (random.nextInt(5) + 1),
             LocalDateTime.now(),
             OrderType.SELL,
             price);
         log.infof("Sent: %s", o);
         return Record.of(o.getId(), o);
   });
}

We will also define a single @Incoming channel in order to receive transactions produced by the stock-service. Thanks to that Quarkus will automatically create the topic transactions used by Quarkus Kafka Streams in stock-service. To be honest, I was not able to force the Quarkus Kafka Streams extension to create the topic automatically. It seems we need to use the SmallRye Reactive Messaging extension for that.

@Incoming("transactions")
public void transactions(Transaction transaction) {
   log.infof("New: %s", transaction);
}

Of course, we need to add the SmallRye Reactive Messaging dependency to the Maven pom.xml.

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

Finally, let’s provide configuration settings. We have two outgoing topics and a single incoming topic. We can set their names. Otherwise, Quarkus uses the same name as the name of the channel. The names of our topics are orders.buy, orders.sell and transactions.

mp.messaging.outgoing.orders-buy.connector = smallrye-kafka
mp.messaging.outgoing.orders-buy.topic = orders.buy
mp.messaging.outgoing.orders-buy.key.serializer = org.apache.kafka.common.serialization.LongSerializer
mp.messaging.outgoing.orders-buy.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

mp.messaging.outgoing.orders-sell.connector = smallrye-kafka
mp.messaging.outgoing.orders-sell.topic = orders.sell
mp.messaging.outgoing.orders-sell.key.serializer = org.apache.kafka.common.serialization.LongSerializer
mp.messaging.outgoing.orders-sell.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

mp.messaging.incoming.transactions.connector = smallrye-kafka
mp.messaging.incoming.transactions.topic = transactions
mp.messaging.incoming.transactions.value.deserializer = pl.piomin.samples.streams.order.model.deserializer.TransactionDeserializer

That’s all. Our orders generator is ready. If you run the order-service application, Quarkus will also start a Kafka (Redpanda) instance. But first, let’s switch to the second sample application, stock-service.

Consume Kafka Streams with Quarkus

In the previous section, we were sending messages to the Kafka broker. Therefore, we used a standard Quarkus library for integration with Kafka based on the SmallRye Reactive Messaging framework. The stock-service application consumes messages as streams, so now we will use a module for Kafka Streams integration.

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-kafka-streams</artifactId>
</dependency>

Our application also uses a database, an ORM layer and includes some other useful modules.

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-hibernate-orm-panache</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-jdbc-h2</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-openapi</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-resteasy-jackson</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-health</artifactId>
</dependency>

In the first step, we are going to merge both streams of orders (buy and sell), insert the Order into the database, and print the event message. You could ask: why do I use the database and ORM layer here since I have Kafka KTable? Well, I need transactions with lock support in order to coordinate the status of order realization (refer to the description in the introduction about fully and partially realized orders). I will give you more details about it in the next sections.

In order to process streams with Quarkus, we need to declare the org.apache.kafka.streams.Topology bean. It contains all the KStream and KTable definitions. Let’s start just with the part responsible for creating and emitting transactions from incoming orders. There are two KStream definitions created. The first of them is responsible for merging two order streams into a single one and then inserting a new Order into a database. The second of them creates and executes transactions by joining two streams using the productId key. But more about it in the next section.

@Produces
public Topology buildTopology() {
   ObjectMapperSerde<Order> orderSerde = 
      new ObjectMapperSerde<>(Order.class);
   ObjectMapperSerde<Transaction> transactionSerde = 
      new ObjectMapperSerde<>(Transaction.class);

   StreamsBuilder builder = new StreamsBuilder();

   KStream<Long, Order> orders = builder.stream(
      ORDERS_SELL_TOPIC,
      Consumed.with(Serdes.Long(), orderSerde));

   builder.stream(ORDERS_BUY_TOPIC, 
         Consumed.with(Serdes.Long(), orderSerde))
      .merge(orders)
      .peek((k, v) -> {
         log.infof("New: %s", v);
         logic.add(v);
      });

   builder.stream(ORDERS_BUY_TOPIC, 
         Consumed.with(Serdes.Long(), orderSerde))
      .selectKey((k, v) -> v.getProductId())
      .join(orders.selectKey((k, v) -> v.getProductId()),
         this::execute,
         JoinWindows.of(Duration.ofSeconds(10)),
         StreamJoined.with(Serdes.Integer(), orderSerde, orderSerde))
      .filterNot((k, v) -> v == null)
      .map((k, v) -> new KeyValue<>(v.getId(), v))
      .peek((k, v) -> log.infof("Done -> %s", v))
      .to(TRANSACTIONS_TOPIC, Produced.with(Serdes.Long(), transactionSerde));

   // Build the topology from all stream definitions registered above
   return builder.build();
}
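To make the pairing condition of the windowed join more tangible, here is a plain-Java sketch. It is not how Kafka Streams implements the join (the real join is incremental and backed by state stores); it only illustrates the rule that two records are paired when they share the same key and their timestamps differ by no more than the window size. The Event class and the join method are hypothetical names introduced for this illustration.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Brute-force illustration of a windowed join condition.
// Hypothetical names; this is not the Kafka Streams implementation.
final class WindowedJoinSketch {

    static final class Event {
        final int productId;    // join key
        final long timestampMs; // record timestamp in milliseconds
        final String label;     // readable identifier used in the result

        Event(int productId, long timestampMs, String label) {
            this.productId = productId;
            this.timestampMs = timestampMs;
            this.label = label;
        }
    }

    // Pairs every buy with every sell that has the same key and lies within the window.
    static List<String> join(List<Event> buys, List<Event> sells, Duration window) {
        List<String> pairs = new ArrayList<>();
        for (Event buy : buys) {
            for (Event sell : sells) {
                boolean sameKey = buy.productId == sell.productId;
                boolean inWindow =
                    Math.abs(buy.timestampMs - sell.timestampMs) <= window.toMillis();
                if (sameKey && inWindow) {
                    pairs.add(buy.label + "+" + sell.label);
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<Event> buys = List.of(new Event(1, 0, "b1"), new Event(2, 0, "b2"));
        List<Event> sells = List.of(
            new Event(1, 5_000, "s1"),   // same product, 5 s apart
            new Event(1, 15_000, "s2"),  // same product, 15 s apart
            new Event(2, 10_000, "s3")); // same product as b2, exactly 10 s apart
        System.out.println(join(buys, sells, Duration.ofSeconds(10)));
    }
}
```

In this sketch the sell order that arrives 15 seconds after the buy order is never paired with it, which mirrors the 10-second validity of each order.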

To process the streams we need to add configuration properties. A list of input topics is required. We can also override a default application id and enable Kafka health check.

quarkus.kafka-streams.application-id = stock
quarkus.kafka-streams.topics = orders.buy,orders.sell
quarkus.kafka.health.enabled = true

Operations on Kafka Streams

Now, we may use some more advanced operations on Kafka Streams than just merging two different streams. In fact, that’s the key logic of our application. We need to join two different order streams into a single one using the productId as a joining key. Since the producer sets orderId as a message key, we first need to invoke the selectKey method for both the orders.sell and orders.buy streams. In our case, joining buy and sell orders related to the same product is just the first step. Then we need to verify that the minimum price in the sell order is not greater than the maximum price in the buy order.

The next step is to verify that neither of the two orders has already been fully realized, as they may also be paired with other orders in the stream. If all the conditions are met, we may create a new transaction. Finally, we may change the stream key from productId to the transactionId and send the transaction to the dedicated transactions topic.

Each time we successfully join two orders we are trying to create a transaction. The execute(...) method is called within the KStream join method. Firstly, we are comparing the prices of both orders. Then we verify the realization status of both orders by accessing the H2 database. If the orders are still not fully realized we may create a transaction and update orders records in the database.

private Transaction execute(Order orderBuy, Order orderSell) {
   if (orderBuy.getAmount() >= orderSell.getAmount()) {
      int count = Math.min(orderBuy.getProductCount(), 
                           orderSell.getProductCount());
      boolean allowed = logic
         .performUpdate(orderBuy.getId(), orderSell.getId(), count);
      if (!allowed)
         return null;
      else
         return new Transaction(
            ++transactionId,
            orderBuy.getId(),
            orderSell.getId(),
            count,
            (orderBuy.getAmount() + orderSell.getAmount()) / 2,
            LocalDateTime.now(),
            "NEW"
      );
   } else {
            return null;
   }
}

Let’s take a closer look at the performUpdate() method called inside the execute() method. It initiates a transaction and locks both Order entities. Then it verifies each order realization status and updates it with the current values if possible. Only if the performUpdate() method finishes successfully the stock-service application creates a new transaction.

@ApplicationScoped
public class OrderLogic {

    @Inject
    Logger log;
    @Inject
    OrderRepository repository;

    @Transactional
    public Order add(Order order) {
        repository.persist(order);
        return order;
    }

    @Transactional
    public boolean performUpdate(Long buyOrderId, Long sellOrderId, int amount) {
        Order buyOrder = repository.findById(buyOrderId, 
           LockModeType.PESSIMISTIC_WRITE);
        Order sellOrder = repository.findById(sellOrderId, 
           LockModeType.PESSIMISTIC_WRITE);
        if (buyOrder == null || sellOrder == null)
            return false;
        int buyAvailableCount = 
           buyOrder.getProductCount() - buyOrder.getRealizedCount();
        int sellAvailableCount = 
           sellOrder.getProductCount() - sellOrder.getRealizedCount();
        if (buyAvailableCount >= amount && sellAvailableCount >= amount) {
            buyOrder.setRealizedCount(buyOrder.getRealizedCount() + amount);
            sellOrder.setRealizedCount(sellOrder.getRealizedCount() + amount);
            repository.persist(buyOrder);
            repository.persist(sellOrder);
            return true;
        } else {
            return false;
        }
    }
}
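The bookkeeping done by performUpdate() can be illustrated with an in-memory sketch. Note the assumptions: a plain HashMap replaces the database, and the synchronized keyword stands in for the pessimistic locks, so this is not equivalent to the transactional code above. The class RealizationSketch and its methods are hypothetical names used only here.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory illustration of the realization bookkeeping.
// Hypothetical names; "synchronized" only stands in for the database locks.
final class RealizationSketch {

    // For each order id: index 0 = productCount, index 1 = realizedCount
    private final Map<Long, int[]> orders = new HashMap<>();

    synchronized void add(long id, int productCount) {
        orders.put(id, new int[] {productCount, 0});
    }

    // Returns true and updates both orders only if each still has "amount" products open.
    synchronized boolean performUpdate(long buyOrderId, long sellOrderId, int amount) {
        int[] buy = orders.get(buyOrderId);
        int[] sell = orders.get(sellOrderId);
        if (buy == null || sell == null) {
            return false;
        }
        boolean enough = buy[0] - buy[1] >= amount && sell[0] - sell[1] >= amount;
        if (enough) {
            buy[1] += amount;
            sell[1] += amount;
        }
        return enough;
    }

    // Number of products of the given order that are not realized yet.
    synchronized int remaining(long id) {
        int[] order = orders.get(id);
        return order[0] - order[1];
    }

    public static void main(String[] args) {
        RealizationSketch logic = new RealizationSketch();
        logic.add(1L, 200); // buy order
        logic.add(2L, 100); // first sell order
        logic.add(3L, 200); // second sell order
        System.out.println(logic.performUpdate(1L, 2L, 100)); // buy order is now half realized
        System.out.println(logic.performUpdate(1L, 3L, 200)); // rejected: only 100 left on the buy side
        System.out.println(logic.remaining(1L));
    }
}
```

The second call is rejected because the buy order has already been partially realized, which is the situation the locking in performUpdate() is meant to detect reliably.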

Nice 🙂 That’s all that we need to do in the first part of our exercise. Now we can run both our sample applications.

Run and manage Kafka Streams application with Quarkus

As I mentioned before, we first need to start the order-service. It runs a new Kafka instance and creates all required topics. Immediately after startup, it is ready to send new orders. To run the Quarkus app locally just go to the order-service directory and execute the following command:

$ mvn quarkus:dev

Just to verify, you can display a list of running Docker containers with the docker ps command. Here’s my result:

As you can see, the instance of Redpanda is running and is available on a random port, 49724. Quarkus did it for us. If you have Redpanda installed on your laptop, you can check out the list of created topics with their CLI rpk:

$ rpk topic list --brokers=127.0.0.1:49724

Then let’s run the stock-service. Go to the stock-service directory and run mvn quarkus:dev once again. After startup, it just works. Both applications share the same instance thanks to the Quarkus Dev Services. Now let’s access the Quarkus Dev UI console available at http://localhost:8080/q/dev/. Find the tile with the “Apache Kafka Streams” title.

You can check a visualization of our Kafka Streams topology. I will divide the image into two parts for better visibility.

Use Kafka KTable with Quarkus

We have already finished the implementation of the logic responsible for creating transactions from incoming orders. In the next step, we are going to perform analytical operations on the transactions stream. Our main goal is to calculate the total number of transactions, the total number of products sold/bought, and the total value of transactions (price * productsCount) for each product. Here’s the class used in the calculations.

@RegisterForReflection
public class TransactionTotal {
   private int count;
   private int amount;
   private int productCount;

   // GETTERS AND SETTERS
}

Because the Transaction object does not contain information about the product, we first need to join it with the order to access it. Then we group the joined records by productId and invoke the aggregate method, which allows us to perform some more complex calculations and produces a KTable. In that particular case, we are calculating the number of all executed transactions, their volume of products, and total value. The resulting KTable can be materialized as a state store. Thanks to that we will be able to query it by the name defined by the TRANSACTIONS_PER_PRODUCT_SUMMARY variable.

KeyValueBytesStoreSupplier storePerProductSupplier = Stores.persistentKeyValueStore(
   TRANSACTIONS_PER_PRODUCT_SUMMARY);

builder.stream(TRANSACTIONS_TOPIC, Consumed.with(Serdes.Long(), transactionSerde))
   .selectKey((k, v) -> v.getSellOrderId())
   .join(orders.selectKey((k, v) -> v.getId()),
      (t, o) -> new TransactionWithProduct(t, o.getProductId()),
      JoinWindows.of(Duration.ofSeconds(10)),
      StreamJoined.with(Serdes.Long(), transactionSerde, orderSerde))
   .groupBy((k, v) -> v.getProductId(), Grouped.with(Serdes.Integer(), transactionWithProductSerde))
   .aggregate(
      TransactionTotal::new,
      (k, v, a) -> {
         a.setCount(a.getCount() + 1);
         // accumulate each total from its own previous value
         a.setProductCount(a.getProductCount() + v.getTransaction().getAmount());
         a.setAmount(a.getAmount() +
            (v.getTransaction().getAmount() * v.getTransaction().getPrice()));
         return a;
      },
      Materialized.<Integer, TransactionTotal> as(storePerProductSupplier)
         .withKeySerde(Serdes.Integer())
         .withValueSerde(transactionTotalSerde))
   .toStream()
   .peek((k, v) -> log.infof("Total per product(%d): %s", k, v))
   .to(TRANSACTIONS_PER_PRODUCT_AGGREGATED_TOPIC, 
      Produced.with(Serdes.Integer(), transactionTotalSerde));
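The per-product totals described above can be illustrated with a plain-Java sketch that processes a fixed list instead of a stream. The Tx and Total classes are hypothetical, simplified stand-ins for the transaction and TransactionTotal objects; the sketch assumes that amount is the number of products in a transaction and price is the price per product.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Batch illustration of the per-product aggregation.
// Hypothetical names; the real topology updates the totals record by record.
final class TransactionTotalsSketch {

    static final class Tx {
        final int productId;
        final int amount; // number of products in the transaction
        final int price;  // price per product

        Tx(int productId, int amount, int price) {
            this.productId = productId;
            this.amount = amount;
            this.price = price;
        }
    }

    static final class Total {
        int count;        // number of transactions
        int productCount; // total number of products traded
        int amount;       // total value: sum of amount * price
    }

    static Map<Integer, Total> aggregate(List<Tx> transactions) {
        Map<Integer, Total> totals = new TreeMap<>();
        for (Tx tx : transactions) {
            // the initializer creates an empty total the first time a product is seen
            Total total = totals.computeIfAbsent(tx.productId, id -> new Total());
            total.count += 1;
            total.productCount += tx.amount;
            total.amount += tx.amount * tx.price;
        }
        return totals;
    }

    public static void main(String[] args) {
        Map<Integer, Total> totals = aggregate(List.of(
            new Tx(1, 100, 10), new Tx(1, 50, 12), new Tx(2, 10, 5)));
        totals.forEach((id, t) -> System.out.println(
            id + ": count=" + t.count + ", products=" + t.productCount + ", value=" + t.amount));
    }
}
```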

Here’s the class responsible for the interactive queries implementation. It injects the KafkaStreams bean. Then it tries to obtain the persistent store based on the StockService.TRANSACTIONS_PER_PRODUCT_SUMMARY variable. As a result, there is a ReadOnlyKeyValueStore with Integer as a key and TransactionTotal as a value. We may return a single value related to a particular productId (getTransactionsPerProductData) or just return a map with results for all available products (getAllTransactionsPerProductData).

@ApplicationScoped
public class InteractiveQueries {

   @Inject
   KafkaStreams streams;

   public TransactionTotal getTransactionsPerProductData(Integer productId) {
      return getTransactionsPerProductStore().get(productId);
   }

   public Map<Integer, TransactionTotal> getAllTransactionsPerProductData() {
      Map<Integer, TransactionTotal> m = new HashMap<>();
      // KeyValueIterator is Closeable; try-with-resources releases it after iteration
      try (KeyValueIterator<Integer, TransactionTotal> it = getTransactionsPerProductStore().all()) {
         while (it.hasNext()) {
            KeyValue<Integer, TransactionTotal> kv = it.next();
            m.put(kv.key, kv.value);
         }
      }
      return m;
   }

   private ReadOnlyKeyValueStore<Integer, TransactionTotal> getTransactionsPerProductStore() {
      return streams.store(
         StoreQueryParameters
            .fromNameAndType(StockService.TRANSACTIONS_PER_PRODUCT_SUMMARY, QueryableStoreTypes.keyValueStore()));
   }

}

Finally, we can create a REST controller responsible for exposing data retrieved by the interactive queries.

@ApplicationScoped
@Path("/transactions")
public class TransactionResource {

    @Inject
    InteractiveQueries interactiveQueries;

    @GET
    @Path("/products/{id}")
    public TransactionTotal getByProductId(@PathParam("id") Integer productId) {
        return interactiveQueries.getTransactionsPerProductData(productId);
    }

    @GET
    @Path("/products")
    public Map<Integer, TransactionTotal> getAllPerProductId() {
        return interactiveQueries.getAllTransactionsPerProductData();
    }

}

Now you can easily check out statistics related to the transactions created by the stock-service. You just need to call the following REST endpoints e.g.:

$ curl http://localhost:8080/transactions/products
$ curl http://localhost:8080/transactions/products/3
$ curl http://localhost:8080/transactions/products/5

Final Thoughts

Quarkus simplifies working with Kafka Streams and interactive queries. It provides useful improvements for developers like auto-start of Kafka in dev and test modes or Kafka streams visualization in dev UI console. You can easily compare the Quarkus approach with the Spring Cloud Stream Kafka support since I implemented the same logic for both those frameworks. Here’s the GitHub repository with Spring Cloud Stream Kafka Streams example.

The post Kafka Streams with Quarkus appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2021/11/24/kafka-streams-with-quarkus/feed/ 4 10234
Kafka Streams with Spring Cloud Stream https://piotrminkowski.com/2021/11/11/kafka-streams-with-spring-cloud-stream/ https://piotrminkowski.com/2021/11/11/kafka-streams-with-spring-cloud-stream/#comments Thu, 11 Nov 2021 10:07:45 +0000 https://piotrminkowski.com/?p=10193 In this article, you will learn how to use Kafka Streams with Spring Cloud Stream. We will build a simple Spring Boot application that simulates the stock market. Based on that example, I’ll try to explain what a streaming platform is and how it differs from a traditional message broker. If you are looking for […]

The post Kafka Streams with Spring Cloud Stream appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to use Kafka Streams with Spring Cloud Stream. We will build a simple Spring Boot application that simulates the stock market. Based on that example, I’ll try to explain what a streaming platform is and how it differs from a traditional message broker. If you are looking for an intro to the Spring Cloud Stream project you should read my article about it. It describes how to use Spring Cloud Stream with RabbitMQ in order to build event-driven microservices.

In Spring Cloud Stream there are two binders supporting the Kafka platform. We will focus on the second of them – Apache Kafka Streams Binder. You can read more about it in Spring Cloud documentation available here.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. After that, you should just follow my instructions. Let’s begin.

Introduction

There are three major types in Kafka Streams: KStream, KTable and GlobalKTable. Spring Cloud Stream supports all of them. We can easily convert the stream to the table and vice-versa. To clarify, all Kafka topics are stored as a stream. The difference is: when we want to consume that topic, we can either consume it as a table or a stream. KTable takes a stream of records from a topic and reduces it down to unique entries using the key of each message.
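The difference between the stream view and the table view can be shown with a small plain-Java sketch. It is only an analogy, not the Kafka Streams API: a list plays the role of the topic, and a map plays the role of the table. The class and method names are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Analogy for stream vs. table semantics; hypothetical names, no Kafka involved.
final class StreamVsTableSketch {

    // Reduces a sequence of keyed records to the latest value per key.
    static Map<Long, String> toTable(List<Map.Entry<Long, String>> stream) {
        Map<Long, String> table = new LinkedHashMap<>();
        for (Map.Entry<Long, String> record : stream) {
            if (record.getValue() == null) {
                // a null value acts as a tombstone and removes the key from the table
                table.remove(record.getKey());
            } else {
                // a later record with the same key overwrites the earlier one
                table.put(record.getKey(), record.getValue());
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<Long, String>> stream = new ArrayList<>();
        stream.add(new SimpleEntry<>(1L, "NEW"));
        stream.add(new SimpleEntry<>(2L, "NEW"));
        stream.add(new SimpleEntry<>(1L, "CONFIRMED"));
        System.out.println("stream size: " + stream.size());
        System.out.println("table: " + toTable(stream));
    }
}
```

The stream keeps all three records, while the table keeps one entry per key with the most recent value.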

Architecture

KStream represents an immutable stream of data where each new record is treated as INSERT. In our case, there are two incoming streams of events. Both of them represent incoming orders. These orders are generated by the order-service application. It sends buy orders to the orders.buy topic and sell orders to the orders.sell topic. The stock-service application receives and handles events from those topics. In the first step, it needs to change the key of each message from the orderId to the productId. That’s because it has to join orders from different topics related to the same product in order to execute transactions. The final transaction price is an average of sell and buy order price.

kafka-streams-spring-cloud-concept
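The re-keying step matters because a join pairs records by key. As a rough plain-Java analogy (hypothetical names, no Kafka involved), changing the key from orderId to productId is similar to regrouping a list of orders by product, so that orders for the same product end up together:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Analogy for changing the message key from orderId to productId.
// Hypothetical names; the real application re-keys the Kafka stream instead.
final class RekeySketch {

    static final class KeyedOrder {
        final long orderId;  // original message key
        final int productId; // new key needed for the join

        KeyedOrder(long orderId, int productId) {
            this.orderId = orderId;
            this.productId = productId;
        }
    }

    // Groups order ids under the product they refer to.
    static Map<Integer, List<Long>> rekeyByProduct(List<KeyedOrder> orders) {
        return orders.stream().collect(Collectors.groupingBy(
            order -> order.productId,
            TreeMap::new,
            Collectors.mapping(order -> order.orderId, Collectors.toList())));
    }

    public static void main(String[] args) {
        List<KeyedOrder> orders = List.of(
            new KeyedOrder(1L, 10), new KeyedOrder(2L, 20), new KeyedOrder(3L, 10));
        System.out.println(rekeyByProduct(orders));
    }
}
```

Orders 1 and 3 refer to the same product, so after regrouping they share one key and become candidates for a join.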

We are building a very simplified version of the stock market platform. Each buy order contains a maximum price at which a customer is expecting to buy a product. On the other hand, each sell order contains a minimum price at which a customer is ready to sell a product. If the sell order price is not greater than the buy order price for a particular product, we may perform a transaction.

Each order is valid for 10 seconds. After that time the stock-service application will not handle such an order since it is considered expired. Each order contains an amount of product for a transaction. For example, we may sell 100 for 10 or buy 200 for 11. Therefore, an order may be fully or partially realized. The stock-service application tries to join partially realized orders to other new or partially realized orders. You can see the visualization of that process in the picture below.

kafka-streams-spring-cloud-arch

Run Apache Kafka locally

Before we jump to the implementation, we need to run a local instance of Apache Kafka. If you don’t want to install it on your laptop, the best way to run it is through Redpanda. Redpanda is a Kafka API compatible streaming platform. In comparison to Kafka, it is relatively easy to run it locally. You just need to have Docker installed. Once you have installed Redpanda on your machine, you need to create a cluster. Since you don’t need a large cluster during development, you can create a single-node instance using the following command:

$ rpk container start

After running, it will print the address of your node. For me, it is 127.0.0.1:50842. So, now I can display a list of created topics using the following command:

$ rpk topic list --brokers 127.0.0.1:50842

Currently, there are no topics created. We don’t need to do anything manually. Spring Cloud Stream automatically creates missing topics on the application startup. If you would like to remove the Redpanda instance after our exercise, you just need to run the following command:

$ rpk container purge

Perfect! Our local instance of Kafka is running, so we may proceed to the development.

Send events to Kafka with Spring Cloud Stream

In order to generate and send events continuously with Spring Cloud Stream Kafka, we need to define a Supplier bean. In our case, the order-service application generates test data. Each message contains a key and a payload that is serialized to JSON. The message key is the order’s id. We have two Supplier beans since we are sending messages to the two topics. Here’s the Order event class:

@Getter
@Setter
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Order {
   private Long id;
   private Integer customerId;
   private Integer productId;
   private int productCount;
   @JsonDeserialize(using = LocalDateTimeDeserializer.class)
   @JsonSerialize(using = LocalDateTimeSerializer.class)
   private LocalDateTime creationDate;
   private OrderType type;
   private int amount;
}

Our application uses Lombok and Jackson for message serialization. Of course, we also need to include the Spring Cloud Stream Kafka Binder. Unlike the consumer side, the producer does not use Kafka Streams, because it is just generating and sending events.

<dependencies>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
  </dependency>
</dependencies>

We have a predefined list of orders just to test our solution. We use MessageBuilder to build a message that contains the header kafka_messageKey and the Order payload.

@SpringBootApplication
@Slf4j
public class OrderService {

   private static long orderId = 0;
   private static final Random r = new Random();

   LinkedList<Order> buyOrders = new LinkedList<>(List.of(
      new Order(++orderId, 1, 1, 100, LocalDateTime.now(), OrderType.BUY, 1000),
      new Order(++orderId, 2, 1, 200, LocalDateTime.now(), OrderType.BUY, 1050),
      new Order(++orderId, 3, 1, 100, LocalDateTime.now(), OrderType.BUY, 1030),
      new Order(++orderId, 4, 1, 200, LocalDateTime.now(), OrderType.BUY, 1050),
      new Order(++orderId, 5, 1, 200, LocalDateTime.now(), OrderType.BUY, 1000),
      new Order(++orderId, 11, 1, 100, LocalDateTime.now(), OrderType.BUY, 1050)
   ));

   LinkedList<Order> sellOrders = new LinkedList<>(List.of(
      new Order(++orderId, 6, 1, 200, LocalDateTime.now(), OrderType.SELL, 950),
      new Order(++orderId, 7, 1, 100, LocalDateTime.now(), OrderType.SELL, 1000),
      new Order(++orderId, 8, 1, 100, LocalDateTime.now(), OrderType.SELL, 1050),
      new Order(++orderId, 9, 1, 300, LocalDateTime.now(), OrderType.SELL, 1000),
      new Order(++orderId, 10, 1, 200, LocalDateTime.now(), OrderType.SELL, 1020)
   ));

   public static void main(String[] args) {
      SpringApplication.run(OrderService.class, args);
   }

   @Bean
   public Supplier<Message<Order>> orderBuySupplier() {
      return () -> {
         if (buyOrders.peek() != null) {
            Message<Order> o = MessageBuilder
                  .withPayload(buyOrders.peek())
                  .setHeader(KafkaHeaders.MESSAGE_KEY, Objects.requireNonNull(buyOrders.poll()).getId())
                  .build();
            log.info("Order: {}", o.getPayload());
            return o;
         } else {
            return null;
         }
      };
   }

   @Bean
   public Supplier<Message<Order>> orderSellSupplier() {
      return () -> {
         if (sellOrders.peek() != null) {
            Message<Order> o = MessageBuilder
                  .withPayload(sellOrders.peek())
                  .setHeader(KafkaHeaders.MESSAGE_KEY, Objects.requireNonNull(sellOrders.poll()).getId())
                  .build();
            log.info("Order: {}", o.getPayload());
            return o;
         } else {
            return null;
         }
      };
   }

}
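The suppliers above return null once the predefined list is empty, so nothing more is sent on later polls. The same idea can be sketched in plain Java with a single poll() call instead of the peek()/poll() pair. This is only an illustration under my own naming (QueueBackedSupplierDemo and drain are hypothetical), not a replacement for the Spring bean.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

final class QueueBackedSupplierDemo {

    // Builds a Supplier that hands out queued items one by one.
    // Queue.poll() removes and returns the head, or returns null when
    // the queue is empty, so the Supplier yields null once it is drained.
    static <T> Supplier<T> drain(Queue<T> queue) {
        return queue::poll;
    }

    public static void main(String[] args) {
        Supplier<String> supplier = drain(new ArrayDeque<>(List.of("order-1", "order-2")));
        // Simulates three polling ticks; the third one finds the queue empty.
        for (int tick = 1; tick <= 3; tick++) {
            System.out.println("tick " + tick + ": " + supplier.get());
        }
    }
}
```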

After that, we need to provide some configuration settings inside the application.yml file. Since we use multiple binding beans (in our case Supplier beans), we have to define the property spring.cloud.stream.function.definition that contains a list of bindable functions. We need to pass the Supplier method names separated by a semicolon. In the next few lines, we set the names of the target topics on Kafka and the message key serializer. Of course, we also need to set the address of the Kafka broker.

spring.kafka.bootstrap-servers: ${KAFKA_URL}

spring.cloud.stream.function.definition: orderBuySupplier;orderSellSupplier

spring.cloud.stream.bindings.orderBuySupplier-out-0.destination: orders.buy
spring.cloud.stream.kafka.bindings.orderBuySupplier-out-0.producer.configuration.key.serializer: org.apache.kafka.common.serialization.LongSerializer

spring.cloud.stream.bindings.orderSellSupplier-out-0.destination: orders.sell
spring.cloud.stream.kafka.bindings.orderSellSupplier-out-0.producer.configuration.key.serializer: org.apache.kafka.common.serialization.LongSerializer

Before running the application I need to create an environment variable containing the address of the Kafka broker.

$ export KAFKA_URL=127.0.0.1:50842

Then, let’s run our Spring Cloud application using the following Maven command:

$ mvn clean spring-boot:run

Once you do that, the application sends some test orders for the same product (productId=1), which you can see in its logs.

We can also verify a list of topics on our local Kafka instance. Both of them have been automatically created by the Spring Cloud Stream Kafka binder before sending messages.

Consume Kafka Streams with Spring Cloud Stream

Now, we are going to switch to the stock-service implementation. In order to process streams of events, we need to include the Spring Cloud Stream Kafka Streams binder. Also, our application has an ORM layer for storing data, so we have to include the Spring Data JPA starter and the H2 database.

<dependencies>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
  </dependency>
  <dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
  </dependency>
</dependencies>

In the first step, we are going to merge both streams of orders (buy and sell), insert each Order into the database, and print the event message. You may ask why I use a database and an ORM layer here, since I have Kafka KTable. Well, I need transactions with lock support in order to coordinate the status of order realization (refer to the description in the introduction: fully and partially realized orders). I will give you more details about it in the next sections.

In order to process streams, we need to declare a functional bean that takes KStream as an input parameter. If there are two sources, we have to use a BiConsumer (just for consumption) or a BiFunction (to consume and send events to a new target stream) bean. In this case, we are not creating a new stream of events, so we can use BiConsumer.

@Autowired
OrderLogic logic;

@Bean
public BiConsumer<KStream<Long, Order>, KStream<Long, Order>> orders() {
   return (orderBuy, orderSell) -> orderBuy
         .merge(orderSell)
         .peek((k, v) -> {
            log.info("New({}): {}", k, v);
            logic.add(v);
         });
}

After that, we need to add some configuration settings. There are two input topics, so we need to map their names. Also, if we have more than one functional bean we need to set applicationId related to the particular function. For now, it is not required, since we have only a single function. But later, we are going to add other functions for some advanced operations.

spring.cloud.stream.bindings.orders-in-0.destination: orders.buy
spring.cloud.stream.bindings.orders-in-1.destination: orders.sell
spring.cloud.stream.kafka.streams.binder.functions.orders.applicationId: orders

For now, that’s all. You can now run the instance of stock-service using the Maven command mvn spring-boot:run.

Operations on Kafka Streams

Now, we may use some more advanced operations on Kafka Streams than just merging two different streams. In fact, that’s the key logic in our application. We need to join two different order streams into a single one using productId as the joining key. Since the producer sets orderId as the message key, we first need to invoke the selectKey method for both the orders.sell and orders.buy streams. In our case, joining buy and sell orders related to the same product is just the first step. Then we need to verify that the minimum price in the sell order is not greater than the maximum price in the buy order.

The next step is to verify that neither of these orders has been fully realized previously, as they may also be paired with other orders in the stream. If all the conditions are met, we may create a new transaction. Finally, we change the stream key from productId to transactionId and send the event to the dedicated transactions topic.

In order to implement the scenario described above, we need to define a BiFunction bean. It takes two input KStreams from orders.buy and orders.sell and creates a new KStream of transaction events sent to the output transactions topic. While joining streams, it uses a 10-second sliding window and invokes the execute method to create a new transaction.

@Bean
public BiFunction<KStream<Long, Order>, KStream<Long, Order>, KStream<Long, Transaction>> transactions() {
   return (orderBuy, orderSell) -> orderBuy
         .selectKey((k, v) -> v.getProductId())
         .join(orderSell.selectKey((k, v) -> v.getProductId()),
               this::execute,
               JoinWindows.of(Duration.ofSeconds(10)),
               StreamJoined.with(Serdes.Integer(), 
                                 new JsonSerde<>(Order.class), 
                                 new JsonSerde<>(Order.class)))
         .filterNot((k, v) -> v == null)
         .map((k, v) -> new KeyValue<>(v.getId(), v))
         .peek((k, v) -> log.info("Done -> {}", v));
}

private Transaction execute(Order orderBuy, Order orderSell) {
   if (orderBuy.getAmount() >= orderSell.getAmount()) {
      int count = Math.min(orderBuy.getProductCount(), orderSell.getProductCount());
      boolean allowed = logic.performUpdate(orderBuy.getId(), orderSell.getId(), count);
      if (!allowed)
         return null;
      else
         return new Transaction(
            ++transactionId,
            orderBuy.getId(),
            orderSell.getId(),
            Math.min(orderBuy.getProductCount(), orderSell.getProductCount()),
            (orderBuy.getAmount() + orderSell.getAmount()) / 2,
            LocalDateTime.now(),
            "NEW");
   } else {
      return null;
   }
}
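To illustrate what the 10-second join window means, here is a small plain-Java sketch that pairs records with the same key whose timestamps differ by at most the window size. It only mimics the semantics and is not how Kafka Streams is implemented. The WindowedJoinDemo class and Event record are hypothetical, and I assume the window boundary is inclusive.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowedJoinDemo {

    // Hypothetical event: a label, the join key (productId) and a timestamp in milliseconds.
    record Event(String name, int productId, long timestampMs) {}

    // Pairs every buy event with every sell event that has the same productId
    // and a timestamp at most windowMs away, either before or after.
    static List<String> join(List<Event> buys, List<Event> sells, long windowMs) {
        List<String> pairs = new ArrayList<>();
        for (Event buy : buys) {
            for (Event sell : sells) {
                boolean sameKey = buy.productId() == sell.productId();
                boolean inWindow = Math.abs(buy.timestampMs() - sell.timestampMs()) <= windowMs;
                if (sameKey && inWindow) {
                    pairs.add(buy.name() + "+" + sell.name());
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<Event> buys = List.of(new Event("buy-1", 1, 0), new Event("buy-2", 2, 5_000));
        List<Event> sells = List.of(
                new Event("sell-1", 1, 8_000),    // same product, 8 s apart
                new Event("sell-2", 1, 12_000),   // same product, 12 s apart
                new Event("sell-3", 3, 5_000));   // different product
        System.out.println(join(buys, sells, 10_000));
    }
}
```

Note that one buy order can be paired with several sell orders inside the window. That is why each candidate pair still has to be verified before a transaction is created.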

Let’s take a closer look at the performUpdate() method called inside the execute() method. It initiates a transaction and locks both Order entities. Then it verifies each order’s realization status and updates it with the current values if possible. The stock-service application creates a new transaction only if the performUpdate() method finishes successfully.

@Service
public class OrderLogic {

   private OrderRepository repository;

   public OrderLogic(OrderRepository repository) {
      this.repository = repository;
   }

   public Order add(Order order) {
      return repository.save(order);
   }

   @Transactional
   public boolean performUpdate(Long buyOrderId, Long sellOrderId, int amount) {
      Order buyOrder = repository.findById(buyOrderId).orElseThrow();
      Order sellOrder = repository.findById(sellOrderId).orElseThrow();
      int buyAvailableCount = buyOrder.getProductCount() - buyOrder.getRealizedCount();
      int sellAvailableCount = sellOrder.getProductCount() - sellOrder.getRealizedCount();
      if (buyAvailableCount >= amount && sellAvailableCount >= amount) {
         buyOrder.setRealizedCount(buyOrder.getRealizedCount() + amount);
         sellOrder.setRealizedCount(sellOrder.getRealizedCount() + amount);
         repository.save(buyOrder);
         repository.save(sellOrder);
         return true;
      } else {
         return false;
      }
   }
}
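The effect of that check is easier to see without the database. The sketch below keeps the same condition in memory. The RealizationDemo class and its TrackedOrder type are hypothetical stand-ins for the persisted Order entity, and synchronized only stands in for the database transaction and lock.

```java
final class RealizationDemo {

    // Hypothetical in-memory stand-in for the persisted Order entity.
    static final class TrackedOrder {
        final int productCount;
        int realizedCount;

        TrackedOrder(int productCount) {
            this.productCount = productCount;
        }

        // Number of products that are still waiting to be realized.
        int available() {
            return productCount - realizedCount;
        }
    }

    // Same condition as in performUpdate: both orders must still have
    // at least `amount` products unrealized, otherwise nothing changes.
    static synchronized boolean tryRealize(TrackedOrder buy, TrackedOrder sell, int amount) {
        if (buy.available() >= amount && sell.available() >= amount) {
            buy.realizedCount += amount;
            sell.realizedCount += amount;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        TrackedOrder buy = new TrackedOrder(100);
        TrackedOrder firstSell = new TrackedOrder(100);
        TrackedOrder secondSell = new TrackedOrder(100);
        // The first pairing uses up the whole buy order.
        System.out.println(tryRealize(buy, firstSell, 100));
        // The second pairing is rejected and leaves secondSell untouched.
        System.out.println(tryRealize(buy, secondSell, 100));
    }
}
```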

Here’s our repository class with the findById method. It sets a pessimistic lock on the Order entity during the transaction.

public interface OrderRepository extends CrudRepository<Order, Long> {

  @Lock(LockModeType.PESSIMISTIC_WRITE)
  Optional<Order> findById(Long id);

}

We also need to provide configuration settings for the transactions BiFunction.

spring.cloud.stream.bindings.transactions-in-0.destination: orders.buy
spring.cloud.stream.bindings.transactions-in-1.destination: orders.sell
spring.cloud.stream.bindings.transactions-out-0.destination: transactions
spring.cloud.stream.kafka.streams.binder.functions.transactions.applicationId: transactions

spring.cloud.stream.function.definition: orders;transactions

Use Kafka KTable with Spring Cloud Stream

We have already finished the implementation of the logic responsible for creating transactions from incoming orders. Now, we would like to examine the data generated by our stock-service application. The most important things are how many transactions were generated and what the volume of transactions was, both globally and per product. Three key statistics related to our transactions are: the number of transactions, the number of products sold/bought during transactions, and the total amount of transactions (price * productsCount). Here’s the definition of our object used for counting aggregations.

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class TransactionTotal {
   private int count;
   private int productCount;
   private int amount;
}

In order to call an aggregation method, we first need to group the transactions stream by the selected key. In the method visible below, we use the status field as the grouping key. After that, we may invoke the aggregate method, which allows us to perform some more complex calculations. In this particular case, we are calculating the number of all executed transactions, their volume of products, and the total amount. The resulting KTable can be materialized as a state store. Thanks to that, we will be able to query it by the name all-transactions-store.

@Bean
public Consumer<KStream<Long, Transaction>> total() {
   KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(
                "all-transactions-store");
   return transactions -> transactions
         .groupBy((k, v) -> v.getStatus(), 
                  Grouped.with(Serdes.String(), new JsonSerde<>(Transaction.class)))
         .aggregate(
                 TransactionTotal::new,
                 (k, v, a) -> {
                    a.setCount(a.getCount() + 1);
                    a.setProductCount(a.getProductCount() + v.getAmount());
                    a.setAmount(a.getAmount() + (v.getPrice() * v.getAmount()));
                    return a;
                 },
                 Materialized.<String, TransactionTotal> as(storeSupplier)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new JsonSerde<>(TransactionTotal.class)))
         .toStream()
         .peek((k, v) -> log.info("Total: {}", v));
}
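Conceptually, the aggregate call is a fold per key: it starts from an empty total and applies the aggregator to every incoming record. Here is a rough plain-Java equivalent over an in-memory list. The AggregateDemo class with its Tx and Total records is a hypothetical, trimmed-down version of Transaction and TransactionTotal.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AggregateDemo {

    // Hypothetical, trimmed-down transaction: status, number of products, unit price.
    record Tx(String status, int amount, int price) {}

    // Running totals, analogous to TransactionTotal.
    record Total(int count, int productCount, int amount) {
        // Plays the role of the aggregator: returns the totals updated with one more transaction.
        Total add(Tx tx) {
            return new Total(count + 1, productCount + tx.amount(), amount + tx.price() * tx.amount());
        }
    }

    // Groups transactions by status and folds each group into a single Total.
    static Map<String, Total> totalsByStatus(List<Tx> transactions) {
        Map<String, Total> totals = new LinkedHashMap<>();
        for (Tx tx : transactions) {
            // new Total(0, 0, 0) plays the role of the initializer for a key seen for the first time.
            Total current = totals.getOrDefault(tx.status(), new Total(0, 0, 0));
            totals.put(tx.status(), current.add(tx));
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Tx> transactions = List.of(new Tx("NEW", 100, 1000), new Tx("NEW", 50, 1020));
        System.out.println(totalsByStatus(transactions));
    }
}
```

The difference in Kafka Streams is that the totals are updated continuously as new transactions arrive, and the current value per key is kept in the state store.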

The next function performs a similar aggregate operation, but this time per product. Because the Transaction object does not contain information about the product, we first need to join it with the order to access the productId. Then we produce a KTable by grouping and aggregating per productId. As before, we materialize the aggregation as a state store.

@Bean
public BiConsumer<KStream<Long, Transaction>, KStream<Long, Order>> totalPerProduct() {
   KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(
                "transactions-per-product-store");
   return (transactions, orders) -> transactions
         .selectKey((k, v) -> v.getSellOrderId())
         .join(orders.selectKey((k, v) -> v.getId()),
               (t, o) -> new TransactionTotalWithProduct(t, o.getProductId()),
               JoinWindows.of(Duration.ofSeconds(10)),
               StreamJoined.with(Serdes.Long(), 
                  new JsonSerde<>(Transaction.class), 
                  new JsonSerde<>(Order.class)))
         .groupBy((k, v) -> v.getProductId(), 
            Grouped.with(Serdes.Integer(), new JsonSerde<>(TransactionTotalWithProduct.class)))
         .aggregate(
               TransactionTotal::new,
               (k, v, a) -> {
                  a.setCount(a.getCount() + 1);
                  a.setProductCount(a.getProductCount() + v.getTransaction().getAmount());
                  a.setAmount(a.getAmount() + (v.getTransaction().getPrice() * v.getTransaction().getAmount()));
                  return a;
               },
               Materialized.<Integer, TransactionTotal> as(storeSupplier)
                  .withKeySerde(Serdes.Integer())
                  .withValueSerde(new JsonSerde<>(TransactionTotal.class)))
         .toStream()
         .peek((k, v) -> log.info("Total per product({}): {}", k, v));
}

What if we would like to perform aggregations similar to those described above, but only for a particular period of time? We need to invoke the windowedBy method and produce a dedicated state store for such operations.

@Bean
public BiConsumer<KStream<Long, Transaction>, KStream<Long, Order>> latestPerProduct() {
   WindowBytesStoreSupplier storeSupplier = Stores.persistentWindowStore(
      "latest-transactions-per-product-store", Duration.ofSeconds(30), Duration.ofSeconds(30), false);
   return (transactions, orders) -> transactions
      .selectKey((k, v) -> v.getSellOrderId())
      .join(orders.selectKey((k, v) -> v.getId()),
            (t, o) -> new TransactionTotalWithProduct(t, o.getProductId()),
            JoinWindows.of(Duration.ofSeconds(10)),
            StreamJoined.with(Serdes.Long(), new JsonSerde<>(Transaction.class), new JsonSerde<>(Order.class)))
      .groupBy((k, v) -> v.getProductId(), Grouped.with(Serdes.Integer(), new JsonSerde<>(TransactionTotalWithProduct.class)))
      .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
      .aggregate(
            TransactionTotal::new,
            (k, v, a) -> {
               a.setCount(a.getCount() + 1);
               a.setAmount(a.getAmount() + v.getTransaction().getAmount());
               return a;
            },
            Materialized.<Integer, TransactionTotal> as(storeSupplier)
               .withKeySerde(Serdes.Integer())
               .withValueSerde(new JsonSerde<>(TransactionTotal.class)))
      .toStream()
      .peek((k, v) -> log.info("Total per product last 30s({}): {}", k, v));
}
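With a fixed-size time window, every record falls into exactly one bucket determined by its timestamp. A minimal sketch of that bucketing in plain Java follows. It assumes non-negative timestamps and windows aligned to the epoch, and the TumblingWindowDemo class is a hypothetical name used only here.

```java
import java.util.Map;
import java.util.TreeMap;

final class TumblingWindowDemo {

    // Start of the fixed-size window that contains the timestamp.
    // Windows are [0, size), [size, 2 * size), and so on.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Counts events per window; the map key is the window start in milliseconds.
    static Map<Long, Integer> countPerWindow(long[] timestampsMs, long sizeMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts, sizeMs), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] timestamps = {1_000, 29_999, 30_000, 61_000};
        System.out.println(countPerWindow(timestamps, 30_000));
    }
}
```

In the windowed aggregation above, the same idea applies per productId, so each product gets a separate total for every 30-second window.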

Interactive queries

We have already created and configured all required Kafka Streams with Spring Cloud. Finally, we can execute queries on state stores. This operation is called an interactive query. Let’s create a REST controller for exposing such endpoints with the results. In order to query Kafka Streams state stores with Spring Cloud, we need to inject the InteractiveQueryService bean into the controller.

@RestController
@RequestMapping("/transactions")
public class TransactionController {

   private InteractiveQueryService queryService;

   public TransactionController(InteractiveQueryService queryService) {
      this.queryService = queryService;
   }

   @GetMapping("/all")
   public TransactionTotal getAllTransactionsSummary() {
      ReadOnlyKeyValueStore<String, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("all-transactions-store",
                        QueryableStoreTypes.keyValueStore());
      return keyValueStore.get("NEW");
   }

   @GetMapping("/product/{productId}")
   public TransactionTotal getSummaryByProductId(@PathVariable("productId") Integer productId) {
      ReadOnlyKeyValueStore<Integer, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("transactions-per-product-store",
                        QueryableStoreTypes.keyValueStore());
      return keyValueStore.get(productId);
   }

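   @GetMapping("/product/latest/{productId}")
   public TransactionTotal getLatestSummaryByProductId(@PathVariable("productId") Integer productId) {
      // The store is created with Stores.persistentWindowStore, so query it as a window store
      ReadOnlyWindowStore<Integer, TransactionTotal> windowStore =
                queryService.getQueryableStore("latest-transactions-per-product-store",
                        QueryableStoreTypes.windowStore());
      Instant now = Instant.now();
      TransactionTotal latest = null;
      try (WindowStoreIterator<TransactionTotal> it = windowStore.fetch(productId, now.minusSeconds(30), now)) {
         while (it.hasNext()) latest = it.next().value; // keep the most recent window
      }
      return latest;
   }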

   @GetMapping("/product")
   public Map<Integer, TransactionTotal> getSummaryByAllProducts() {
      Map<Integer, TransactionTotal> m = new HashMap<>();
      ReadOnlyKeyValueStore<Integer, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("transactions-per-product-store",
                        QueryableStoreTypes.keyValueStore());
      KeyValueIterator<Integer, TransactionTotal> it = keyValueStore.all();
      while (it.hasNext()) {
         KeyValue<Integer, TransactionTotal> kv = it.next();
         m.put(kv.key, kv.value);
      }
      return m;
   }

}

Before you run the latest version of the stock-service application, you should generate more diverse random data. Let’s say we would like to generate orders for 5 different products with floating prices, as shown below. Just uncomment the following fragment of code in the order-service and run the application once again to generate an infinite stream of events.

private static long orderId = 0;
private static final Random r = new Random();

private Map<Integer, Integer> prices = Map.of(
      1, 1000, 
      2, 2000, 
      3, 5000, 
      4, 1500, 
      5, 2500);

@Bean
public Supplier<Message<Order>> orderBuySupplier() {
   return () -> {
      Integer productId = r.nextInt(1, 6);
      int price = prices.get(productId) + r.nextInt(-100, 100);
      Order o = new Order(
         ++orderId,
         r.nextInt(1, 6),
         productId,
         100,
         LocalDateTime.now(),
         OrderType.BUY,
         price);
      log.info("Order: {}", o);
      return MessageBuilder
         .withPayload(o)
         .setHeader(KafkaHeaders.MESSAGE_KEY, orderId)
         .build();
   };
}

You may also want to generate more messages. To do that, you need to decrease the poller delay for the Spring Cloud Stream Kafka Supplier.

spring.cloud.stream.poller.fixedDelay: 100

After running both our sample applications you may verify the logs on the stock-service side.

Then you may call our REST endpoints performing interactive queries on the materialized Kafka KTable.

$ curl http://localhost:8080/transactions/all
$ curl http://localhost:8080/transactions/product/3
$ curl http://localhost:8080/transactions/product/latest/5

Looks simple? Well, under the hood it is quite a bit more complicated 🙂 Here’s the final list of topics automatically created for the needs of our application.

kafka-streams-spring-cloud-topics

Final Thoughts

Spring Cloud Stream simplifies working with Kafka Streams and interactive queries. Kafka Streams by itself is a very powerful mechanism. In this article, I showed you how we can use it to implement non-trivial logic and then analyze data in various ways.

The post Kafka Streams with Spring Cloud Stream appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2021/11/11/kafka-streams-with-spring-cloud-stream/feed/ 21 10193