Piotr's TechBlog: upstash Archives (https://piotrminkowski.com/tag/upstash/)

Kafka Transactions with Spring Boot
https://piotrminkowski.com/2022/10/29/kafka-transactions-with-spring-boot/
Sat, 29 Oct 2022 08:23:21 +0000

The post Kafka Transactions with Spring Boot appeared first on Piotr's TechBlog.

In this article, you will learn how to use Kafka transactions with the Spring Kafka project in your Spring Boot app. To run the Kafka cluster, we will use Upstash. This article provides a basic introduction to Kafka transactions. If you are looking for more advanced usage and scenarios, you may refer to my article about distributed transactions in microservices. You can also read more about Kafka Streams with the Spring Cloud Stream project in a separate article.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. After that, you should just follow my instructions. Let’s begin.

Getting Started with Kafka in Spring Boot

I have already created a Kafka cluster on Upstash using a web dashboard. All the connection credentials are generated automatically. You can find and copy them on the main page of your cluster.

Assuming we have a username as the KAFKA_USER variable and a password as the KAFKA_PASS variable we need to provide the following Spring configuration in the application.yml file:

spring:
  application.name: transactions-service
  kafka:
    bootstrap-servers: inviting-camel-5620-eu1-kafka.upstash.io:9092
    properties:
      security.protocol: SASL_SSL
      sasl.mechanism: SCRAM-SHA-256
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${KAFKA_USER}" password="${KAFKA_PASS}";
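If you prefer to build the client settings in code, the same three security properties can be sketched as plain Java properties. This is only an illustration: the class and method names are hypothetical, and the fallback credentials are placeholders. The property keys and the ScramLoginModule class name are the ones used in the YAML above.

```java
import java.util.Properties;

// Hypothetical helper (not part of the sample repository): builds the same
// SASL/SCRAM client settings as the application.yml fragment above.
final class UpstashKafkaProps {

    static Properties build(String bootstrapServers, String user, String pass) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        // The JAAS entry must end with a semicolon, exactly as in the YAML.
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
            + "username=\"" + user + "\" password=\"" + pass + "\";");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(
            "inviting-camel-5620-eu1-kafka.upstash.io:9092",
            System.getenv().getOrDefault("KAFKA_USER", "user"),   // placeholder fallback
            System.getenv().getOrDefault("KAFKA_PASS", "pass"));  // placeholder fallback
        // Print only the non-secret settings.
        System.out.println(props.getProperty("security.protocol"));
        System.out.println(props.getProperty("sasl.mechanism"));
    }
}
```

With Spring Boot you normally do not need this, because everything under spring.kafka.properties is passed to the Kafka clients as-is.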

Here’s a list of required dependencies. Since we exchange JSON messages, we need the Jackson library for serialization and deserialization. Of course, we also need to include the Spring Boot starter and Spring Kafka.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
</dependency>

The transactions-service is generating and sending orders. We will create the test topic transactions on the app startup.

@SpringBootApplication
public class TransactionsService {

   public static void main(String[] args) {
      SpringApplication.run(TransactionsService.class, args);
   }

   @Bean
   public NewTopic transactionsTopic() {
      return TopicBuilder.name("transactions")
          .partitions(3)
          .replicas(1)
          .build();
   }

}

Enabling Kafka Transactions in Spring Boot

In Kafka, a producer initiates a transaction by making a request to the transaction coordinator. You can find a detailed description of that process in the following article on the Confluent blog.

With Spring Boot, we just need to set the spring.kafka.producer.transaction-id-prefix property to enable transactions. Spring Boot will do the rest by automatically configuring a KafkaTransactionManager bean and wiring it into the listener container. Here’s the part of the configuration responsible for the message producer. We use JsonSerializer to serialize objects into JSON. The transaction ID prefix is tx-.

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      transaction-id-prefix: tx-

During our scenario, we will send 10 messages within a single transaction. In order to observe logs on the consumer side we set a delay between subsequent attempts to 1 second.

@Transactional
public void generateAndSendPackage() 
      throws InterruptedException, TransactionException {
   for (long i = 0; i < 10; i++) {
      Order t = new Order(id++, i+1, i+2, 1000, "NEW");
      ListenableFuture<SendResult<Long, Order>> result =
         kafkaTemplate.send("transactions", t.getId(), t);
      result.addCallback(callback);
      Thread.sleep(1000);
   }
}

Enable Transactions on the Kafka Consumer Side

In the first step, we will just print the incoming messages. We need to annotate the listener method with the @KafkaListener. The target topic is transactions, and the consumer group is a. Also, we have to add the @Transactional annotation to enable transaction support for the listen method.

@Service
public class TransactionsListener {

   private static final Logger LOG = LoggerFactory
          .getLogger(TransactionsListener.class);

   @KafkaListener(
          id = "transactions",
          topics = "transactions",
          groupId = "a",
          concurrency = "3")
   @Transactional
   public void listen(Order order) {
      LOG.info("{}", order);
   }
}

Let’s run the producer app first. To do that, go to the transactions-service directory and execute the command mvn spring-boot:run. It is a good idea to enable more detailed logs for Spring Kafka transactions. To do that, add the following lines to the application.yml file:

logging:
  level:
    org.springframework.transaction: trace
    org.springframework.kafka.transaction: debug

After that, let’s run the consumer app. In order to do that, go to the accounts-service directory and run the same command as before. You should see the following topic created in the Upstash console:

kafka-transactions-spring-boot-upstash

The transactions-service app exposes the REST endpoint for sending messages. It just starts that procedure of generating and sending 10 messages within a single transaction I mentioned in the previous section. Let’s call the endpoint:

$ curl -X POST http://localhost:8080/transactions

Let’s look at the logs on the producer side. After sending all the messages, the producer committed the transaction.

kafka-transactions-spring-boot-logs

Now, let’s see how it looks on the consumer side. All the messages are received just after being sent by the producer app. It is not something that we expected…

In order to verify what happened, we need to take a look at the consumer app logs. Here’s a fragment with the Kafka consumer settings. As you can see, the consumer runs with the isolation level read_uncommitted. This is the Kafka consumer default, and Spring Boot does not change it.

Deep Dive into Transactions with Spring Kafka

In order to solve the problem with transactions from the previous section, we need to change the default isolation level in the application.yml file. Under spring.kafka.consumer.properties we have to set the isolation.level property to read_committed as shown below.

spring:
  application.name: accounts-service
  kafka:
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"
        isolation.level: read_committed

After that let’s run the accounts-service app once again.

Now, all the messages have been received after the producer committed the transaction. There are three consumer threads as we set the @KafkaListener concurrency parameter to 3.

kafka-transactions-spring-boot-producer

In the next step, we will test the rollback of transactions on the producer side. In order to do that, we will modify the method for generating and sending orders. Now, the generateAndSendPackage method takes a boolean parameter that indicates whether the transaction should be rolled back.

@Transactional
public void generateAndSendPackage(boolean error)
       throws InterruptedException {
   for (long i = 0; i < 10; i++) {
      Order t = new Order(id++, i+1, i+2, 1000, "NEW");
      ListenableFuture<SendResult<Long, Order>> result =
            kafkaTemplate.send("transactions", t.getId(), t);
      result.addCallback(callback);
      if (error && i > 5)
         throw new RuntimeException();
      Thread.sleep(1000);
   }
}

Here are the logs from our test. The condition i > 5 is first met at i = 6, right after the seventh order has been sent, so at that point the method throws a RuntimeException and Spring rolls back the transaction. As expected, the consumer app does not receive any messages.

It is important to know that, by default, Spring rolls back a transaction only on unchecked exceptions. To roll back on checked exceptions as well, we need to specify rollbackFor on the @Transactional annotation.
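The rule can be modeled in a few lines of plain Java. This is a simplified sketch of the decision only, not Spring’s actual code: the class and method names are hypothetical, and the rollbackFor list stands in for the annotation attribute of the same name.

```java
import java.io.IOException;
import java.util.List;

// Hypothetical model of the rollback decision described above.
final class RollbackRuleSketch {

    // Default rule: roll back on RuntimeException or Error.
    // Checked exceptions roll back only if their type is listed in rollbackFor.
    static boolean rollsBack(Throwable ex,
                             List<Class<? extends Throwable>> rollbackFor) {
        if (ex instanceof RuntimeException || ex instanceof Error) {
            return true;
        }
        for (Class<? extends Throwable> type : rollbackFor) {
            if (type.isInstance(ex)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Unchecked exception, no extra rules
        System.out.println(rollsBack(new IllegalStateException(), List.of()));
        // Checked exception, no extra rules
        System.out.println(rollsBack(new IOException(), List.of()));
        // Checked exception, with rollbackFor = Exception.class
        System.out.println(rollsBack(new IOException(), List.of(Exception.class)));
    }
}
```

This matters for the method shown earlier: it declares InterruptedException, which is a checked exception. If the thread were interrupted during Thread.sleep, the transaction would not be rolled back unless the annotation is written as @Transactional(rollbackFor = Exception.class) or similar.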

The transactional producer sends messages to the Kafka cluster even before committing the transaction. You could see it in the previous section, where the listener was continuously receiving messages when the isolation level was read_uncommitted. Consequently, if we roll back a transaction on the producer side, the messages sent before the rollback still reach the Kafka broker. We can see it, for example, in the Upstash live message view for the transactions topic.

kafka-transactions-spring-boot-live

Here’s the current value of offsets for all partitions in the transactions topic for the a consumer group. We made a successful commit after sending the first package of 10 messages, and we rolled back the transaction with the second package. The sum of offsets is 10 in that case. But in fact, it is different than the current latest offset on those partitions.

To verify it, we can, for example, change a consumer group name for the listener to b and start another instance of the accounts-service.

@KafkaListener(
     id = "transactions",
     topics = "transactions",
     groupId = "b",
     concurrency = "3")
@Transactional
public void listen(Order order) {
   LOG.info("{}", order);
}

Here’s the current value of offsets for the b consumer group.

Of course, the messages have been rolled back. But the important thing to understand here is that these operations happen on the Kafka broker side. The transaction coordinator changes the values of Kafka offsets. We can easily verify that the consumer won’t receive messages after a rollback even if we set the initial offset to earliest with the spring.kafka.consumer.auto-offset-reset property.
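To make the relation between aborted messages and offsets more tangible, here is a small in-memory model of a single partition. It is a deliberately simplified sketch: all names are hypothetical, and a real read_committed consumer additionally waits at the last stable offset instead of filtering after the fact. The point it illustrates is that aborted records and transaction markers still occupy offsets in the log, while a read_committed reader never sees the aborted data.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory model of one partition with transactional writes.
final class IsolationSketch {

    enum Kind { DATA, COMMIT, ABORT }

    record Entry(long offset, String txId, Kind kind, String value) {}

    private final List<Entry> log = new ArrayList<>();

    // Every record, including commit/abort markers, takes the next offset.
    void append(String txId, Kind kind, String value) {
        log.add(new Entry(log.size(), txId, kind, value));
    }

    long endOffset() {
        return log.size();
    }

    // readCommitted = true keeps only data from transactions with a COMMIT marker.
    List<String> read(boolean readCommitted) {
        Set<String> committed = new HashSet<>();
        for (Entry e : log) {
            if (e.kind() == Kind.COMMIT) {
                committed.add(e.txId());
            }
        }
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind() != Kind.DATA) {
                continue; // markers are never delivered to the application
            }
            if (!readCommitted || committed.contains(e.txId())) {
                out.add(e.value());
            }
        }
        return out;
    }

    static IsolationSketch demo() {
        IsolationSketch partition = new IsolationSketch();
        partition.append("tx-0", Kind.DATA, "order-1");
        partition.append("tx-0", Kind.DATA, "order-2");
        partition.append("tx-0", Kind.COMMIT, null);
        partition.append("tx-1", Kind.DATA, "order-3");
        partition.append("tx-1", Kind.ABORT, null);
        return partition;
    }

    public static void main(String[] args) {
        IsolationSketch partition = demo();
        System.out.println("read_committed:   " + partition.read(true));
        System.out.println("read_uncommitted: " + partition.read(false));
        System.out.println("log end offset:   " + partition.endOffset());
    }
}
```

In this model the read_committed reader gets two orders, while the log end offset is five. That gap is the same effect as the difference between the number of delivered messages and the latest offset observed above.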

Add Database

In this section, we will extend our scenario with new functionalities. Our app will store the status of orders in the database. Just for demo purposes, we will use an in-memory database H2. There are two dependencies required in this scenario: H2 and Spring Data JPA.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>

There is the OrderGroup entity that stores the current status of the package (SENT, CONFIRMED, ROLLBACK), the total number of orders in a single package, and the total number of orders processed by the accounts-service.

@Entity
public class OrderGroup {

   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long id;
   private String status;
   private int totalNoOfOrders;
   private int processedNoOfOrders;

   // GETTERS/SETTERS ...
}

In order to manage the entity we use the Spring Data repository pattern:

public interface OrderGroupRepository extends 
   CrudRepository<OrderGroup, Long> {
}

We will also include a database in the accounts-service app. When it processes the incoming orders it performs transfers between the source and target account. It will store the account balance in the database.

@Entity
public class Account {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private int balance;

   // GETTERS/SETTERS ...
}

The same as before there is a repository bean for managing the Account entity.

public interface AccountRepository extends
   CrudRepository<Account, Long> {
}

We also need to modify the Order message exchanged between the apps. It has to contain the groupId field for processing confirmations.

public class Order {

    private Long id;
    private Long sourceAccountId;
    private Long targetAccountId;
    private int amount;
    private String status;
    private Long groupId;

   // GETTERS/SETTERS ...
}

Here’s the diagram that illustrates our architecture for the described scenario.

kafka-transactions-spring-boot-arch

Handling Transactions Across Multiple Resources

After including Spring Data JPA there are two registered TransactionManager beans named transactionManager and kafkaTransactionManager. Therefore, we need to choose the name of the transaction manager inside the @Transactional annotation. In the first step, we add a new entity to the database. The primary key id is auto-generated in the database and then returned to the object. After that, we get the groupId and generate the sequence of orders within that group. Of course, both operations (save to the database, send to Kafka) are part of the same transaction.

@Transactional("kafkaTransactionManager")
public void sendOrderGroup(boolean error) throws InterruptedException {
   OrderGroup og = repository.save(new OrderGroup("SENT", 10, 0));
   generateAndSendPackage(error, og.getId());
}

private void generateAndSendPackage(boolean error, Long groupId)
      throws InterruptedException {
   for (long i = 0; i < 10; i++) {
      Order o = new Order(id++, i+1, i+2, 1000, "NEW", groupId);
      ListenableFuture<SendResult<Long, Order>> result =
         kafkaTemplate.send("transactions", o.getId(), o);
      result.addCallback(callback);
      if (error && i > 5)
         throw new RuntimeException();
      Thread.sleep(1000);
   }
}

The accounts-service app listens for incoming orders. It is processing every single order in a separate transaction. It checks if sufficient funds are in the customer account to make a transfer. If there is enough money, it performs a transfer. Finally, it sends the response to transactions-service with the transaction status. The message is sent to the orders topic.

@KafkaListener(
   id = "transactions",
   topics = "transactions",
   groupId = "a",
   concurrency = "3")
@Transactional("kafkaTransactionManager")
public void listen(Order order) {
   LOG.info("Received: {}", order);
   process(order);
}

private void process(Order order) {
   Account accountSource = repository
      .findById(order.getSourceAccountId())
      .orElseThrow();
   Account accountTarget = repository
      .findById(order.getTargetAccountId())
      .orElseThrow();
   if (accountSource.getBalance() >= order.getAmount()) {
      accountSource.setBalance(accountSource.getBalance() - order.getAmount());
      repository.save(accountSource);
      accountTarget.setBalance(accountTarget.getBalance() + order.getAmount());
      repository.save(accountTarget);
      order.setStatus("PROCESSED");
   } else {
      order.setStatus("FAILED");
   }
   LOG.info("After processing: {}", order);
   kafkaTemplate.send("orders", order.getId(), order);
}

The transactions-service listens for order confirmations on the orders topic. Once it receives the message it increases the number of processed orders within an order group and stores the current result in the database. We should use a default Spring transaction manager since we don’t send any messages to Kafka.

@KafkaListener(
      id = "orders",
      topics = "orders",
      groupId = "a",
      concurrency = "3")
@Transactional("transactionManager")
public void listen(Order order) {
   LOG.info("{}", order);
   OrderGroup og = repository
      .findById(order.getGroupId())
      .orElseThrow();
   if (order.getStatus().equals("PROCESSED")) {      
      og.setProcessedNoOfOrders(og.getProcessedNoOfOrders() + 1);
      og = repository.save(og);
      LOG.info("Current: {}", og);
   }
}

Don’t forget to lock the OrderGroup record during the transaction. Since we are processing messages concurrently (with 3 threads) we need to lock the OrderGroup record until we update the value of processedNoOfOrders column:

public interface OrderGroupRepository extends
        CrudRepository<OrderGroup, Long> {

    @Lock(LockModeType.PESSIMISTIC_WRITE)
    Optional<OrderGroup> findById(Long groupId);
}
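Why the lock is needed can be shown without a database. The sketch below is an analogy only: a ReentrantLock plays the role of the row lock taken by PESSIMISTIC_WRITE, and three threads play the role of the three listener threads. The class and method names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for one OrderGroup row updated by concurrent listeners.
final class RowLockSketch {

    private final ReentrantLock rowLock = new ReentrantLock();
    private int processedNoOfOrders;

    // Read-modify-write under the lock, like findById + save in one transaction.
    void confirmOne() {
        rowLock.lock();
        try {
            int current = processedNoOfOrders;  // "SELECT ... FOR UPDATE"
            processedNoOfOrders = current + 1;  // "UPDATE"
        } finally {
            rowLock.unlock();                   // "COMMIT" releases the lock
        }
    }

    int processed() {
        rowLock.lock();
        try {
            return processedNoOfOrders;
        } finally {
            rowLock.unlock();
        }
    }

    // Runs `threads` workers, each confirming `perThread` orders.
    static int run(int threads, int perThread) {
        RowLockSketch group = new RowLockSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    group.confirmOne();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return group.processed();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 1000));
    }
}
```

Without the lock, two threads could read the same value and both write value + 1, so one confirmation would be lost. The pessimistic lock on the repository method prevents the same lost update at the database level.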

Let’s test a positive scenario. We will generate a group of orders that should be confirmed. To do that let’s call our endpoint POST /transactions:

$ curl -X 'POST' 'http://localhost:8080/transactions' \
  -H 'Content-Type: application/json' \
  -d 'false'

Here are the logs from the accounts-service app:

We can also take a look at the logs generated by the transactions-service app:

Finally, we can verify the current status of our order group by calling the following endpoint:

$ curl -X GET 'http://localhost:8080/transactions'

What happens if we roll back the transaction? Try it by yourself with the following command:

$ curl -X 'POST' 'http://localhost:8080/transactions' \
  -H 'Content-Type: application/json' \
  -d 'true'

Final Thoughts

You can easily handle Kafka transactions with Spring Boot using the Spring Kafka project. You can integrate your app with a database and handle transactions across multiple resources. However, one thing needs to be clarified: Kafka does not support XA transactions. It may result in data inconsistency. Spring does not solve that case; it just performs two transactions in the background. When the @Transactional method exits, Spring Boot will commit the database transaction first and then the Kafka transaction. You can change that order, so that the Kafka transaction is committed first, by configuring the outer method to use the DataSourceTransactionManager and the inner method to use the KafkaTransactionManager.
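The consequence of committing two resources one after another can be sketched with a tiny model. The names below are hypothetical and nothing here talks to a real database or broker. The sketch only shows which resources end up committed when the second commit fails.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of two independent commits performed in sequence.
final class CommitOrderSketch {

    record Resource(String name, boolean failsOnCommit) {
        void commit() {
            if (failsOnCommit) {
                throw new IllegalStateException(name + " commit failed");
            }
        }
    }

    // Commits in the given order and stops at the first failure.
    // Returns the names of the resources that were committed.
    static List<String> commitInOrder(List<Resource> resources) {
        List<String> committed = new ArrayList<>();
        for (Resource resource : resources) {
            try {
                resource.commit();
                committed.add(resource.name());
            } catch (RuntimeException ex) {
                break; // an earlier commit cannot be undone at this point
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        // Database first: the row is stored, but the messages are never committed.
        System.out.println(commitInOrder(List.of(
            new Resource("database", false), new Resource("kafka", true))));
        // Kafka first: the messages are visible, but the row is missing.
        System.out.println(commitInOrder(List.of(
            new Resource("kafka", false), new Resource("database", true))));
    }
}
```

Changing the commit order therefore does not remove the inconsistency. It only decides which side may be ahead of the other after a failure.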

Can we solve that case somehow? Of course. There is, for example, project Debezium that allows you to stream database changes into Kafka topics. With that approach, you can just commit changes in the database, and then configure Debezium to send events with changes to Kafka. For more details about that tool and outbox pattern please refer to the article available here.

Deep Dive into Saga Transactions with Kafka Streams and Spring Boot
https://piotrminkowski.com/2022/02/07/deep-dive-into-saga-transactions-with-kafka-streams-and-spring-boot/
Mon, 07 Feb 2022 14:38:20 +0000

The post Deep Dive into Saga Transactions with Kafka Streams and Spring Boot appeared first on Piotr's TechBlog.

In this article, you will learn how to use Kafka Streams and Spring Boot to perform transactions according to the Saga pattern. To be honest, I was quite surprised by the great deal of attention my last article about Kafka received. I got some questions about streams, transactions, and support for Kafka in Spring Boot. In this article, I’ll try to answer a few of them. I will also show how you can easily set up a cloud-managed Kafka cluster on Upstash.

Introduction

First of all, let’s recap the approach described in the previous article. We used Kafka Streams to process order transactions on the order-service side. To handle orders coming to the stock-service and payment-service we used a standard Spring @KafkaListener. There are also two databases – a single database per every service. The stock-service stores data related to the number of available products and updates them after receiving an order. The same with the payment-service. It updates the customer’s account on every single order. Both applications receive orders from Kafka topic. They send responses to other topics. But just to simplify, we will skip it as shown in the figure below. We treat the Kafka orders topic as a stream of events and also as a table with the latest order’s status.

kafka-streams-transactions-old-arch

What may go wrong with that approach? In fact, we have two data sources here. We use Kafka as the order store. On the other hand, there are SQL databases (in my case H2, but you can use any other) that store stock and payment data. Once we send an order with a reservation to the Kafka topic, we need to update a database. Since Kafka does not support XA transactions, it may result in data inconsistency. Of course, Kafka is not alone here: many other systems, including e.g. RabbitMQ, do not support XA transactions either.

The question is: what can we do about that? One of the possible options is an approach called Change Data Capture (CDC) with the outbox pattern. CDC identifies and tracks changes to data in a database. Then it may emit those changes as events and send them, for example, to a Kafka topic. I won’t go into the details of that process. If you are interested, you may read the article on this topic written by Gunnar Morling.

Architecture with Kafka Streams

The approach I will describe today is fully based on Kafka Streams. We won’t use any SQL databases. When the order-service sends a new order, its id is the message key. With Kafka Streams, we may change a message key in the stream. It results in creating new topics and repartitioning. With new message keys, we may perform calculations just for the specific customerId or productId. The result of such a calculation may be saved in a persistent store. For example, Kafka Streams automatically creates and manages such state stores when you call stateful operations like count() or aggregate(). We will aggregate the orders related to a particular customer or product. Here’s the illustration of our architecture.

kafka-streams-transactions-arch

Now, let’s consider a scenario for the payment-service in detail. In the incoming stream of orders the payment-service calls the selectKey() operation. It changes the key from the order’s id into the order’s customerId. Then it groups all the orders by the new key and invokes the aggregate() operation. In the aggregate() method it calculates the available amount and reserved amount based on the order’s price and status (whether it is a new order or a confirmation order). If there are sufficient funds on the customer account, it sends the ACCEPT order to the payment-orders topic. Otherwise, it sends the REJECT order. Then the order-service processes responses by joining streams from payment-orders and stock-orders by the order’s id. As a result, it sends a confirmation or a rollback order.
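The decision taken by the order-service after the join can be sketched as a pure function. Everything in this block is an assumption made for illustration: the class and method names, the REJECTED status for the case when both services reject, and the rule that the source field names the service that rejected the order. The source code of the order-service is not shown in this article, so treat this as one possible reading rather than the actual implementation.

```java
// Hypothetical sketch of the decision made after joining the two responses.
final class JoinDecisionSketch {

    // status: outcome sent back to the orders topic; source: who rejected (may be null)
    record Decision(String status, String source) {}

    static Decision confirm(String paymentStatus, String stockStatus) {
        boolean paymentOk = "ACCEPT".equals(paymentStatus);
        boolean stockOk = "ACCEPT".equals(stockStatus);
        if (paymentOk && stockOk) {
            return new Decision("CONFIRMED", null);
        }
        if (!paymentOk && !stockOk) {
            // Assumption: nothing was reserved anywhere, so nothing has to be undone.
            return new Decision("REJECTED", null);
        }
        // Exactly one service rejected: the other one has to undo its reservation.
        return new Decision("ROLLBACK", paymentOk ? "STOCK" : "PAYMENT");
    }

    public static void main(String[] args) {
        System.out.println(confirm("ACCEPT", "ACCEPT"));
        System.out.println(confirm("REJECT", "ACCEPT"));
        System.out.println(confirm("ACCEPT", "REJECT"));
    }
}
```

Under this reading, a service that receives a ROLLBACK order whose source is itself has nothing to release, because it never made the reservation.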

kafka-streams-transactions-details

Finally, let’s proceed to the implementation!

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then switch to the streams-full branch. After that, you should just follow my instructions.

Aggregation with Kafka Streams

Let’s begin with the payment-service. The implementation of KStream is not complicated here. In the first step (1), we invoke the selectKey() method and get the customerId value of the Order object as a new key. Then we call the groupByKey() method (2) to receive a KGroupedStream as a result. Once we have a KGroupedStream, we may invoke one of the calculation methods. In that case, we need to use aggregate(), since we have a slightly more advanced calculation than just a simple count (3). The last two steps are just for printing the value after the calculation.

@Bean
public KStream<Long, Order> stream(StreamsBuilder builder) {
   JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
   JsonSerde<Reservation> rsvSerde = new JsonSerde<>(Reservation.class);
   KStream<Long, Order> stream = builder
      .stream("orders", Consumed.with(Serdes.Long(), orderSerde))
      .peek((k, order) -> LOG.info("New: {}", order));

   KeyValueBytesStoreSupplier customerOrderStoreSupplier =
      Stores.persistentKeyValueStore("customer-orders");

   stream.selectKey((k, v) -> v.getCustomerId()) // (1)
      .groupByKey(Grouped.with(Serdes.Long(), orderSerde)) // (2)
      .aggregate(
         () -> new Reservation(random.nextInt(1000)),
         aggregatorService,
         Materialized.<Long, Reservation>as(customerOrderStoreSupplier)
            .withKeySerde(Serdes.Long())
            .withValueSerde(rsvSerde)) // (3)
      .toStream()
      .peek((k, trx) -> LOG.info("Commit: {}", trx));

   return stream;
}

However, the most important step in the fragment of code visible above is the class called inside the aggregate() method. The aggregate() method takes three input arguments. The first of them indicates the starting value of our compute object. That object represents the current state of the customer’s account. It has two fields: amountAvailable and amountReserved. To clarify, we use that object instead of the entity that stores available and reserved amounts on the customer account. Each customer is represented by the customerId (key) and the Reservation object (value) in Kafka KTable. Just for the test purpose, we are generating the starting value of amountAvailable as a random number between 0 and 1000.

public class Reservation {
   private int amountAvailable;
   private int amountReserved;

   public Reservation() {
   
   }

   public Reservation(int amountAvailable) {
      this.amountAvailable = amountAvailable;
   }

   // GETTERS AND SETTERS ...

}

OK, let’s take a look at our aggregation method. It needs to implement the Kafka Streams Aggregator interface and its apply() method. It may handle three types of orders. One of them is a confirmation of the order (1). It confirms the distributed transaction, so we just need to release the reservation by subtracting the order’s price from the amountReserved field. On the other hand, in the case of a rollback, we need to increase the value of amountAvailable by the order’s price and decrease the value of amountReserved accordingly (2). Finally, if we receive a new order (3), we need to make a reservation if there are sufficient funds on the customer account, or otherwise reject the order.

Aggregator<Long, Order, Reservation> aggregatorService = (id, order, rsv) -> {
   switch (order.getStatus()) {
      case "CONFIRMED" -> // (1)
         rsv.setAmountReserved(rsv.getAmountReserved() 
               - order.getPrice());
      case "ROLLBACK" -> { // (2)
         if (!order.getSource().equals("PAYMENT")) {
            rsv.setAmountAvailable(rsv.getAmountAvailable() 
                  + order.getPrice());
            rsv.setAmountReserved(rsv.getAmountReserved() 
                  - order.getPrice());
         }
      }
      case "NEW" -> { // (3)
         if (order.getPrice() <= rsv.getAmountAvailable()) {
            rsv.setAmountAvailable(rsv.getAmountAvailable() 
                  - order.getPrice());
            rsv.setAmountReserved(rsv.getAmountReserved() 
                  + order.getPrice());
            order.setStatus("ACCEPT");
         } else {
            order.setStatus("REJECT");
         }
         template.send("payment-orders", order.getId(), order);
      }
   }
   LOG.info("{}", rsv);
   return rsv;
};
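To see the arithmetic of the three branches without running Kafka, the aggregation can be replayed over a list of orders with a HashMap acting as the state store. This is a simplified sketch with hypothetical names. It skips the source check in the ROLLBACK branch and the message sent to payment-orders, and it uses a fixed starting balance instead of a random one.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory replay of selectKey + groupByKey + aggregate.
final class AggregateSketch {

    record Order(long id, long customerId, int price, String status) {}

    static final class Reservation {
        int amountAvailable;
        int amountReserved;

        Reservation(int amountAvailable) {
            this.amountAvailable = amountAvailable;
        }
    }

    // The map stands in for the "customer-orders" store: customerId -> Reservation.
    static Map<Long, Reservation> aggregate(List<Order> orders, int initialFunds) {
        Map<Long, Reservation> store = new HashMap<>();
        for (Order o : orders) {
            // Re-keying: state is looked up by customerId, not by the order id.
            Reservation rsv = store.computeIfAbsent(
                o.customerId(), k -> new Reservation(initialFunds));
            switch (o.status()) {
                case "NEW" -> {
                    if (o.price() <= rsv.amountAvailable) { // reserve funds
                        rsv.amountAvailable -= o.price();
                        rsv.amountReserved += o.price();
                    }                                       // else: rejected, no change
                }
                case "CONFIRMED" -> rsv.amountReserved -= o.price();
                case "ROLLBACK" -> {                        // give the money back
                    rsv.amountAvailable += o.price();
                    rsv.amountReserved -= o.price();
                }
                default -> { }                              // unknown status: ignore
            }
        }
        return store;
    }

    static Map<Long, Reservation> demo() {
        return aggregate(List.of(
            new Order(1, 7, 300, "NEW"),        // customer 7: 700 available, 300 reserved
            new Order(2, 7, 800, "NEW"),        // rejected, 800 > 700
            new Order(1, 7, 300, "CONFIRMED"),  // customer 7: 700 available, 0 reserved
            new Order(3, 8, 200, "NEW"),        // customer 8: 800 available, 200 reserved
            new Order(3, 8, 200, "ROLLBACK")),  // customer 8: 1000 available, 0 reserved
            1000);
    }

    public static void main(String[] args) {
        demo().forEach((customerId, rsv) -> System.out.println(
            customerId + " -> available=" + rsv.amountAvailable
                + ", reserved=" + rsv.amountReserved));
    }
}
```

After a confirmation the money stays deducted from amountAvailable, while after a rollback the customer gets the full balance back. That is the difference between branches (1) and (2).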

State Store with the Kafka Streams Table

The implementation of the stock-service is pretty similar to the payment-service. The difference is that we count the number of available products in stock instead of the available funds on the customer account. Here’s our Reservation object:

public class Reservation {
   private int itemsAvailable;
   private int itemsReserved;

   public Reservation() {
    
   }

   public Reservation(int itemsAvailable) {
      this.itemsAvailable = itemsAvailable;
   }

   // GETTERS AND SETTERS ...

}

The implementation of the aggregation method is also very similar to the payment-service. However, this time, let’s focus on another thing. Once we process a new order, we need to send a response to the stock-orders topic. We use KafkaTemplate for that. In the case of the payment-service we also send a response, but to the payment-orders topic. The send method of KafkaTemplate does not block the thread. It returns a ListenableFuture object. We can register a callback on that future to log the result once the message has been sent (1). Finally, let’s log the current state of the Reservation object (2).

Aggregator<Long, Order, Reservation> aggrSrv = (id, order, rsv) -> {
   switch (order.getStatus()) {
      case "CONFIRMED" -> rsv.setItemsReserved(rsv.getItemsReserved() 
            - order.getProductCount());
      case "ROLLBACK" -> {
         if (!order.getSource().equals("STOCK")) {
            rsv.setItemsAvailable(rsv.getItemsAvailable() 
                  + order.getProductCount());
            rsv.setItemsReserved(rsv.getItemsReserved() 
                  - order.getProductCount());
         }
      }
      case "NEW" -> {
         if (order.getProductCount() <= rsv.getItemsAvailable()) {
            rsv.setItemsAvailable(rsv.getItemsAvailable() 
                  - order.getProductCount());
            rsv.setItemsReserved(rsv.getItemsReserved() 
                  + order.getProductCount());
            order.setStatus("ACCEPT");
         } else {
            order.setStatus("REJECT");
         }
         // (1)
         template.send("stock-orders", order.getId(), order)
            .addCallback(result -> LOG.info("Sent: {}", 
               result != null ? result.getProducerRecord().value() : null),
               ex -> {});
      }
   }
   LOG.info("{}", rsv); // (2)
   return rsv;
};

After that, we are also logging the value of the Reservation object (1). In order to do that we need to convert KTable into KStream and then call the peek method. This log is printed just after Kafka Streams commits the offset in the source topic.

@Bean
public KStream<Long, Order> stream(StreamsBuilder builder) {
   JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
   JsonSerde<Reservation> rsvSerde = new JsonSerde<>(Reservation.class);
   KStream<Long, Order> stream = builder
      .stream("orders", Consumed.with(Serdes.Long(), orderSerde))
      .peek((k, order) -> LOG.info("New: {}", order));

   KeyValueBytesStoreSupplier stockOrderStoreSupplier =
      Stores.persistentKeyValueStore("stock-orders");

   stream.selectKey((k, v) -> v.getProductId())
      .groupByKey(Grouped.with(Serdes.Long(), orderSerde))
      .aggregate(() -> new Reservation(random.nextInt(100)), aggrSrv,
         Materialized.<Long, Reservation>as(stockOrderStoreSupplier)
            .withKeySerde(Serdes.Long())
            .withValueSerde(rsvSerde))
      .toStream()
      .peek((k, trx) -> LOG.info("Commit: {}", trx)); // (1)

   return stream;
}

What will happen if you send the test order? Let’s see the logs. You can see the difference in time between processing the message and the offset commit. You won’t have any problems with that as long as your application is running or is stopped gracefully. But what if you, for example, kill the process using the kill -9 command? After restart, our application will receive the same messages once again. Since we use KafkaTemplate to send the response to the stock-orders topic, we need to commit the offset as soon as possible.
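The effect of a crash between processing and the offset commit can be replayed with a small simulation. This is a sketch with hypothetical names. For simplicity it commits after a fixed number of messages, whereas Kafka Streams commits on a time interval, but the consequence is the same.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of at-least-once redelivery after a hard kill.
final class RedeliverySketch {

    // Processes `source`, committing the offset every `commitEvery` messages.
    // The first run is killed after `crashAfter` messages; the second run resumes
    // from the last committed offset. Returns every response that was sent.
    static List<String> run(List<String> source, int commitEvery, int crashAfter) {
        List<String> sent = new ArrayList<>();
        int committedOffset = 0;

        // First run, ends with a hard kill: no final commit happens.
        for (int pos = 0; pos < crashAfter; pos++) {
            sent.add(source.get(pos));            // side effect, e.g. a response message
            if ((pos + 1) % commitEvery == 0) {
                committedOffset = pos + 1;
            }
        }

        // Restart: everything after the last committed offset is processed again.
        for (int pos = committedOffset; pos < source.size(); pos++) {
            sent.add(source.get(pos));
        }
        return sent;
    }

    public static void main(String[] args) {
        List<String> orders = List.of("o1", "o2", "o3", "o4", "o5");
        System.out.println(run(orders, 3, 4)); // o4 is sent twice
        System.out.println(run(orders, 1, 4)); // no duplicates in this run
    }
}
```

Committing after every message shrinks the window in which duplicates can occur, but it does not close it completely: a crash between sending the response and committing the offset would still produce one duplicate.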

What can we do to avoid such problems? We may override the default value (30000 ms) of the commit.interval.ms Kafka Streams property. If you set it to 0, Kafka Streams commits immediately after processing finishes.

spring.kafka:  
  streams:
    properties:
      commit.interval.ms: 0

Alternatively, we can set the processing.guarantee property to exactly_once. It also changes the default value of commit.interval.ms to 100 ms and enables idempotence for the producer. Note that since Kafka 3.0 the exactly_once value is deprecated in favor of exactly_once_v2. You can read more about it in the Kafka documentation.

spring.kafka:  
  streams:
    properties:
      processing.guarantee: exactly_once

Running Kafka on Upstash

For the purpose of today’s exercise, we will use a serverless Kafka cluster on Upstash. You can create it with a single click. If you would like to test JAAS authentication for your application I’ve got good news 🙂 Authentication on that cluster is enabled by default. You can find and copy the username and password from the cluster’s main panel.


Now, let’s configure the Kafka connection settings and credentials for the Spring Boot application. Upstash offers a free developer tier of up to 10k messages per day, which is enough for our tests.

spring.kafka:
  bootstrap-servers: topical-gar-11460-us1-kafka.upstash.io:9092
  properties:
    security.protocol: SASL_SSL
    sasl.mechanism: SCRAM-SHA-256
    sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${USERNAME}" password="${PASSWORD}";
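If you want to reuse the same settings outside of Spring Boot, for example in a plain Kafka client, the sketch below builds the equivalent Properties object. The property keys are the standard Kafka client ones used in the YAML above, while the class and method names are hypothetical. Reading the credentials from the USERNAME and PASSWORD environment variables is an assumption that mirrors the placeholders in the Spring configuration.

```java
import java.util.Properties;

// Sketch only: the same connection settings for a plain Kafka client.
// The class and method names are hypothetical.
class UpstashClientPropsSketch {

    static Properties clientProps(String bootstrap, String username, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        // Same JAAS line as in the YAML, with the placeholders filled in.
        props.put("sasl.jaas.config", String.format(
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"%s\" password=\"%s\";", username, password));
        return props;
    }

    public static void main(String[] args) {
        Properties props = clientProps(
            "topical-gar-11460-us1-kafka.upstash.io:9092",
            System.getenv().getOrDefault("USERNAME", "<username>"),
            System.getenv().getOrDefault("PASSWORD", "<password>"));
        // Print everything except the JAAS line, which contains the password.
        props.forEach((k, v) -> {
            if (!"sasl.jaas.config".equals(k)) {
                System.out.println(k + "=" + v);
            }
        });
    }
}
```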

With Upstash you can easily display a list of topics. In total, there are 10 topics used in our sample system. Three of them are used directly by the Spring Boot applications, while the rest are created by Kafka Streams to handle stateful operations.

After starting the order-service application we can call its REST endpoint to create and send an order to the Kafka topic.

private static final Logger LOG = 
   LoggerFactory.getLogger(OrderController.class);
private AtomicLong id = new AtomicLong();
private KafkaTemplate<Long, Order> template;

@PostMapping
public Order create(@RequestBody Order order) {
   order.setId(id.incrementAndGet());
   template.send("orders", order.getId(), order);
   LOG.info("Sent: {}", order);
   return order;
}

Let’s call the endpoint using the following curl command. You can use any customerId or productId you want.

$ curl -X 'POST' \
  'http://localhost:8080/orders' \
  -H 'Content-Type: application/json' \
  -d '{
    "customerId": 20,
    "productId": 20,
    "productCount": 2,
    "price": 10,
    "status": "NEW"
  }'

All three sample applications use Kafka Streams to process distributed transactions. Once the order is accepted by both stock-service and payment-service you should see the following entry in the order-service logs.

You can easily simulate the rejection of a transaction with Kafka Streams, e.g. by setting productCount higher than the number of available items generated by the stock-service.

With Upstash UI you can also easily verify the number of messages incoming to the topics. Let’s see the current statistics for the orders topic.

Final Thoughts

In order to fully understand what happens in this example, you should also be familiar with the Kafka Streams threading model. It is worth reading the following article, which explains it clearly. First of all, each stream partition is a totally ordered sequence of data records and maps to a Kafka topic partition. This means that even if we have multiple orders at the same time related to e.g. the same product, they are all processed sequentially since they have the same message key (productId in that case).
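Here is a small sketch of that idea in plain Java. It is a toy model with hypothetical names, and the hash function is only a stand-in: Kafka's default partitioner hashes the serialized key bytes with murmur2, so the real partition numbers will differ. The point is just that records with the same key always land in the same partition and keep their arrival order there.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of key-based partitioning (not Kafka's real partitioner).
class KeyPartitioningSketch {

    static final class Order {
        final long id;
        final long productId;

        Order(long id, long productId) {
            this.id = id;
            this.productId = productId;
        }

        @Override
        public String toString() {
            return "Order{id=" + id + ", productId=" + productId + "}";
        }
    }

    // Stand-in hash: the same key always maps to the same partition.
    static int partitionFor(long key, int numPartitions) {
        return Math.floorMod(Long.hashCode(key), numPartitions);
    }

    // Groups orders by partition, preserving arrival order within each partition.
    static Map<Integer, List<Order>> assign(List<Order> orders, int numPartitions) {
        Map<Integer, List<Order>> partitions = new TreeMap<>();
        for (Order order : orders) {
            int partition = partitionFor(order.productId, numPartitions);
            partitions.computeIfAbsent(partition, p -> new ArrayList<>()).add(order);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
            new Order(1, 20), new Order(2, 7), new Order(3, 20), new Order(4, 20));
        assign(orders, 3).forEach((p, list) -> System.out.println(p + " -> " + list));
    }
}
```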

Moreover, by default, there is only a single stream thread that handles all the partitions. You can see this in the logs below. However, there are stream tasks that act as the lowest-level units of parallelism. As a result, stream tasks can be processed independently and in parallel without manual intervention.
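The relation between tasks and threads can be sketched as follows. This is a toy model with hypothetical names, and the round-robin distribution is an assumption made for illustration only, since the real Kafka Streams assignor is more sophisticated. The number of threads corresponds to the num.stream.threads property, which defaults to 1.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: spreads stream tasks over stream threads, round-robin.
class TaskAssignmentSketch {

    // Returns, for each thread, the list of task ids it would run.
    static List<List<Integer>> assign(int numTasks, int numThreads) {
        List<List<Integer>> threads = new ArrayList<>();
        for (int t = 0; t < numThreads; t++) {
            threads.add(new ArrayList<>());
        }
        for (int task = 0; task < numTasks; task++) {
            threads.get(task % numThreads).add(task);
        }
        return threads;
    }

    public static void main(String[] args) {
        // Default setup: a single thread runs all the tasks.
        System.out.println(assign(4, 1));
        // With two threads the same tasks are split between them.
        System.out.println(assign(4, 2));
    }
}
```

Either way, the tasks themselves do not change. Only the number of threads that execute them does.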

I hope this article helps you to better understand Kafka Streams. I just wanted to give you a simple example of how you can use Kafka Streams with Saga transactions in order to simplify your current architecture.

The post Deep Dive into Saga Transactions with Kafka Streams and Spring Boot appeared first on Piotr's TechBlog.
