Deep Dive into Saga Transactions with Kafka Streams and Spring Boot
https://piotrminkowski.com/2022/02/07/deep-dive-into-saga-transactions-with-kafka-streams-and-spring-boot/
Mon, 07 Feb 2022 14:38:20 +0000

The post Deep Dive into Saga Transactions with Kafka Streams and Spring Boot appeared first on Piotr's TechBlog.

In this article, you will learn how to use Kafka Streams and Spring Boot to perform transactions according to the Saga pattern. To be honest, I was quite surprised by how much attention my last article about Kafka received. I got some questions about streams, transactions, and support for Kafka in Spring Boot. In this article, I’ll try to answer a few of them. I will also show how you can easily set up a cloud-managed Kafka cluster on Upstash.

Introduction

First of all, let’s recap the approach described in the previous article. We used Kafka Streams to process order transactions on the order-service side. To handle orders coming to the stock-service and payment-service, we used a standard Spring @KafkaListener. There are also two databases, one per service. The stock-service stores the number of available products and updates it after receiving an order. The payment-service works the same way: it updates the customer’s account on every single order. Both applications receive orders from a Kafka topic and send responses to other topics. To keep things simple, the figure below omits those response topics. We treat the Kafka orders topic as a stream of events and also as a table with the latest status of each order.

kafka-streams-transactions-old-arch

What may go wrong with that approach? In fact, we have two data sources here. We use Kafka as the order store. On the other hand, there are SQL databases (in my case H2, but you can use any other) that store stock and payment data. Once we send an order with a reservation to the Kafka topic, we need to update a database. Since Kafka does not support XA transactions, the two writes cannot be committed atomically, which may result in data inconsistency. Kafka is not unusual in that respect: many other systems, including e.g. RabbitMQ, don’t support XA transactions either.
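To make the problem concrete, here is a minimal plain-Java sketch of such a dual write. It is not code from the sample repository: the class, its fields, and the brokerDown flag are hypothetical stand-ins for the SQL row, the Kafka topic, and a failing send.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a non-atomic "dual write": database first, Kafka second.
// All names are hypothetical; no real database or broker is involved.
class DualWriteSketch {
    int amountAvailable = 100;                     // stands in for a row in the SQL database
    final List<String> topic = new ArrayList<>();  // stands in for the response topic

    void reserve(int price, boolean brokerDown) {
        amountAvailable -= price;                  // step 1: the local database update succeeds
        if (brokerDown) {
            // step 2 fails, but step 1 is not rolled back: there is no shared transaction
            throw new IllegalStateException("send failed");
        }
        topic.add("ACCEPT");                       // step 2: the response reaches Kafka
    }

    public static void main(String[] args) {
        DualWriteSketch service = new DualWriteSketch();
        try {
            service.reserve(10, true);
        } catch (IllegalStateException e) {
            // the funds stay reserved although no response was ever published
        }
        System.out.println(service.amountAvailable + " " + service.topic); // prints "90 []"
    }
}
```

After the failed send, the balance has already changed while the topic is still empty. That is exactly the kind of inconsistency a shared transaction would prevent.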

The question is: what can we do about it? One possible option is an approach called Change Data Capture (CDC) with the outbox pattern. CDC identifies and tracks changes to data in a database. Then it may emit those changes as events and send them, for example, to a Kafka topic. I won’t go into the details of that process. If you are interested, you may read this article written by Gunnar Morling.

Architecture with Kafka Streams

The approach I will describe today is fully based on Kafka Streams. We won’t use any SQL databases. When the order-service sends a new order, its id is the message key. With Kafka Streams, we may change the message key in the stream. Doing so results in repartitioning and the creation of new internal topics. With new message keys, we may perform calculations just for a specific customerId or productId. The result of such a calculation may be saved in a persistent store. For example, Kafka Streams automatically creates and manages such state stores when you call stateful operations like count() or aggregate(). We will aggregate the orders related to a particular customer or product. Here’s the illustration of our architecture.

kafka-streams-transactions-arch

Now, let’s consider a scenario for the payment-service in detail. On the incoming stream of orders, the payment-service calls the selectKey() operation. It changes the key from the order’s id to the order’s customerId. Then it groups all the orders by the new key and invokes the aggregate() operation. In the aggregate() method, it calculates the available amount and reserved amount based on the order’s price and status (whether it is a new order or a confirmation order). If there are sufficient funds on the customer account, it sends the ACCEPT order to the payment-orders topic. Otherwise, it sends the REJECT order. Then the order-service processes the responses by joining the streams from payment-orders and stock-orders by the order’s id. As a result, it sends a confirmation or a rollback order.

kafka-streams-transactions-details
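The re-keying and aggregation steps of this scenario can be sketched in plain Java without Kafka. This is only an illustration under simplifying assumptions: the Order record and the totalPerCustomer method are hypothetical, and the aggregate simply sums prices instead of tracking reservations.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for selectKey() + groupByKey() + aggregate().
// No Kafka involved; the Order record is a simplified, hypothetical model.
class RekeySketch {

    record Order(long id, long customerId, int price) {}

    // Re-keys each order by customerId and folds the prices per customer,
    // the way aggregate() folds all records that share a key into one value.
    static Map<Long, Integer> totalPerCustomer(List<Order> orders) {
        Map<Long, Integer> totals = new LinkedHashMap<>();
        for (Order o : orders) {
            long newKey = o.customerId();                   // selectKey(): order id -> customerId
            totals.merge(newKey, o.price(), Integer::sum);  // aggregate(): one running value per key
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
                new Order(1, 20, 10),
                new Order(2, 21, 5),
                new Order(3, 20, 30));
        System.out.println(totalPerCustomer(orders)); // prints {20=40, 21=5}
    }
}
```

Orders 1 and 3 have different ids but the same customerId, so after re-keying they end up in the same group and update the same aggregate.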

Finally, let’s proceed to the implementation!

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then switch to the streams-full branch. After that, you should just follow my instructions.

Aggregation with Kafka Streams

Let’s begin with the payment-service. The implementation of KStream is not complicated here. In the first step (1), we invoke the selectKey() method and use the customerId value of the Order object as the new key. Then we call the groupByKey() method (2) to receive a KGroupedStream as a result. Once we have a KGroupedStream, we may invoke one of the calculation methods. In this case, we need to use aggregate(), since our calculation is a little more advanced than a simple count (3). The last two steps just print the value after the calculation.

@Bean
public KStream<Long, Order> stream(StreamsBuilder builder) {
   JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
   JsonSerde<Reservation> rsvSerde = new JsonSerde<>(Reservation.class);
   KStream<Long, Order> stream = builder
      .stream("orders", Consumed.with(Serdes.Long(), orderSerde))
      .peek((k, order) -> LOG.info("New: {}", order));

   KeyValueBytesStoreSupplier customerOrderStoreSupplier =
      Stores.persistentKeyValueStore("customer-orders");

   stream.selectKey((k, v) -> v.getCustomerId()) // (1)
      .groupByKey(Grouped.with(Serdes.Long(), orderSerde)) // (2)
      .aggregate(
         () -> new Reservation(random.nextInt(1000)),
         aggregatorService,
         Materialized.<Long, Reservation>as(customerOrderStoreSupplier)
            .withKeySerde(Serdes.Long())
            .withValueSerde(rsvSerde)) // (3)
      .toStream()
      .peek((k, trx) -> LOG.info("Commit: {}", trx));

   return stream;
}

However, the most important step in the code fragment above is the aggregator invoked inside the aggregate() method. The aggregate() method takes three input arguments. The first of them provides the starting value of our compute object. That object represents the current state of the customer’s account. It has two fields: amountAvailable and amountReserved. To clarify, we use that object in place of the database entity that previously stored the available and reserved amounts on the customer account. Each customer is represented by the customerId (key) and the Reservation object (value) in a Kafka KTable. Just for test purposes, we generate the starting value of amountAvailable as a random number between 0 and 999.

public class Reservation {
   private int amountAvailable;
   private int amountReserved;

   public Reservation() {
   
   }

   public Reservation(int amountAvailable) {
      this.amountAvailable = amountAvailable;
   }

   // GETTERS AND SETTERS ...

}

Ok, let’s take a look at our aggregation method. It needs to implement the Kafka Streams Aggregator interface and its apply() method. It may handle three types of orders. One of them is a confirmation of the order (1). It confirms the distributed transaction, so we just need to clear the reservation by subtracting the order’s price from the amountReserved field. On the other hand, in the case of a rollback, we need to increase the value of amountAvailable by the order’s price and decrease the value of amountReserved accordingly (2). Finally, if we receive a new order (3), we need to perform a reservation if there are sufficient funds on the customer account, or otherwise reject the order.

Aggregator<Long, Order, Reservation> aggregatorService = (id, order, rsv) -> {
   switch (order.getStatus()) {
      case "CONFIRMED" -> // (1)
         rsv.setAmountReserved(rsv.getAmountReserved() 
               - order.getPrice());
      case "ROLLBACK" -> { // (2)
         if (!order.getSource().equals("PAYMENT")) {
            rsv.setAmountAvailable(rsv.getAmountAvailable() 
                  + order.getPrice());
            rsv.setAmountReserved(rsv.getAmountReserved() 
                  - order.getPrice());
         }
      }
      case "NEW" -> { // (3)
         if (order.getPrice() <= rsv.getAmountAvailable()) {
            rsv.setAmountAvailable(rsv.getAmountAvailable() 
                  - order.getPrice());
            rsv.setAmountReserved(rsv.getAmountReserved() 
                  + order.getPrice());
            order.setStatus("ACCEPT");
         } else {
            order.setStatus("REJECT");
         }
         template.send("payment-orders", order.getId(), order);
      }
   }
   LOG.info("{}", rsv);
   return rsv;
};
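To see the arithmetic in isolation, here is a small plain-Java sketch of the same three transitions without Kafka or Spring. The ReservationSketch class and its method names are hypothetical. They only mirror the logic of the aggregator, not its API.

```java
// Plain-Java sketch of the reservation arithmetic; no Kafka or Spring involved.
// ReservationSketch and its method names are hypothetical, not the article's classes.
class ReservationSketch {
    int available;
    int reserved;

    ReservationSketch(int available) {
        this.available = available;
    }

    // NEW order: reserve the funds if possible and return the response status.
    String reserve(int price) {
        if (price <= available) {
            available -= price;
            reserved += price;
            return "ACCEPT";
        }
        return "REJECT";
    }

    // CONFIRMED order: the reserved funds are spent for good.
    void confirm(int price) {
        reserved -= price;
    }

    // ROLLBACK order: undo the reservation, but only if the rejection came
    // from the other service, i.e. this service had accepted the order.
    void rollback(int price, String rejectedBy, String self) {
        if (!rejectedBy.equals(self)) {
            available += price;
            reserved -= price;
        }
    }

    public static void main(String[] args) {
        ReservationSketch account = new ReservationSketch(100);
        System.out.println(account.reserve(30)); // ACCEPT, now available=70, reserved=30
        System.out.println(account.reserve(80)); // REJECT, 80 > 70 so nothing changes
        account.confirm(30);                      // now available=70, reserved=0
    }
}
```

The rollback check matters: when the payment-service itself rejected the order, it never reserved anything, so there is nothing to give back.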

State Store with the Kafka Streams Table

The implementation of the stock-service is pretty similar to the payment-service. The difference is that we count the number of available products in stock instead of the available funds on the customer account. Here’s our Reservation object:

public class Reservation {
   private int itemsAvailable;
   private int itemsReserved;

   public Reservation() {
    
   }

   public Reservation(int itemsAvailable) {
      this.itemsAvailable = itemsAvailable;
   }

   // GETTERS AND SETTERS ...

}

The implementation of the aggregation method is also very similar to the payment-service. However, this time, let’s focus on another thing. Once we process a new order, we need to send a response to the stock-orders topic. We use KafkaTemplate for that. In the case of the payment-service we also send a response, but to the payment-orders topic. The send method of KafkaTemplate does not block the thread. It returns a ListenableFuture object. We may use it to register a callback that receives the result after the message has been sent (1). Finally, let’s log the current state of the Reservation object (2).

Aggregator<Long, Order, Reservation> aggrSrv = (id, order, rsv) -> {
   switch (order.getStatus()) {
      case "CONFIRMED" -> rsv.setItemsReserved(rsv.getItemsReserved() 
            - order.getProductCount());
      case "ROLLBACK" -> {
         if (!order.getSource().equals("STOCK")) {
            rsv.setItemsAvailable(rsv.getItemsAvailable() 
                  + order.getProductCount());
            rsv.setItemsReserved(rsv.getItemsReserved() 
                  - order.getProductCount());
         }
      }
      case "NEW" -> {
         if (order.getProductCount() <= rsv.getItemsAvailable()) {
            rsv.setItemsAvailable(rsv.getItemsAvailable() 
                  - order.getProductCount());
            rsv.setItemsReserved(rsv.getItemsReserved() 
                  + order.getProductCount());
            order.setStatus("ACCEPT");
         } else {
            order.setStatus("REJECT");
         }
         // (1)
         template.send("stock-orders", order.getId(), order)
            .addCallback(result -> LOG.info("Sent: {}",
               result != null ? result.getProducerRecord().value() : null),
               ex -> {});
      }
   }
   LOG.info("{}", rsv); // (2)
   return rsv;
};
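If the callback style is unfamiliar, the JDK’s CompletableFuture offers a rough analogy. The sketch below does not use KafkaTemplate at all: the send method is a hypothetical stand-in that completes on another thread. (In Spring for Apache Kafka 3.x, KafkaTemplate.send returns a CompletableFuture instead of a ListenableFuture, so the analogy becomes even closer there.)

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// JDK-only analogy for a non-blocking send with a completion callback.
// The send() method is a hypothetical stand-in, not the KafkaTemplate API.
class AsyncSendSketch {

    // Returns immediately; the "send" completes later on a pool thread.
    static CompletableFuture<String> send(String topic, String value) {
        return CompletableFuture.supplyAsync(() -> topic + ":" + value);
    }

    public static void main(String[] args) {
        AtomicReference<String> logged = new AtomicReference<>();
        CompletableFuture<Void> done = send("stock-orders", "order-1")
                .thenAccept(result -> logged.set("Sent: " + result)); // callback runs after completion
        done.join();                      // wait here only so the demo can print the result
        System.out.println(logged.get()); // prints "Sent: stock-orders:order-1"
    }
}
```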

After that, we also log the value of the Reservation object (1). In order to do that, we need to convert the KTable into a KStream and then call the peek method. This log is printed just after Kafka Streams commits the offset in the source topic.

@Bean
public KStream<Long, Order> stream(StreamsBuilder builder) {
   JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
   JsonSerde<Reservation> rsvSerde = new JsonSerde<>(Reservation.class);
   KStream<Long, Order> stream = builder
      .stream("orders", Consumed.with(Serdes.Long(), orderSerde))
      .peek((k, order) -> LOG.info("New: {}", order));

   KeyValueBytesStoreSupplier stockOrderStoreSupplier =
      Stores.persistentKeyValueStore("stock-orders");

   stream.selectKey((k, v) -> v.getProductId())
      .groupByKey(Grouped.with(Serdes.Long(), orderSerde))
      .aggregate(() -> new Reservation(random.nextInt(100)), aggrSrv,
         Materialized.<Long, Reservation>as(stockOrderStoreSupplier)
            .withKeySerde(Serdes.Long())
            .withValueSerde(rsvSerde))
      .toStream()
      .peek((k, trx) -> LOG.info("Commit: {}", trx)); // (1)

   return stream;
}

What will happen if you send a test order? Let’s see the logs. You can see the difference in time between processing the message and the offset commit. You won’t have any problems with that as long as your application keeps running or is stopped gracefully. But what if you, for example, kill the process using the kill -9 command? After a restart, our application will receive the same messages once again. Since we use KafkaTemplate to send the response to the stock-orders topic, those responses would be sent a second time, so we need to commit the offset as soon as possible.
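The effect of a crash between processing and commit can be reproduced with a toy model in plain Java. This is only a sketch under stated assumptions: ReplaySketch, its fields, and the crashBeforeCommit flag are hypothetical, and a list index stands in for the consumer offset.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of at-least-once delivery: processing runs ahead of the committed offset,
// so a crash before the commit replays the uncommitted records after a restart.
class ReplaySketch {
    final List<String> sentResponses = new ArrayList<>(); // side effects, e.g. responses sent to a topic
    int committedOffset = 0;                               // the only state that survives a crash

    // Processes all records from the committed offset onwards.
    // The new position is committed only if no crash is simulated.
    void run(List<String> topic, boolean crashBeforeCommit) {
        int position = committedOffset;
        while (position < topic.size()) {
            sentResponses.add("response-for-" + topic.get(position));
            position++;
        }
        if (!crashBeforeCommit) {
            committedOffset = position;
        }
    }
}
```

With a topic of two orders, calling run(topic, true) followed by run(topic, false) leaves four entries in sentResponses: every response was produced twice, because the first run never committed its position.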

What can we do to avoid such problems? We may override the default value (30000) of the commit.interval.ms Kafka Streams property. If you set it to 0, it commits immediately after processing finishes.

spring.kafka:  
  streams:
    properties:
      commit.interval.ms: 0

On the other hand, we can also set the processing.guarantee property to exactly_once. It also changes the default value of commit.interval.ms to 100 ms and enables idempotence for the producer. Note that since Kafka Streams 3.0 the exactly_once value is deprecated in favor of exactly_once_v2. You can read more about it in the Kafka documentation.

spring.kafka:  
  streams:
    properties:
      processing.guarantee: exactly_once

Running Kafka on Upstash

For the purpose of today’s exercise, we will use a serverless Kafka cluster on Upstash. You can create it with a single click. If you would like to test JAAS authentication for your application, I’ve got good news 🙂 Authentication is enabled on that cluster by default. You can find and copy the username and password from the cluster’s main panel.

kafka-streams-transactions-upstash

Now, let’s configure the Kafka connection settings and credentials for the Spring Boot application. Upstash offers a free developer tier of up to 10k messages per day, which is enough for our tests.

spring.kafka:
  bootstrap-servers: topical-gar-11460-us1-kafka.upstash.io:9092
  properties:
    security.protocol: SASL_SSL
    sasl.mechanism: SCRAM-SHA-256
    sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${USERNAME}" password="${PASSWORD}";

With Upstash you can easily display a list of topics. In total, there are 10 topics used in our sample system. Three of them are used directly by the Spring Boot applications, while the rest are created by Kafka Streams to support stateful operations.

After starting the order-service application we can call its REST endpoint to create and send an order to the Kafka topic.

private static final Logger LOG = 
   LoggerFactory.getLogger(OrderController.class);
private AtomicLong id = new AtomicLong();
private KafkaTemplate<Long, Order> template;

@PostMapping
public Order create(@RequestBody Order order) {
   order.setId(id.incrementAndGet());
   template.send("orders", order.getId(), order);
   LOG.info("Sent: {}", order);
   return order;
}

Let’s call the endpoint using the following curl command. You can use any customerId or productId you want.

$ curl -X 'POST' \
  'http://localhost:8080/orders' \
  -H 'Content-Type: application/json' \
  -d '{
    "customerId": 20,
    "productId": 20,
    "productCount": 2,
    "price": 10,
    "status": "NEW"
  }'

All three sample applications use Kafka Streams to process distributed transactions. Once the order is accepted by both stock-service and payment-service you should see the following entry in the order-service logs.

You can easily simulate the rejection of a transaction with Kafka Streams just by setting e.g. productCount higher than the number of available items generated by the stock-service.

With Upstash UI you can also easily verify the number of messages incoming to the topics. Let’s see the current statistics for the orders topic.

Final Thoughts

In order to fully understand what happens in this example, you should also be familiar with the Kafka Streams threading model. It is worth reading the following article, which explains it clearly. First of all, each stream partition is a totally ordered sequence of data records and maps to a Kafka topic partition. This means that even if we have multiple orders at the same time related to e.g. the same product, they are all processed sequentially since they have the same message key (productId in that case).

Moreover, by default, there is only a single stream thread that handles all the partitions. You can see this in the logs below. However, there are stream tasks that act as the lowest-level units of parallelism. As a result, stream tasks can be processed independently and in parallel without manual intervention.
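The link between message keys, partitions, and ordering can be illustrated with a few lines of plain Java. Treat this as a sketch only: PartitionSketch is hypothetical, and its simple hash is an assumption standing in for Kafka’s default partitioner, which hashes the serialized key bytes with murmur2.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of key-based partitioning; not Kafka's actual partitioner.
class PartitionSketch {

    // Assumption: a simple hash stands in for Kafka's murmur2-based default partitioner.
    static int partitionFor(long key, int numPartitions) {
        return Math.floorMod(Long.hashCode(key), numPartitions);
    }

    // Distributes productIds over partitions while keeping arrival order inside each partition.
    static Map<Integer, List<Long>> assign(List<Long> productIds, int numPartitions) {
        Map<Integer, List<Long>> partitions = new TreeMap<>();
        for (long productId : productIds) {
            partitions.computeIfAbsent(partitionFor(productId, numPartitions),
                    p -> new ArrayList<>()).add(productId);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Three orders for product 20 and one for product 21, spread over 3 partitions.
        System.out.println(assign(List.of(20L, 21L, 20L, 20L), 3)); // prints {0=[21], 2=[20, 20, 20]}
    }
}
```

Whatever hash function is used, the same key always lands in the same partition, so all orders for one product are handled one after another by the task that owns that partition.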

I hope this article helps you to better understand Kafka Streams. I just wanted to give you a simple example of how you can use Kafka Streams with Saga transactions in order to simplify your current architecture.

Distributed Transactions in Microservices with Kafka Streams and Spring Boot
https://piotrminkowski.com/2022/01/24/distributed-transactions-in-microservices-with-kafka-streams-and-spring-boot/
Mon, 24 Jan 2022 11:11:24 +0000

The post Distributed Transactions in Microservices with Kafka Streams and Spring Boot appeared first on Piotr's TechBlog.

In this article, you will learn how to use Kafka Streams with Spring Boot. We will rely on the Spring Kafka project. In order to explain how it works, we are going to implement the saga pattern. The saga pattern is a way to manage distributed transactions across microservices. The key phase of that process is to publish an event that triggers local transactions. Microservices exchange such events through a message broker. It turns out that Kafka Streams may help us here. Let’s see how!

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. After that, you should just follow my instructions.

Instead of Spring Kafka, you could as well use Spring Cloud Stream for Kafka. You can read more about it in this article. Spring Cloud Stream provides several useful features like DLQ support, serialization to JSON by default, or interactive queries.

Architecture

We will create a simple system that consists of three microservices. The order-service sends orders to the Kafka topic called orders. The two other microservices, stock-service and payment-service, listen for the incoming events. After receiving them, they verify whether it is possible to execute the order. For example, if there are insufficient funds on the customer account, the order is rejected. Otherwise, the payment-service accepts the order. Either way, it sends a response to the payment-orders topic. The stock-service works the same way, except that it verifies the number of products in stock and sends a response to the stock-orders topic.

Then, the order-service joins two streams from the stock-orders and payment-orders topics by the order’s id. If both orders were accepted, it confirms the distributed transaction. On the other hand, if one order has been accepted and the other rejected, it performs a rollback. In that case, it just generates a new order event and sends it to the orders topic. We may treat the orders topic as a stream of the order’s status changes or just like a table with the last status. Here’s the picture that visualizes our scenario.

kafka-streams-spring-boot-arch

Kafka Streams with Spring Boot

Let’s begin our implementation with the order-service. Surprisingly, there is no Spring Boot starter for Kafka (unless we use Spring Cloud Stream). Therefore, we need to include the spring-kafka dependency. In order to process streams, we also need to include the kafka-streams module directly. Since the order-service exposes some REST endpoints, we also need to add the Spring Boot Web starter.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
</dependency>

The order-service is the most important microservice in our scenario. It acts as an order gateway and a saga pattern orchestrator. It requires all three topics used in our architecture. In order to automatically create the topics on application startup, we need to define the following beans:

@Bean
public NewTopic orders() {
   return TopicBuilder.name("orders")
         .partitions(3)
         .compact()
         .build();
}

@Bean
public NewTopic paymentTopic() {
   return TopicBuilder.name("payment-orders")
         .partitions(3)
         .compact()
         .build();
}

@Bean
public NewTopic stockTopic() {
   return TopicBuilder.name("stock-orders")
         .partitions(3)
         .compact()
         .build();
}

Then, let’s define our first Kafka stream. To do that we need to use the StreamsBuilder bean. The order-service receives events from the payment-service (in the payment-orders topic) and from the stock-service (in the stock-orders topic). Every single event contains the id previously set by the order-service. If we join both these streams into a single stream by the order’s id, we will be able to determine the status of our transaction. The algorithm is pretty simple. If both payment-service and stock-service accepted the order, the final status of the transaction is CONFIRMED. If both services rejected the order, the final status is REJECTED. The last option is ROLLBACK, when one service accepted the order and the other rejected it. Here’s the described method inside the OrderManageService bean.

@Service
public class OrderManageService {

   public Order confirm(Order orderPayment, Order orderStock) {
      Order o = new Order(orderPayment.getId(),
             orderPayment.getCustomerId(),
             orderPayment.getProductId(),
             orderPayment.getProductCount(),
             orderPayment.getPrice());
      if (orderPayment.getStatus().equals("ACCEPT") &&
                orderStock.getStatus().equals("ACCEPT")) {
         o.setStatus("CONFIRMED");
      } else if (orderPayment.getStatus().equals("REJECT") &&
                orderStock.getStatus().equals("REJECT")) {
         o.setStatus("REJECTED");
      } else if (orderPayment.getStatus().equals("REJECT") ||
                orderStock.getStatus().equals("REJECT")) {
         String source = orderPayment.getStatus().equals("REJECT")
                    ? "PAYMENT" : "STOCK";
         o.setStatus("ROLLBACK");
         o.setSource(source);
      }
      return o;
   }

}

Finally, the implementation of our stream. We need to define the KStream bean. We join the two streams using the join method of KStream. The join window is 10 seconds. As a result, we set the status of the order and send a new order to the orders topic. We use the same topic as for sending new orders.

@Autowired
OrderManageService orderManageService;

@Bean
public KStream<Long, Order> stream(StreamsBuilder builder) {
   JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
   KStream<Long, Order> stream = builder
         .stream("payment-orders", Consumed.with(Serdes.Long(), orderSerde));

   stream.join(
         builder.stream("stock-orders"),
         orderManageService::confirm,
         JoinWindows.of(Duration.ofSeconds(10)),
         StreamJoined.with(Serdes.Long(), orderSerde, orderSerde))
      .peek((k, o) -> LOG.info("Output: {}", o))
      .to("orders");

   return stream;
}

Let’s see it also in the picture.

kafka-streams-spring-boot-join
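The role of the 10-second join window can be sketched in plain Java. This is an illustration under simplifying assumptions, not the Kafka Streams implementation: the Event record and the join method are hypothetical, and two in-memory lists stand in for the payment-orders and stock-orders streams. Like JoinWindows.of, it pairs records with the same key whose timestamps differ by at most the given duration in either direction.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Toy model of a windowed stream-stream join; the names are hypothetical.
class WindowJoinSketch {

    record Event(long orderId, long timestampMs, String status) {}

    // Pairs events that share an orderId and whose timestamps differ by at most `window`.
    static List<String> join(List<Event> payments, List<Event> stocks, Duration window) {
        List<String> out = new ArrayList<>();
        for (Event p : payments) {
            for (Event s : stocks) {
                boolean sameKey = p.orderId() == s.orderId();
                boolean inWindow = Math.abs(p.timestampMs() - s.timestampMs()) <= window.toMillis();
                if (sameKey && inWindow) {
                    out.add(p.orderId() + ":" + p.status() + "/" + s.status());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> payments = List.of(new Event(1, 1_000, "ACCEPT"), new Event(2, 1_000, "ACCEPT"));
        List<Event> stocks = List.of(new Event(1, 4_000, "REJECT"), new Event(2, 20_000, "ACCEPT"));
        // Order 1 joins (3 s apart); order 2 does not (19 s apart).
        System.out.println(join(payments, stocks, Duration.ofSeconds(10))); // prints [1:ACCEPT/REJECT]
    }
}
```

The practical consequence is that a response arriving more than 10 seconds after its counterpart produces no joined record, so the window should comfortably cover the expected processing delay of both services.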

Configuration for Spring Boot

In Spring Boot, the name of the application is by default the name of the consumer group for Kafka Streams. Therefore, we should set it in application.yml. Of course, we also need to set the address of the Kafka bootstrap server. Finally, we have to configure the default key and value serializers for events. This applies to both standard and streams processing.

spring.application.name: orders
spring.kafka:
  bootstrap-servers: 127.0.0.1:56820
  producer:
    key-serializer: org.apache.kafka.common.serialization.LongSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  streams:
    properties:
      default.key.serde: org.apache.kafka.common.serialization.Serdes$LongSerde
      default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
      spring.json.trusted.packages: "*"

Sending and receiving events from Kafka topics

In the previous section, we discussed how to create a new Kafka stream as a result of joining two other streams. Now, let’s see how to process incoming messages. Let’s take the payment-service as an example. It listens for the incoming orders. If it receives a new order, it performs a reservation on the customer’s account and sends a response with the reservation status to the payment-orders topic. If it receives a confirmation of the transaction from the order-service, it commits the transaction or rolls it back. In order to enable the Kafka listener, we should annotate the main class with @EnableKafka. Additionally, the listening method must be annotated with @KafkaListener. The following method listens for events on the orders topic and runs in the payment consumer group.

@SpringBootApplication
@EnableKafka
public class PaymentApp {

    private static final Logger LOG = LoggerFactory.getLogger(PaymentApp.class);

    public static void main(String[] args) {
        SpringApplication.run(PaymentApp.class, args);
    }

    @Autowired
    OrderManageService orderManageService;

    @KafkaListener(id = "orders", topics = "orders", groupId = "payment")
    public void onEvent(Order o) {
        LOG.info("Received: {}" , o);
        if (o.getStatus().equals("NEW"))
            orderManageService.reserve(o);
        else
            orderManageService.confirm(o);
    }
}

Here’s the implementation of the OrderManageService used in the previous code snippet. If it receives an order in the NEW status, it performs a reservation. During the reservation, it subtracts the order’s price from the amountAvailable field and adds the same value to the amountReserved field. Then it sets the status of the order and sends a response to the payment-orders topic using KafkaTemplate. During the confirmation phase, it doesn’t send any response event. It can perform a rollback, which means subtracting the order’s price from the amountReserved field and adding it to the amountAvailable field. Otherwise, it just “commits” the transaction by subtracting the price from the amountReserved field.

@Service
public class OrderManageService {

   private static final String SOURCE = "payment";
   private static final Logger LOG = LoggerFactory.getLogger(OrderManageService.class);
   private CustomerRepository repository;
   private KafkaTemplate<Long, Order> template;

   public OrderManageService(CustomerRepository repository, KafkaTemplate<Long, Order> template) {
      this.repository = repository;
      this.template = template;
   }

   public void reserve(Order order) {
      Customer customer = repository.findById(order.getCustomerId()).orElseThrow();
      LOG.info("Found: {}", customer);
      if (order.getPrice() < customer.getAmountAvailable()) {
         order.setStatus("ACCEPT");
         customer.setAmountReserved(customer.getAmountReserved() + order.getPrice());
         customer.setAmountAvailable(customer.getAmountAvailable() - order.getPrice());
      } else {
         order.setStatus("REJECT");
      }
      order.setSource(SOURCE);
      repository.save(customer);
      template.send("payment-orders", order.getId(), order);
      LOG.info("Sent: {}", order);
   }

   public void confirm(Order order) {
      Customer customer = repository.findById(order.getCustomerId()).orElseThrow();
      LOG.info("Found: {}", customer);
      if (order.getStatus().equals("CONFIRMED")) {
         customer.setAmountReserved(customer.getAmountReserved() - order.getPrice());
         repository.save(customer);
      } else if (order.getStatus().equals("ROLLBACK") && !order.getSource().equals(SOURCE)) {
         customer.setAmountReserved(customer.getAmountReserved() - order.getPrice());
         customer.setAmountAvailable(customer.getAmountAvailable() + order.getPrice());
         repository.save(customer);
      }

   }
}

A similar logic is implemented on the stock-service side. However, instead of the order’s price, it uses the field productCount and performs reservation for the desired number of ordered products. Here’s the implementation of the OrderManageService class in the stock-service.

@Service
public class OrderManageService {

   private static final String SOURCE = "stock";
   private static final Logger LOG = LoggerFactory.getLogger(OrderManageService.class);
   private ProductRepository repository;
   private KafkaTemplate<Long, Order> template;

   public OrderManageService(ProductRepository repository, KafkaTemplate<Long, Order> template) {
      this.repository = repository;
      this.template = template;
   }

   public void reserve(Order order) {
      Product product = repository.findById(order.getProductId()).orElseThrow();
      LOG.info("Found: {}", product);
      if (order.getStatus().equals("NEW")) {
         if (order.getProductCount() < product.getAvailableItems()) {
            product.setReservedItems(product.getReservedItems() + order.getProductCount());
            product.setAvailableItems(product.getAvailableItems() - order.getProductCount());
            order.setStatus("ACCEPT");
            repository.save(product);
         } else {
            order.setStatus("REJECT");
         }
         template.send("stock-orders", order.getId(), order);
         LOG.info("Sent: {}", order);
      }
   }

   public void confirm(Order order) {
      Product product = repository.findById(order.getProductId()).orElseThrow();
      LOG.info("Found: {}", product);
      if (order.getStatus().equals("CONFIRMED")) {
         product.setReservedItems(product.getReservedItems() - order.getProductCount());
         repository.save(product);
      } else if (order.getStatus().equals("ROLLBACK") && !order.getSource().equals(SOURCE)) {
         product.setReservedItems(product.getReservedItems() - order.getProductCount());
         product.setAvailableItems(product.getAvailableItems() + order.getProductCount());
         repository.save(product);
      }
   }

}
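To make the bookkeeping easier to follow, here is a minimal, framework-free sketch of the same arithmetic. The StockSketch class and its method names are hypothetical and not part of the sample repository. It only assumes the rules visible in the class above: a reservation moves items from "available" to "reserved", a confirmation removes them from "reserved", and a rollback puts them back.

```java
// Hypothetical sketch of the stock bookkeeping; not the article's classes.
class StockSketch {
    int availableItems;
    int reservedItems;

    StockSketch(int availableItems) {
        this.availableItems = availableItems;
    }

    // Phase 1: move items from "available" to "reserved" if enough are in stock.
    boolean reserve(int count) {
        if (count >= availableItems) {   // mirrors the strict "<" check in reserve(Order)
            return false;                // corresponds to the REJECT status
        }
        reservedItems += count;
        availableItems -= count;
        return true;                     // corresponds to the ACCEPT status
    }

    // Phase 2a: the order was CONFIRMED, so the reserved items are gone for good.
    void confirm(int count) {
        reservedItems -= count;
    }

    // Phase 2b: the order was rolled back by another service, so the items return to stock.
    void rollback(int count) {
        reservedItems -= count;
        availableItems += count;
    }

    public static void main(String[] args) {
        StockSketch product = new StockSketch(10);
        System.out.println("accepted: " + product.reserve(4));
        product.rollback(4);
        System.out.println(product.availableItems + " available, "
                + product.reservedItems + " reserved");
    }
}
```

Note that a rollback restores exactly the state from before the reservation, which is what makes the compensation step of the saga safe to apply.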

Query Kafka Stream with Spring Boot

Now, let’s consider the following scenario. Firstly, the order-service receives a new order (via REST API) and sends it to the Kafka topic. This order is then received by both other microservices. Once they send back a positive (or negative) response, the order-service processes the responses as streams and changes the status of the order. The order with a new status is emitted to the same topic as before. So, where are we storing the data with the current status of an order? In the Kafka topic. Once again we will use Kafka Streams in our Spring Boot application. But this time, we take advantage of KTable. Let’s look at the visualization of our scenario.

kafka-streams-spring-boot-ktable

Ok, so let’s define another Kafka Streams bean in the order-service. We are getting the same orders topic as a stream. We will convert it to the Kafka table and materialize it in a persistent store. Thanks to that we will be able to easily query the store from our REST controller.

@Bean
public KTable<Long, Order> table(StreamsBuilder builder) {
   KeyValueBytesStoreSupplier store =
         Stores.persistentKeyValueStore("orders");
   JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
   KStream<Long, Order> stream = builder
         .stream("orders", Consumed.with(Serdes.Long(), orderSerde));
   return stream.toTable(Materialized.<Long, Order>as(store)
         .withKeySerde(Serdes.Long())
         .withValueSerde(orderSerde));
}
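If the idea of a topic "as a table" is new to you, the following framework-free sketch may help. It is only an illustration of the semantics, assuming that each record is an upsert for its key. The OrderTableSketch class is hypothetical, and Kafka Streams itself keeps this data in a state store rather than in a plain map.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of table semantics: the latest value per key wins.
class OrderTableSketch {
    // key = order id, value = latest known status
    private final Map<Long, String> latest = new LinkedHashMap<>();

    // Every record read from the topic replaces the previous value for its key.
    void onRecord(long orderId, String status) {
        latest.put(orderId, status);
    }

    // Similar in spirit to reading every entry from the materialized store.
    List<String> all() {
        List<String> result = new ArrayList<>();
        latest.forEach((id, status) -> result.add(id + "=" + status));
        return result;
    }

    public static void main(String[] args) {
        OrderTableSketch table = new OrderTableSketch();
        table.onRecord(1L, "NEW");
        table.onRecord(2L, "NEW");
        table.onRecord(1L, "CONFIRMED");   // overwrites the earlier NEW entry for order 1
        System.out.println(table.all());
    }
}
```

The topic still contains three records, but the table view contains only two entries, one per order id.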

If we run more than one instance of the order-service on the same machine it is also important to override the default location of the state store. To do that we should define the following property unique per every instance:

spring.kafka.streams.state-dir: /tmp/kafka-streams/1

Unfortunately, there is no built-in support in Spring Boot for interactive queries of Kafka Streams. However, we can use the auto-configured StreamsBuilderFactoryBean to inject the KafkaStreams instance into the controller. Then we can query the state store under the “materialized” name. That is, of course, a very trivial sample. We are just getting all orders from the KTable.

@GetMapping
public List<Order> all() {
   List<Order> orders = new ArrayList<>();
   ReadOnlyKeyValueStore<Long, Order> store = kafkaStreamsFactory
         .getKafkaStreams()
         .store(StoreQueryParameters.fromNameAndType(
               "orders",
               QueryableStoreTypes.keyValueStore()));
   // The iterator holds store resources, so close it once we are done
   try (KeyValueIterator<Long, Order> it = store.all()) {
      it.forEachRemaining(kv -> orders.add(kv.value));
   }
   return orders;
}

In the same OrderController there is also a method for sending a new order to the Kafka topic.

@PostMapping
public Order create(@RequestBody Order order) {
   order.setId(id.incrementAndGet());
   template.send("orders", order.getId(), order);
   LOG.info("Sent: {}", order);
   return order;
}

Testing Scenario

Before we run our sample microservices, we need to start the local instance of Kafka. Usually, I’m using Redpanda for that. It is a Kafka API compatible streaming platform. In comparison to Kafka, it is relatively easy to run it locally. All you need to do is to install the rpk CLI locally (here is the instruction for macOS). After that, you can create a single-node instance using the following command:

$ rpk container start

After running, it will print the address of your node. For me, it is 127.0.0.1:56820. You should put that address as a value of the spring.kafka.bootstrap-servers property. You can also display a list of created topics using the following command:

$ rpk topic list --brokers 127.0.0.1:56820

Then, let’s run our microservices. Begin with the order-service since it creates all the required topics and builds the Kafka Streams instance. You can send a single order using the REST endpoint:

$ curl -X 'POST' \
  'http://localhost:8080/orders' \
  -H 'Content-Type: application/json' \
  -d '{
  "customerId": 10,
  "productId": 10,
  "productCount": 5,
  "price": 100,
  "status": "NEW"
}'

There is some test data inserted while payment-service and stock-service start. So, you can set the value of customerId or productId between 1 and 100 and it will work for you. However, you can also use a method for generating a random stream of data. The following bean is responsible for generating 10000 random orders:

@Service
public class OrderGeneratorService {

   private static Random RAND = new Random();
   private AtomicLong id = new AtomicLong();
   private Executor executor;
   private KafkaTemplate<Long, Order> template;

   public OrderGeneratorService(Executor executor, KafkaTemplate<Long, Order> template) {
      this.executor = executor;
      this.template = template;
   }

   @Async
   public void generate() {
      for (int i = 0; i < 10000; i++) {
         int x = RAND.nextInt(5) + 1;
         Order o = new Order(id.incrementAndGet(), RAND.nextLong(100) + 1, RAND.nextLong(100) + 1, "NEW");
         o.setPrice(100 * x);
         o.setProductCount(x);
         template.send("orders", o.getId(), o);
      }
   }
}

You can start that process by calling the endpoint POST /orders/generate.

@PostMapping("/generate")
public boolean create() {
   orderGeneratorService.generate();
   return true;
}

No matter if you decide to send a single order or generate multiple random orders, you can easily query the status of orders using the following endpoint:

$ curl http://localhost:8080/orders

Here’s a structure of topics generated by the application and by Kafka Streams to perform join operation and save the orders KTable as a state store.

Kafka Streams with Quarkus https://piotrminkowski.com/2021/11/24/kafka-streams-with-quarkus/ Wed, 24 Nov 2021 08:24:53 +0000
In this article, you will learn how to use Kafka Streams with Quarkus. As in my previous article, we will create a simple application that simulates the stock market. But this time, we are going to use Quarkus instead of Spring Cloud. If you would like to figure out what a streaming platform is and how it differs from a traditional message broker, this article is for you. Moreover, we will study useful improvements related to Apache Kafka provided by Quarkus.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. After that, you should just follow my instructions. Let’s begin.

Architecture

In our case, there are two incoming streams of events. Both of them represent incoming orders. These orders are generated by the order-service application. It sends buy orders to the orders.buy topic and sell orders to the orders.sell topic. Then, the stock-service application receives and handles incoming events. In the first step, it needs to change the key of each message from the orderId to the productId. That’s because it has to join orders from different topics related to the same product in order to execute transactions. Finally, the transaction price is the average of the sell and buy prices.

quarkus-kafka-streams-arch

We are building a simplified version of the stock market platform. Each buy order contains a maximum price at which a customer is expecting to buy a product. On the other hand, each sell order contains a minimum price at which a customer is ready to sell a product. If the sell order price is not greater than the buy order price for a particular product, we perform a transaction.

Each order is valid for 10 seconds. After that time the stock-service application will not handle such an order since it is considered as expired. Each order contains a number of products for a transaction. For example, we may sell 100 for 10 or buy 200 for 11. Therefore, an order may be fully or partially realized. The stock-service application tries to join partially realized orders to other new or partially realized orders. You can see the visualization of that process in the picture below.
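The matching rule can be sketched in a few lines of plain Java. This is only an illustration under the assumptions stated in the text (integer prices, the average as the transaction price, the smaller quantity as the traded amount). The MatchSketch class and its method names are hypothetical.

```java
// Hypothetical sketch of the matching rule; names are illustrative only.
class MatchSketch {

    // Orders match when the seller's minimum price does not exceed the buyer's maximum price.
    static boolean matches(int buyMaxPrice, int sellMinPrice) {
        return sellMinPrice <= buyMaxPrice;
    }

    // The transaction price is the average of both prices (integer division).
    static int transactionPrice(int buyMaxPrice, int sellMinPrice) {
        return (buyMaxPrice + sellMinPrice) / 2;
    }

    // Only the smaller remaining quantity can be traded; the larger order stays partially realized.
    static int tradedCount(int buyRemaining, int sellRemaining) {
        return Math.min(buyRemaining, sellRemaining);
    }

    public static void main(String[] args) {
        // The example from the text: sell 100 for 10, buy 200 for 11.
        int traded = tradedCount(200, 100);
        System.out.println("match: " + matches(11, 10));
        System.out.println("price: " + transactionPrice(11, 10));
        System.out.println("traded: " + traded + ", buy order still open for " + (200 - traded));
    }
}
```

In that example the sell order is fully realized, while the buy order remains open for another 100 items and may be paired with a later sell order.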

quarkus-kafka-streams-app

Run Apache Kafka locally

Before we jump to the implementation, we need to run a local instance of Apache Kafka. If you don’t want to install it on your laptop, the best way to run it is with Redpanda. Redpanda is a Kafka API compatible streaming platform. In comparison to Kafka, it is relatively easy to run it locally. Normally, you would have to install Redpanda on your laptop and then create a cluster using their CLI. But with Quarkus you don’t need to do that! The only requirement is to have Docker installed. Thanks to the Quarkus Kafka extension and feature called Dev Services it automatically starts a Kafka broker in dev mode and when running tests. Moreover, the application is configured automatically.

The only thing you need to do in order to enable that feature is NOT to provide any Kafka address in configuration properties. Dev Services uses Testcontainers to run Kafka, so if you have Docker or any other environment supporting Testcontainers running you get a containerized instance of Kafka out-of-the-box. Another important thing. Firstly, start the order-service application. It automatically creates all the required topics in Kafka. Then run the stock-service application. It uses the Quarkus Kafka Streams extension and verifies if the required topics exist. Let’s visualize it.

quarkus-kafka-streams-run

Send events to Kafka with Quarkus

There are several ways to send events to Kafka with Quarkus. Because we need to send key/value pair we will use the io.smallrye.reactive.messaging.kafka.Record object for that. Quarkus is able to generate and send data continuously. In the fragment of code visible below, we send a single Order event per 500 ms. Each Order contains a random productId, price and productCount.

@Outgoing("orders-buy")
public Multi<Record<Long, Order>> buyOrdersGenerator() {
   return Multi.createFrom().ticks().every(Duration.ofMillis(500))
      .map(order -> {
         Integer productId = random.nextInt(10) + 1;
         int price = prices.get(productId) + random.nextInt(200);
         Order o = new Order(
             incrementOrderId(),
             random.nextInt(1000) + 1,
             productId,
             100 * (random.nextInt(5) + 1),
             LocalDateTime.now(),
             OrderType.BUY,
             price);
         log.infof("Sent: %s", o);
         return Record.of(o.getId(), o);
   });
}

@Outgoing("orders-sell")
public Multi<Record<Long, Order>> sellOrdersGenerator() {
   return Multi.createFrom().ticks().every(Duration.ofMillis(500))
      .map(order -> {
         Integer productId = random.nextInt(10) + 1;
         int price = prices.get(productId) + random.nextInt(200);
         Order o = new Order(
             incrementOrderId(),
             random.nextInt(1000) + 1,
             productId,
             100 * (random.nextInt(5) + 1),
             LocalDateTime.now(),
             OrderType.SELL,
             price);
         log.infof("Sent: %s", o);
         return Record.of(o.getId(), o);
   });
}

We will also define a single @Incoming channel in order to receive transactions produced by the stock-service. Thanks to that Quarkus will automatically create the topic transactions used by Quarkus Kafka Streams in stock-service. To be honest, I was not able to force the Quarkus Kafka Streams extension to create the topic automatically. It seems we need to use the SmallRye Reactive Messaging extension for that.

@Incoming("transactions")
public void transactions(Transaction transaction) {
   log.infof("New: %s", transaction);
}

Of course, we need to include the SmallRye Reactive Messaging dependency in the Maven pom.xml.

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

Finally, let’s provide configuration settings. We have two outgoing topics and a single incoming topic. We can set their names. Otherwise, Quarkus uses the same name as the name of the channel. The names of our topics are orders.buy, orders.sell and transactions.

mp.messaging.outgoing.orders-buy.connector = smallrye-kafka
mp.messaging.outgoing.orders-buy.topic = orders.buy
mp.messaging.outgoing.orders-buy.key.serializer = org.apache.kafka.common.serialization.LongSerializer
mp.messaging.outgoing.orders-buy.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

mp.messaging.outgoing.orders-sell.connector = smallrye-kafka
mp.messaging.outgoing.orders-sell.topic = orders.sell
mp.messaging.outgoing.orders-sell.key.serializer = org.apache.kafka.common.serialization.LongSerializer
mp.messaging.outgoing.orders-sell.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

mp.messaging.incoming.transactions.connector = smallrye-kafka
mp.messaging.incoming.transactions.topic = transactions
mp.messaging.incoming.transactions.value.deserializer = pl.piomin.samples.streams.order.model.deserializer.TransactionDeserializer

That’s all. Our orders generator is ready. If you run the order-service application, Quarkus will also run a Kafka (Redpanda) instance. But first, let’s switch to the second sample application – stock-service.

Consume Kafka Streams with Quarkus

In the previous section, we were sending messages to the Kafka broker. Therefore, we used a standard Quarkus library for integration with Kafka based on the SmallRye Reactive Messaging framework. The stock-service application consumes messages as streams, so now we will use a module for Kafka Streams integration.

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-kafka-streams</artifactId>
</dependency>

Our application also uses a database, an ORM layer and includes some other useful modules.

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-hibernate-orm-panache</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-jdbc-h2</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-openapi</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-resteasy-jackson</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-health</artifactId>
</dependency>

In the first step, we are going to merge both streams of orders (buy and sell), insert the Order into the database, and print the event message. You might ask why I use a database and an ORM layer here when I already have a Kafka KTable. Well, I need transactions with lock support in order to coordinate the status of order realization (refer to the description of fully and partially realized orders in the introduction). I will give you more details about it in the next sections.

In order to process streams with Quarkus, we need to declare the org.apache.kafka.streams.Topology bean. It contains all the KStream and KTable definitions. Let’s start just with the part responsible for creating and emitting transactions from incoming orders. There are two KStream definitions created. The first of them is responsible for merging two order streams into a single one and then inserting a new Order into a database. The second of them creates and executes transactions by joining two streams using the productId key. But more about it in the next section.

@Produces
public Topology buildTopology() {
   ObjectMapperSerde<Order> orderSerde = 
      new ObjectMapperSerde<>(Order.class);
   ObjectMapperSerde<Transaction> transactionSerde = 
      new ObjectMapperSerde<>(Transaction.class);

   StreamsBuilder builder = new StreamsBuilder();

   KStream<Long, Order> orders = builder.stream(
      ORDERS_SELL_TOPIC,
      Consumed.with(Serdes.Long(), orderSerde));

   builder.stream(ORDERS_BUY_TOPIC, 
         Consumed.with(Serdes.Long(), orderSerde))
      .merge(orders)
      .peek((k, v) -> {
         log.infof("New: %s", v);
         logic.add(v);
      });

   builder.stream(ORDERS_BUY_TOPIC, 
         Consumed.with(Serdes.Long(), orderSerde))
      .selectKey((k, v) -> v.getProductId())
      .join(orders.selectKey((k, v) -> v.getProductId()),
         this::execute,
         JoinWindows.of(Duration.ofSeconds(10)),
         StreamJoined.with(Serdes.Integer(), orderSerde, orderSerde))
      .filterNot((k, v) -> v == null)
      .map((k, v) -> new KeyValue<>(v.getId(), v))
      .peek((k, v) -> log.infof("Done -> %s", v))
      .to(TRANSACTIONS_TOPIC, Produced.with(Serdes.Long(), transactionSerde));

   // The producer method must return the assembled topology
   return builder.build();
}

To process the streams we need to add configuration properties. A list of input topics is required. We can also override a default application id and enable Kafka health check.

quarkus.kafka-streams.application-id = stock
quarkus.kafka-streams.topics = orders.buy,orders.sell
quarkus.kafka.health.enabled = true

Operations on Kafka Streams

Now, we may use some more advanced operations on Kafka Streams than just merging two different streams. In fact, that’s the key logic in our application. We need to join two different order streams into a single one using the productId as a joining key. Since the producer sets orderId as a message key, we first need to invoke the selectKey method for both the orders.sell and orders.buy streams. In our case, joining buy and sell orders related to the same product is just a first step. Then we need to verify that the maximum price in the buy order is not lower than the minimum price in the sell order.

The next step is to verify that neither of these orders has been fully realized previously, as they may also be paired with other orders in the stream. If all the conditions are met, we may create a new transaction. Finally, we may change the stream key from productId to the transactionId and send the transaction to the dedicated transactions topic.
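To get a feeling for what the windowed join does, here is a framework-free sketch of the pairing rule. It is a simplification that assumes a symmetric, inclusive window: two orders are paired when they share the same productId and their timestamps differ by no more than 10 seconds. The WindowJoinSketch class and the Ord type are hypothetical. Kafka Streams implements this with state stores and record timestamps, not with nested loops.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a windowed join on productId; not the Kafka Streams implementation.
class WindowJoinSketch {

    static final class Ord {
        final long id;
        final int productId;
        final Instant timestamp;

        Ord(long id, int productId, Instant timestamp) {
            this.id = id;
            this.productId = productId;
            this.timestamp = timestamp;
        }
    }

    // Two records are joinable when their timestamps differ by at most the window size.
    static boolean withinWindow(Instant a, Instant b, Duration window) {
        return Duration.between(a, b).abs().compareTo(window) <= 0;
    }

    // Pairs every buy order with every sell order for the same product inside the window.
    static List<String> join(List<Ord> buys, List<Ord> sells, Duration window) {
        List<String> pairs = new ArrayList<>();
        for (Ord buy : buys) {
            for (Ord sell : sells) {
                if (buy.productId == sell.productId
                        && withinWindow(buy.timestamp, sell.timestamp, window)) {
                    pairs.add(buy.id + "-" + sell.id);
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2021-11-24T08:00:00Z");
        List<Ord> buys = List.of(new Ord(1, 5, t0));
        List<Ord> sells = List.of(
                new Ord(10, 5, t0.plusSeconds(4)),   // same product, inside the window
                new Ord(11, 5, t0.plusSeconds(15)),  // same product, outside the window
                new Ord(12, 7, t0.plusSeconds(1)));  // different product
        System.out.println(join(buys, sells, Duration.ofSeconds(10)));
    }
}
```

A single buy order can therefore be paired with several sell orders, which is why each pair still has to pass the realization check before a transaction is created.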

Each time we successfully join two orders, we try to create a transaction. The execute(...) method is called within the KStream join method. Firstly, we compare the prices of both orders. Then we verify the realization status of both orders by accessing the H2 database. If the orders are still not fully realized, we may create a transaction and update the order records in the database.

private Transaction execute(Order orderBuy, Order orderSell) {
   if (orderBuy.getAmount() >= orderSell.getAmount()) {
      int count = Math.min(orderBuy.getProductCount(), 
                           orderSell.getProductCount());
      boolean allowed = logic
         .performUpdate(orderBuy.getId(), orderSell.getId(), count);
      if (!allowed)
         return null;
      else
         return new Transaction(
            ++transactionId,
            orderBuy.getId(),
            orderSell.getId(),
            count,
            (orderBuy.getAmount() + orderSell.getAmount()) / 2,
            LocalDateTime.now(),
            "NEW"
         );
   } else {
      return null;
   }
}

Let’s take a closer look at the performUpdate() method called inside the execute() method. It initiates a transaction and locks both Order entities. Then it verifies each order realization status and updates it with the current values if possible. Only if the performUpdate() method finishes successfully the stock-service application creates a new transaction.

@ApplicationScoped
public class OrderLogic {

    @Inject
    Logger log;
    @Inject
    OrderRepository repository;

    @Transactional
    public Order add(Order order) {
        repository.persist(order);
        return order;
    }

    @Transactional
    public boolean performUpdate(Long buyOrderId, Long sellOrderId, int amount) {
        Order buyOrder = repository.findById(buyOrderId, 
           LockModeType.PESSIMISTIC_WRITE);
        Order sellOrder = repository.findById(sellOrderId, 
           LockModeType.PESSIMISTIC_WRITE);
        if (buyOrder == null || sellOrder == null)
            return false;
        int buyAvailableCount = 
           buyOrder.getProductCount() - buyOrder.getRealizedCount();
        int sellAvailableCount = 
           sellOrder.getProductCount() - sellOrder.getRealizedCount();
        if (buyAvailableCount >= amount && sellAvailableCount >= amount) {
            buyOrder.setRealizedCount(buyOrder.getRealizedCount() + amount);
            sellOrder.setRealizedCount(sellOrder.getRealizedCount() + amount);
            repository.persist(buyOrder);
            repository.persist(sellOrder);
            return true;
        } else {
            return false;
        }
    }
}
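The check performed by performUpdate() can be tried out without a database. The sketch below is a simplification, assuming an in-memory map instead of the repository and a synchronized method as a stand-in for the transaction with pessimistic locks. The RealizationSketch class is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the realization bookkeeping.
class RealizationSketch {

    static final class Ord {
        final int productCount;
        int realizedCount;

        Ord(int productCount) {
            this.productCount = productCount;
        }
    }

    private final Map<Long, Ord> orders = new HashMap<>();

    void add(long id, int productCount) {
        orders.put(id, new Ord(productCount));
    }

    // synchronized replaces the database transaction and row locks in this sketch
    synchronized boolean performUpdate(long buyOrderId, long sellOrderId, int amount) {
        Ord buy = orders.get(buyOrderId);
        Ord sell = orders.get(sellOrderId);
        if (buy == null || sell == null) {
            return false;
        }
        int buyAvailable = buy.productCount - buy.realizedCount;
        int sellAvailable = sell.productCount - sell.realizedCount;
        if (buyAvailable >= amount && sellAvailable >= amount) {
            buy.realizedCount += amount;
            sell.realizedCount += amount;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        RealizationSketch logic = new RealizationSketch();
        logic.add(1L, 200);  // buy order for 200 items
        logic.add(2L, 100);  // sell order for 100 items
        System.out.println(logic.performUpdate(1L, 2L, 100));  // first pairing
        System.out.println(logic.performUpdate(1L, 2L, 100));  // same pair joined again
    }
}
```

The second call is rejected because the sell order is already fully realized, so a pair delivered twice by the join does not produce a second transaction.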

Nice 🙂 That’s all that we need to do in the first part of our exercise. Now we can run both our sample applications.

Run and manage Kafka Streams application with Quarkus

As I mentioned before, we first need to start the order-service. It runs a new Kafka instance and creates all required topics. Immediately after startup, it is ready to send new orders. To run the Quarkus app locally just go to the order-service directory and execute the following command:

$ mvn quarkus:dev

Just to verify, you can display a list of running Docker containers with the docker ps command. Here’s my result:

As you can see, the instance of Redpanda is running and is available on a random port, 49724. Quarkus did it for us. However, if you have Redpanda installed on your laptop, you can check out the list of created topics with its CLI, rpk:

$ rpk topic list --brokers=127.0.0.1:49724

Then let’s run the stock-service. Go to the stock-service directory and run mvn quarkus:dev once again. After startup, it just works. Both applications share the same instance thanks to the Quarkus Dev Services. Now let’s access the Quarkus Dev UI console available at http://localhost:8080/q/dev/. Find the tile with the “Apache Kafka Streams” title.

You can check a visualization of our Kafka Streams topology. I will divide the image into two parts for better visibility.

Use Kafka KTable with Quarkus

We have already finished the implementation of the logic responsible for creating transactions from incoming orders. In the next step, we are going to perform analytical operations on the transactions stream. Our main goal is to calculate total number of transactions, total number of products sold/bought, and total value of transactions (price * productsCount) per each product. Here’s the object class used in calculations.

@RegisterForReflection
public class TransactionTotal {
   private int count;
   private int amount;
   private int productCount;

   // GETTERS AND SETTERS
}

Because the Transaction object does not contain information about the product, we first need to join it with the order to access the productId. Then we group the joined records by productId and aggregate them into a KTable. The aggregate method allows us to perform some more complex calculations. In that particular case, we are calculating the number of all executed transactions, their volume of products, and total value. The resulting KTable can be materialized as the state store. Thanks to that we will be able to query it by the name defined by the TRANSACTIONS_PER_PRODUCT_SUMMARY variable.

KeyValueBytesStoreSupplier storePerProductSupplier = Stores.persistentKeyValueStore(
   TRANSACTIONS_PER_PRODUCT_SUMMARY);

builder.stream(TRANSACTIONS_TOPIC, Consumed.with(Serdes.Long(), transactionSerde))
   .selectKey((k, v) -> v.getSellOrderId())
   .join(orders.selectKey((k, v) -> v.getId()),
      (t, o) -> new TransactionWithProduct(t, o.getProductId()),
      JoinWindows.of(Duration.ofSeconds(10)),
      StreamJoined.with(Serdes.Long(), transactionSerde, orderSerde))
   .groupBy((k, v) -> v.getProductId(), Grouped.with(Serdes.Integer(), transactionWithProductSerde))
   .aggregate(
      TransactionTotal::new,
      (k, v, a) -> {
         a.setCount(a.getCount() + 1);
         // productCount accumulates traded items, amount accumulates traded value
         a.setProductCount(a.getProductCount() + v.getTransaction().getAmount());
         a.setAmount(a.getAmount() +
            (v.getTransaction().getAmount() * v.getTransaction().getPrice()));
         return a;
      },
      Materialized.<Integer, TransactionTotal> as(storePerProductSupplier)
         .withKeySerde(Serdes.Integer())
         .withValueSerde(transactionTotalSerde))
   .toStream()
   .peek((k, v) -> log.infof("Total per product(%d): %s", k, v))
   .to(TRANSACTIONS_PER_PRODUCT_AGGREGATED_TOPIC, 
      Produced.with(Serdes.Integer(), transactionTotalSerde));
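The aggregation itself is just a running fold per key. Here is a framework-free sketch of the intended totals, assuming that each transaction carries the number of traded items and the price per item. The TotalsSketch class is hypothetical and uses a plain map in place of the state store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the per-product aggregation; a map stands in for the state store.
class TotalsSketch {

    static final class Total {
        int count;         // number of transactions
        int productCount;  // number of items traded
        int amount;        // total value: items * price
    }

    private final Map<Integer, Total> perProduct = new HashMap<>();

    // Called once per transaction, like the aggregator passed to aggregate().
    void onTransaction(int productId, int tradedCount, int price) {
        Total total = perProduct.computeIfAbsent(productId, id -> new Total());
        total.count += 1;
        total.productCount += tradedCount;
        total.amount += tradedCount * price;
    }

    Total get(int productId) {
        return perProduct.get(productId);
    }

    public static void main(String[] args) {
        TotalsSketch totals = new TotalsSketch();
        totals.onTransaction(3, 100, 10);
        totals.onTransaction(3, 50, 12);
        Total t = totals.get(3);
        System.out.println(t.count + " transactions, " + t.productCount
                + " items, value " + t.amount);
    }
}
```

Each counter depends only on its own previous value plus the new transaction, which makes the result easy to verify by hand.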

Here’s the class responsible for the interactive queries implementation. It injects the KafkaStreams bean. Then it tries to obtain the persistent store based on the StockService.TRANSACTIONS_PER_PRODUCT_SUMMARY variable. As a result, there is a ReadOnlyKeyValueStore with Integer as a key and TransactionTotal as a value. We may return a single value related to the particular productId (getTransactionsPerProductData) or just return a list with results for all available products (getAllTransactionsPerProductData).

@ApplicationScoped
public class InteractiveQueries {

   @Inject
   KafkaStreams streams;

   public TransactionTotal getTransactionsPerProductData(Integer productId) {
      return getTransactionsPerProductStore().get(productId);
   }

   public Map<Integer, TransactionTotal> getAllTransactionsPerProductData() {
      Map<Integer, TransactionTotal> m = new HashMap<>();
      // The iterator holds store resources, so close it once we are done
      try (KeyValueIterator<Integer, TransactionTotal> it = getTransactionsPerProductStore().all()) {
         while (it.hasNext()) {
            KeyValue<Integer, TransactionTotal> kv = it.next();
            m.put(kv.key, kv.value);
         }
      }
      return m;
   }

   private ReadOnlyKeyValueStore<Integer, TransactionTotal> getTransactionsPerProductStore() {
      return streams.store(
         StoreQueryParameters
            .fromNameAndType(StockService.TRANSACTIONS_PER_PRODUCT_SUMMARY, QueryableStoreTypes.keyValueStore()));
   }

}

Finally, we can create a REST controller responsible for exposing data retrieved by the interactive queries.

@ApplicationScoped
@Path("/transactions")
public class TransactionResource {

    @Inject
    InteractiveQueries interactiveQueries;

    @GET
    @Path("/products/{id}")
    public TransactionTotal getByProductId(@PathParam("id") Integer productId) {
        return interactiveQueries.getTransactionsPerProductData(productId);
    }

    @GET
    @Path("/products")
    public Map<Integer, TransactionTotal> getAllPerProductId() {
        return interactiveQueries.getAllTransactionsPerProductData();
    }

}

Now you can easily check out statistics related to the transactions created by the stock-service. You just need to call the following REST endpoints e.g.:

$ curl http://localhost:8080/transactions/products
$ curl http://localhost:8080/transactions/products/3
$ curl http://localhost:8080/transactions/products/5

Final Thoughts

Quarkus simplifies working with Kafka Streams and interactive queries. It provides useful improvements for developers like auto-start of Kafka in dev and test modes or Kafka streams visualization in dev UI console. You can easily compare the Quarkus approach with the Spring Cloud Stream Kafka support since I implemented the same logic for both those frameworks. Here’s the GitHub repository with Spring Cloud Stream Kafka Streams example.

Kafka Streams with Spring Cloud Stream https://piotrminkowski.com/2021/11/11/kafka-streams-with-spring-cloud-stream/ Thu, 11 Nov 2021 10:07:45 +0000
In this article, you will learn how to use Kafka Streams with Spring Cloud Stream. We will build a simple Spring Boot application that simulates the stock market. Based on that example, I’ll try to explain what a streaming platform is and how it differs from a traditional message broker. If you are looking for an intro to the Spring Cloud Stream project you should read my article about it. It describes how to use Spring Cloud Stream with RabbitMQ in order to build event-driven microservices.

In Spring Cloud Stream there are two binders supporting the Kafka platform. We will focus on the second of them – Apache Kafka Streams Binder. You can read more about it in Spring Cloud documentation available here.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. After that, you should just follow my instructions. Let’s begin.

Introduction

There are three major types in Kafka Streams – KStream, KTable and GlobalKTable. Spring Cloud Stream supports all of them. We can easily convert the stream to the table and vice-versa. To clarify, all Kafka topics are stored as a stream. The difference is: when we want to consume that topic, we can either consume it as a table or a stream. KTable takes a stream of records from a topic and reduces it down to unique entries using the key of each message.

Architecture

KStream represents an immutable stream of data where each new record is treated as INSERT. In our case, there are two incoming streams of events. Both of them represent incoming orders. These orders are generated by the order-service application. It sends buy orders to the orders.buy topic and sell orders to the orders.sell topic. The stock-service application receives and handles events from those topics. In the first step, it needs to change the key of each message from the orderId to the productId. That’s because it has to join orders from different topics related to the same product in order to execute transactions. The final transaction price is the average of the sell and buy order prices.

kafka-streams-spring-cloud-concept

We are building a very simplified version of the stock market platform. Each buy order contains a maximum price at which a customer is expecting to buy a product. On the other hand, each sell order contains a minimum price at which a customer is ready to sell a product. If the sell order price is not greater than the buy order price for a particular product, we may perform a transaction.

Each order is valid for 10 seconds. After that time the stock-service application will not handle such an order since it is considered expired. Each order contains an amount of product for a transaction. For example, we may sell 100 for 10 or buy 200 for 11. Therefore, an order may be fully or partially realized. The stock-service application tries to join partially realized orders to other new or partially realized orders. You can see the visualization of that process in the picture below.

kafka-streams-spring-cloud-arch

Run Apache Kafka locally

Before we jump to the implementation, we need to run a local instance of Apache Kafka. If you don’t want to install it on your laptop, the best way to run it is through Redpanda. Redpanda is a Kafka API compatible streaming platform. In comparison to Kafka, it is relatively easy to run it locally. You just need to have Docker installed. Once you have installed Redpanda on your machine, you need to create a cluster. Since you don’t need a large cluster during development, you can create a single-node instance using the following command:

$ rpk container start

After running, it will print the address of your node. For me, it is 127.0.0.1:50842. So, now I can display a list of created topics using the following command:

$ rpk topic list --brokers 127.0.0.1:50842

Currently, there are no topics created. We don’t need to do anything manually. Spring Cloud Stream automatically creates missing topics on the application startup. If you would like to remove the Redpanda instance after our exercise, just run the following command:

$ rpk container purge

Perfect! Our local instance of Kafka is running. Now we may proceed to the development.

Send events to Kafka with Spring Cloud Stream

In order to generate and send events continuously with Spring Cloud Stream Kafka, we need to define a Supplier bean. In our case, the order-service application generates test data. Each message contains a key and a payload that is serialized to JSON. The message key is the order’s id. We have two Supplier beans since we are sending messages to the two topics. Here’s the Order event class:

@Getter
@Setter
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Order {
   private Long id;
   private Integer customerId;
   private Integer productId;
   private int productCount;
   @JsonDeserialize(using = LocalDateTimeDeserializer.class)
   @JsonSerialize(using = LocalDateTimeSerializer.class)
   private LocalDateTime creationDate;
   private OrderType type;
   private int amount;
}

Our application uses Lombok and Jackson for message serialization. Of course, we also need to include the Spring Cloud Stream Kafka Binder. Unlike the consumer side, the producer does not use Kafka Streams, because it is just generating and sending events.

<dependencies>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
  </dependency>
</dependencies>

We have a predefined list of orders just to test our solution. We use MessageBuilder to build a message that contains the header kafka_messageKey and the Order payload.

@SpringBootApplication
@Slf4j
public class OrderService {

   private static long orderId = 0;
   private static final Random r = new Random();

   LinkedList<Order> buyOrders = new LinkedList<>(List.of(
      new Order(++orderId, 1, 1, 100, LocalDateTime.now(), OrderType.BUY, 1000),
      new Order(++orderId, 2, 1, 200, LocalDateTime.now(), OrderType.BUY, 1050),
      new Order(++orderId, 3, 1, 100, LocalDateTime.now(), OrderType.BUY, 1030),
      new Order(++orderId, 4, 1, 200, LocalDateTime.now(), OrderType.BUY, 1050),
      new Order(++orderId, 5, 1, 200, LocalDateTime.now(), OrderType.BUY, 1000),
      new Order(++orderId, 11, 1, 100, LocalDateTime.now(), OrderType.BUY, 1050)
   ));

   LinkedList<Order> sellOrders = new LinkedList<>(List.of(
      new Order(++orderId, 6, 1, 200, LocalDateTime.now(), OrderType.SELL, 950),
      new Order(++orderId, 7, 1, 100, LocalDateTime.now(), OrderType.SELL, 1000),
      new Order(++orderId, 8, 1, 100, LocalDateTime.now(), OrderType.SELL, 1050),
      new Order(++orderId, 9, 1, 300, LocalDateTime.now(), OrderType.SELL, 1000),
      new Order(++orderId, 10, 1, 200, LocalDateTime.now(), OrderType.SELL, 1020)
   ));

   public static void main(String[] args) {
      SpringApplication.run(OrderService.class, args);
   }

   @Bean
   public Supplier<Message<Order>> orderBuySupplier() {
      return () -> {
         if (buyOrders.peek() != null) {
            Message<Order> o = MessageBuilder
                  .withPayload(buyOrders.peek())
                  .setHeader(KafkaHeaders.MESSAGE_KEY, Objects.requireNonNull(buyOrders.poll()).getId())
                  .build();
            log.info("Order: {}", o.getPayload());
            return o;
         } else {
            return null;
         }
      };
   }

   @Bean
   public Supplier<Message<Order>> orderSellSupplier() {
      return () -> {
         if (sellOrders.peek() != null) {
            Message<Order> o = MessageBuilder
                  .withPayload(sellOrders.peek())
                  .setHeader(KafkaHeaders.MESSAGE_KEY, Objects.requireNonNull(sellOrders.poll()).getId())
                  .build();
            log.info("Order: {}", o.getPayload());
            return o;
         } else {
            return null;
         }
      };
   }

}

After that, we need to provide some configuration settings inside the application.yml file. Since we use multiple binding beans (in our case Supplier beans), we have to define the property spring.cloud.stream.function.definition that contains a list of bindable functions. We need to pass the Supplier method names separated by a semicolon. In the next few lines, we set the names of the target topics on Kafka and the message key serializer. Of course, we also need to set the address of the Kafka broker.

spring.kafka.bootstrap-servers: ${KAFKA_URL}

spring.cloud.stream.function.definition: orderBuySupplier;orderSellSupplier

spring.cloud.stream.bindings.orderBuySupplier-out-0.destination: orders.buy
spring.cloud.stream.kafka.bindings.orderBuySupplier-out-0.producer.configuration.key.serializer: org.apache.kafka.common.serialization.LongSerializer

spring.cloud.stream.bindings.orderSellSupplier-out-0.destination: orders.sell
spring.cloud.stream.kafka.bindings.orderSellSupplier-out-0.producer.configuration.key.serializer: org.apache.kafka.common.serialization.LongSerializer

Before running the application I need to create an environment variable containing the address of the Kafka broker.

$ export KAFKA_URL=127.0.0.1:50842

Then, let’s run our Spring Cloud application using the following Maven command:

$ mvn clean spring-boot:run

Once you do that, the application sends some test orders for the same product (productId=1) as shown below.

We can also verify a list of topics on our local Kafka instance. Both of them have been automatically created by the Spring Cloud Stream Kafka binder before sending messages.

Consume Kafka Streams with Spring Cloud Stream

Now, we are going to switch to the stock-service implementation. In order to process streams of events, we need to include the Spring Cloud Stream Kafka Streams binder. Also, our application has an ORM layer for storing data, so we have to include the Spring Data JPA starter and the H2 database.

<dependencies>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
  </dependency>
  <dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
  </dependency>
</dependencies>

In the first step, we are going to merge both streams of orders (buy and sell), insert the Order into the database, and print the event message. You could ask why I use the database and ORM layer here, since I have a Kafka KTable. Well, I need transactions with lock support in order to coordinate the status of order realization (refer to the description in the introduction about fully and partially realized orders). I will give you more details about it in the next sections.

In order to process streams, we need to declare a functional bean that takes KStream as an input parameter. If there are two sources, we have to use BiConsumer (just for consumption) or BiFunction (to consume and send events to the new target stream) beans. In that case, we are not creating a new stream of events, so we can use BiConsumer.

@Autowired
OrderLogic logic;

@Bean
public BiConsumer<KStream<Long, Order>, KStream<Long, Order>> orders() {
   return (orderBuy, orderSell) -> orderBuy
         .merge(orderSell)
         .peek((k, v) -> {
            log.info("New({}): {}", k, v);
            logic.add(v);
         });
}

After that, we need to add some configuration settings. There are two input topics, so we need to map their names. Also, if we have more than one functional bean we need to set applicationId related to the particular function. For now, it is not required, since we have only a single function. But later, we are going to add other functions for some advanced operations.

spring.cloud.stream.bindings.orders-in-0.destination: orders.buy
spring.cloud.stream.bindings.orders-in-1.destination: orders.sell
spring.cloud.stream.kafka.streams.binder.functions.orders.applicationId: orders

For now, that’s all. You can now run the instance of stock-service using the Maven command mvn spring-boot:run.

Operations on Kafka Streams

Now, we may use some more advanced operations on Kafka Streams than just merging two different streams. In fact, that’s the key logic in our application. We need to join two different order streams into a single one using the productId as a joining key. Since the producer sets orderId as a message key, we first need to invoke the selectKey method for both the orders.sell and orders.buy streams. In our case, joining buy and sell orders related to the same product is just a first step. Then we need to verify that the maximum price in the buy order is not lower than the minimum price in the sell order.

The next step is to verify that neither of these orders has been realized previously, as they may also be paired with other orders in the stream. If all the conditions are met, we may create a new transaction. Finally, we may change the stream key from productId to transactionId and send it to the dedicated transactions topic.

In order to implement the scenario described above, we need to define the BiFunction bean. It takes two input KStreams from orders.buy and orders.sell and creates a new KStream of transaction events sent to the output transactions topic. While joining streams, it uses a 10-second sliding window and invokes the execute method for creating a new transaction.

@Bean
public BiFunction<KStream<Long, Order>, KStream<Long, Order>, KStream<Long, Transaction>> transactions() {
   return (orderBuy, orderSell) -> orderBuy
         .selectKey((k, v) -> v.getProductId())
         .join(orderSell.selectKey((k, v) -> v.getProductId()),
               this::execute,
               JoinWindows.of(Duration.ofSeconds(10)),
               StreamJoined.with(Serdes.Integer(), 
                                 new JsonSerde<>(Order.class), 
                                 new JsonSerde<>(Order.class)))
         .filterNot((k, v) -> v == null)
         .map((k, v) -> new KeyValue<>(v.getId(), v))
         .peek((k, v) -> log.info("Done -> {}", v));
}

private Transaction execute(Order orderBuy, Order orderSell) {
   if (orderBuy.getAmount() >= orderSell.getAmount()) {
      int count = Math.min(orderBuy.getProductCount(), orderSell.getProductCount());
      boolean allowed = logic.performUpdate(orderBuy.getId(), orderSell.getId(), count);
      if (!allowed)
         return null;
      else
         return new Transaction(
            ++transactionId,
            orderBuy.getId(),
            orderSell.getId(),
            Math.min(orderBuy.getProductCount(), orderSell.getProductCount()),
            (orderBuy.getAmount() + orderSell.getAmount()) / 2,
            LocalDateTime.now(),
            "NEW");
   } else {
      return null;
   }
}
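
To make the join semantics more concrete, here is a plain-Java simulation of what the windowed join does conceptually: it pairs every buy order with every sell order that has the same key (productId) and a timestamp at most 10 seconds apart. The Ev record and the WindowedJoinSketch class are hypothetical. Real Kafka Streams works incrementally on record timestamps and state stores rather than on in-memory lists, so treat this only as a mental model.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a stream-stream join; not Kafka Streams code.
final class WindowedJoinSketch {

    // Minimal event: order id, join key (productId) and timestamp in milliseconds.
    record Ev(long orderId, int productId, long timestampMs) {}

    // Returns "buyId-sellId" for every pair with equal productId
    // whose timestamps differ by no more than the window size.
    static List<String> join(List<Ev> buys, List<Ev> sells, Duration window) {
        List<String> pairs = new ArrayList<>();
        for (Ev buy : buys) {
            for (Ev sell : sells) {
                boolean sameKey = buy.productId() == sell.productId();
                boolean inWindow =
                    Math.abs(buy.timestampMs() - sell.timestampMs()) <= window.toMillis();
                if (sameKey && inWindow) {
                    pairs.add(buy.orderId() + "-" + sell.orderId());
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        List<Ev> buys = List.of(new Ev(1, 1, 0), new Ev(2, 2, 1_000));
        List<Ev> sells = List.of(
            new Ev(7, 1, 4_000),   // same product as buy 1, 4 s apart: joined
            new Ev(8, 1, 15_000),  // same product as buy 1, 15 s apart: outside the window
            new Ev(9, 2, 11_000)); // same product as buy 2, exactly 10 s apart: joined
        System.out.println(join(buys, sells, Duration.ofSeconds(10))); // [1-7, 2-9]
    }
}
```

Each joined pair is then handed to the execute method, which decides whether it becomes a transaction. Because a single order can show up in several pairs, the realization check against the database is still needed.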

Let’s take a closer look at the performUpdate() method called inside the execute() method. It initiates a transaction and locks both Order entities. Then it verifies each order realization status and updates it with the current values if possible. Only if the performUpdate() method finishes successfully does the stock-service application create a new transaction.

@Service
public class OrderLogic {

   private OrderRepository repository;

   public OrderLogic(OrderRepository repository) {
      this.repository = repository;
   }

   public Order add(Order order) {
      return repository.save(order);
   }

   @Transactional
   public boolean performUpdate(Long buyOrderId, Long sellOrderId, int amount) {
      Order buyOrder = repository.findById(buyOrderId).orElseThrow();
      Order sellOrder = repository.findById(sellOrderId).orElseThrow();
      int buyAvailableCount = buyOrder.getProductCount() - buyOrder.getRealizedCount();
      int sellAvailableCount = sellOrder.getProductCount() - sellOrder.getRealizedCount();
      if (buyAvailableCount >= amount && sellAvailableCount >= amount) {
         buyOrder.setRealizedCount(buyOrder.getRealizedCount() + amount);
         sellOrder.setRealizedCount(sellOrder.getRealizedCount() + amount);
         repository.save(buyOrder);
         repository.save(sellOrder);
         return true;
      } else {
         return false;
      }
   }
}

Here’s our repository class with the findById method. It sets a pessimistic lock on the Order entity during the transaction.

public interface OrderRepository extends CrudRepository<Order, Long> {

  @Lock(LockModeType.PESSIMISTIC_WRITE)
  Optional<Order> findById(Long id);

}
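
The effect of performUpdate() on partially realized orders can be traced with a small in-memory model. The sketch below rests on several assumptions: a HashMap stands in for the database, the synchronized keyword stands in for the pessimistic lock, and the Book class with its nested Entry type is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for OrderLogic; a map replaces the JPA repository.
final class Book {

    // Mutable order state: total quantity and the part already realized.
    static final class Entry {
        final int productCount;
        int realizedCount;
        Entry(int productCount) { this.productCount = productCount; }
        int available() { return productCount - realizedCount; }
    }

    private final Map<Long, Entry> orders = new HashMap<>();

    void add(long id, int productCount) {
        orders.put(id, new Entry(productCount));
    }

    // Same check as performUpdate(): both orders must still have
    // at least `amount` units available, otherwise nothing changes.
    synchronized boolean performUpdate(long buyId, long sellId, int amount) {
        Entry buy = orders.get(buyId);
        Entry sell = orders.get(sellId);
        if (buy.available() >= amount && sell.available() >= amount) {
            buy.realizedCount += amount;
            sell.realizedCount += amount;
            return true;
        }
        return false;
    }

    int available(long id) {
        return orders.get(id).available();
    }

    public static void main(String[] args) {
        Book book = new Book();
        book.add(1L, 100); // buy order
        book.add(7L, 200); // sell order
        book.add(2L, 200); // another buy order
        System.out.println(book.performUpdate(1L, 7L, 100)); // true
        System.out.println(book.performUpdate(2L, 7L, 200)); // false
        System.out.println(book.available(7L));              // 100
    }
}
```

Note that the amount passed in by execute() is derived from the original product counts of both orders, so the second call is rejected even though 100 units are still available on the sell side.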

We also need to provide configuration settings for the transactions BiFunction.

spring.cloud.stream.bindings.transactions-in-0.destination: orders.buy
spring.cloud.stream.bindings.transactions-in-1.destination: orders.sell
spring.cloud.stream.bindings.transactions-out-0.destination: transactions
spring.cloud.stream.kafka.streams.binder.functions.transactions.applicationId: transactions

spring.cloud.stream.function.definition: orders;transactions

Use Kafka KTable with Spring Cloud Stream

We have already finished the implementation of the logic responsible for creating transactions from incoming orders. Now, we would like to examine data generated by our stock-service application. The most important things are how many transactions were generated and what the volume of transactions was, both globally and per product. Three key statistics related to our transactions are: the number of transactions, the number of products sold/bought during transactions, and the total amount of transactions (price * productsCount). Here’s the definition of our object used for counting aggregations.

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class TransactionTotal {
   private int count;
   private int productCount;
   private int amount;
}

In order to call an aggregation method, we first need to group the transactions stream by the selected key. In the method visible below we use the status field as a grouping key. After that, we may invoke the aggregate method, which allows us to perform some more complex calculations. In that particular case, we are calculating the number of all executed transactions, their volume of products, and total amount. The resulting KTable can be materialized as a state store. Thanks to that we will be able to query it by the name all-transactions-store.

@Bean
public Consumer<KStream<Long, Transaction>> total() {
   KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(
                "all-transactions-store");
   return transactions -> transactions
         .groupBy((k, v) -> v.getStatus(), 
                  Grouped.with(Serdes.String(), new JsonSerde<>(Transaction.class)))
         .aggregate(
                 TransactionTotal::new,
                 (k, v, a) -> {
                    a.setCount(a.getCount() + 1);
                    a.setProductCount(a.getProductCount() + v.getAmount());
                    a.setAmount(a.getAmount() + (v.getPrice() * v.getAmount()));
                    return a;
                 },
                 Materialized.<String, TransactionTotal> as(storeSupplier)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new JsonSerde<>(TransactionTotal.class)))
         .toStream()
         .peek((k, v) -> log.info("Total: {}", v));
}
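
Conceptually, groupBy followed by aggregate keeps one running value per key. The plain-Java sketch below models that idea with a HashMap. The AggregateSketch class and its Tx and Total records are hypothetical simplifications of Transaction and TransactionTotal, and no Kafka API is involved.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of groupBy + aggregate; not Kafka Streams code.
final class AggregateSketch {

    // Simplified transaction: grouping key, number of products, unit price.
    record Tx(String status, int amount, int price) {}

    record Total(int count, int productCount, int amount) {
        // Mirrors the aggregator: one more transaction, more products, more value.
        Total add(Tx tx) {
            return new Total(count + 1,
                             productCount + tx.amount(),
                             amount + tx.price() * tx.amount());
        }
    }

    // Keeps one running Total per grouping key, like a KTable keyed by status.
    static Map<String, Total> aggregate(List<Tx> transactions) {
        Map<String, Total> table = new HashMap<>();
        for (Tx tx : transactions) {
            // The default value plays the role of the initializer (TransactionTotal::new).
            Total current = table.getOrDefault(tx.status(), new Total(0, 0, 0));
            table.put(tx.status(), current.add(tx));
        }
        return table;
    }

    public static void main(String[] args) {
        Map<String, Total> table = aggregate(List.of(
            new Tx("NEW", 100, 1000),
            new Tx("NEW", 200, 1025)));
        System.out.println(table.get("NEW"));
    }
}
```

For the two sample transactions the entry under the key NEW ends up with a count of 2, a product count of 300, and an amount of 305000.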

The next function performs a similar aggregate operation, but this time per product. Because the Transaction object does not contain information about the product, we first need to join it with the order to access it. Then we produce a KTable by grouping and aggregating per productId. As before, we materialize the aggregation as a state store.

@Bean
public BiConsumer<KStream<Long, Transaction>, KStream<Long, Order>> totalPerProduct() {
   KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(
                "transactions-per-product-store");
   return (transactions, orders) -> transactions
         .selectKey((k, v) -> v.getSellOrderId())
         .join(orders.selectKey((k, v) -> v.getId()),
               (t, o) -> new TransactionTotalWithProduct(t, o.getProductId()),
               JoinWindows.of(Duration.ofSeconds(10)),
               StreamJoined.with(Serdes.Long(), 
                  new JsonSerde<>(Transaction.class), 
                  new JsonSerde<>(Order.class)))
         .groupBy((k, v) -> v.getProductId(), 
            Grouped.with(Serdes.Integer(), new JsonSerde<>(TransactionTotalWithProduct.class)))
         .aggregate(
               TransactionTotal::new,
               (k, v, a) -> {
                  a.setCount(a.getCount() + 1);
                  a.setProductCount(a.getProductCount() + v.getTransaction().getAmount());
                  a.setAmount(a.getAmount() + (v.getTransaction().getPrice() * v.getTransaction().getAmount()));
                  return a;
               },
               Materialized.<Integer, TransactionTotal> as(storeSupplier)
                  .withKeySerde(Serdes.Integer())
                  .withValueSerde(new JsonSerde<>(TransactionTotal.class)))
         .toStream()
         .peek((k, v) -> log.info("Total per product({}): {}", k, v));
}

What if we would like to perform aggregations similar to those described above, but only for a particular period of time? We need to invoke the windowedBy method and produce a dedicated state store for such operations.

@Bean
public BiConsumer<KStream<Long, Transaction>, KStream<Long, Order>> latestPerProduct() {
   WindowBytesStoreSupplier storeSupplier = Stores.persistentWindowStore(
      "latest-transactions-per-product-store", Duration.ofSeconds(30), Duration.ofSeconds(30), false);
   return (transactions, orders) -> transactions
      .selectKey((k, v) -> v.getSellOrderId())
      .join(orders.selectKey((k, v) -> v.getId()),
            (t, o) -> new TransactionTotalWithProduct(t, o.getProductId()),
            JoinWindows.of(Duration.ofSeconds(10)),
            StreamJoined.with(Serdes.Long(), new JsonSerde<>(Transaction.class), new JsonSerde<>(Order.class)))
      .groupBy((k, v) -> v.getProductId(), Grouped.with(Serdes.Integer(), new JsonSerde<>(TransactionTotalWithProduct.class)))
      .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
      .aggregate(
            TransactionTotal::new,
            (k, v, a) -> {
               a.setCount(a.getCount() + 1);
               a.setAmount(a.getAmount() + v.getTransaction().getAmount());
               return a;
            },
            Materialized.<Integer, TransactionTotal> as(storeSupplier)
               .withKeySerde(Serdes.Integer())
               .withValueSerde(new JsonSerde<>(TransactionTotal.class)))
      .toStream()
      .peek((k, v) -> log.info("Total per product last 30s({}): {}", k, v));
}
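
A 30-second time window without an explicit advance is a tumbling window: every record falls into exactly one bucket, and the buckets are aligned to the epoch. The sketch below shows that bucketing in plain Java. TumblingWindowSketch is a hypothetical name, and the code only illustrates the arithmetic, not the Kafka Streams implementation.

```java
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of tumbling-window bucketing; not Kafka Streams code.
final class TumblingWindowSketch {

    // Start of the epoch-aligned window that contains the timestamp.
    static long windowStart(long timestampMs, Duration size) {
        return timestampMs - (timestampMs % size.toMillis());
    }

    // Counts events per window start, sorted by window.
    static Map<Long, Integer> countPerWindow(List<Long> timestampsMs, Duration size) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts, size), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Long> timestamps = List.of(1_000L, 29_999L, 30_000L, 61_000L);
        // prints {0=2, 30000=1, 60000=1}
        System.out.println(countPerWindow(timestamps, Duration.ofSeconds(30)));
    }
}
```

In the windowed aggregation, the same idea applies per productId: each product gets a separate running total for every 30-second window.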

Interactive queries

We have already created and configured all required Kafka Streams with Spring Cloud. Finally, we can execute queries on state stores. This operation is called an interactive query. Let’s create a REST controller for exposing such endpoints with the results. In order to query Kafka Streams state stores with Spring Cloud, we need to inject the InteractiveQueryService bean into the controller.

@RestController
@RequestMapping("/transactions")
public class TransactionController {

   private InteractiveQueryService queryService;

   public TransactionController(InteractiveQueryService queryService) {
      this.queryService = queryService;
   }

   @GetMapping("/all")
   public TransactionTotal getAllTransactionsSummary() {
      ReadOnlyKeyValueStore<String, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("all-transactions-store",
                        QueryableStoreTypes.keyValueStore());
      return keyValueStore.get("NEW");
   }

   @GetMapping("/product/{productId}")
   public TransactionTotal getSummaryByProductId(@PathVariable("productId") Integer productId) {
      ReadOnlyKeyValueStore<Integer, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("transactions-per-product-store",
                        QueryableStoreTypes.keyValueStore());
      return keyValueStore.get(productId);
   }

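   @GetMapping("/product/latest/{productId}")
   public TransactionTotal getLatestSummaryByProductId(@PathVariable("productId") Integer productId) {
      // This store is windowed, so it has to be queried as a window store (needs java.time.Instant)
      ReadOnlyWindowStore<Integer, TransactionTotal> windowStore =
                queryService.getQueryableStore("latest-transactions-per-product-store",
                        QueryableStoreTypes.windowStore());
      Instant now = Instant.now();
      TransactionTotal latest = null;
      try (WindowStoreIterator<TransactionTotal> it = windowStore.fetch(productId, now.minusSeconds(30), now)) {
         while (it.hasNext()) latest = it.next().value; // the last window is the most recent one
      }
      return latest;
   }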

   @GetMapping("/product")
   public Map<Integer, TransactionTotal> getSummaryByAllProducts() {
      Map<Integer, TransactionTotal> m = new HashMap<>();
      ReadOnlyKeyValueStore<Integer, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("transactions-per-product-store",
                        QueryableStoreTypes.keyValueStore());
      KeyValueIterator<Integer, TransactionTotal> it = keyValueStore.all();
      while (it.hasNext()) {
         KeyValue<Integer, TransactionTotal> kv = it.next();
         m.put(kv.key, kv.value);
      }
      return m;
   }

}

Before you run the latest version of the stock-service application, you should generate more diverse random data. Let’s say we would like to generate orders for 5 different products with floating prices as shown below. Just uncomment the following fragment of code in the order-service and run the application once again to generate an infinite stream of events.

private static long orderId = 0;
private static final Random r = new Random();

private Map<Integer, Integer> prices = Map.of(
      1, 1000, 
      2, 2000, 
      3, 5000, 
      4, 1500, 
      5, 2500);

@Bean
public Supplier<Message<Order>> orderBuySupplier() {
   return () -> {
      Integer productId = r.nextInt(1, 6);
      int price = prices.get(productId) + r.nextInt(-100, 100);
      Order o = new Order(
         ++orderId,
         r.nextInt(1, 6),
         productId,
         100,
         LocalDateTime.now(),
         OrderType.BUY,
         price);
      log.info("Order: {}", o);
      return MessageBuilder
         .withPayload(o)
         .setHeader(KafkaHeaders.MESSAGE_KEY, orderId)
         .build();
   };
}

You may also want to generate more messages. To do that, you need to decrease the delay between polls of the Spring Cloud Stream Supplier. The value is given in milliseconds.

spring.cloud.stream.poller.fixedDelay: 100

After running both our sample applications you may verify the logs on the stock-service side.

Then you may call our REST endpoints performing interactive queries on the materialized Kafka KTable.

$ curl http://localhost:8080/transactions/all
$ curl http://localhost:8080/transactions/product/3
$ curl http://localhost:8080/transactions/product/latest/5

Does it look simple? Well, under the hood it is quite a bit more complicated 🙂 Here’s the final list of topics automatically created for the needs of our application.

kafka-streams-spring-cloud-topics

Final Thoughts

Spring Cloud Stream simplifies working with Kafka Streams and interactive queries. Kafka Streams by itself is a very powerful mechanism. In this article, I showed you how we can use it to implement non-trivial logic and then analyze data in various ways.

The post Kafka Streams with Spring Cloud Stream appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2021/11/11/kafka-streams-with-spring-cloud-stream/feed/ 21 10193