quarkus kafka Archives - Piotr's TechBlog
https://piotrminkowski.com/tag/quarkus-kafka/

Local Development with Redpanda, Quarkus and Testcontainers
https://piotrminkowski.com/2022/04/20/local-development-with-redpanda-quarkus-and-testcontainers/
Wed, 20 Apr 2022 08:13:36 +0000

In this article, you will learn how to speed up your local development with Redpanda and Quarkus. The main goal is to show that you can replace Apache Kafka with Redpanda without any changes in the source code. Instead, you will get a fast way to run your existing Kafka applications without Zookeeper and JVM. You will also see how Quarkus uses Redpanda as a local instance for development. Finally, we are going to run all containers in the Testcontainers Cloud.

For the current exercise, we use the same examples as described in one of my previous articles about Quarkus and Kafka Streams. Just to remind you: we are building a simplified version of the stock market platform. The stock-service application receives and handles incoming orders. There are two types of orders: purchase (BUY) and sale (SELL). While the stock-service consumes Kafka streams, the order-service generates and sends events to the orders.buy and orders.sell topics. Here’s the diagram with our architecture. As you see, the stock-service also uses PostgreSQL as a database.

(Figure: architecture diagram, quarkus-redpanda-arch)

Source Code

If you would like to try this exercise yourself, you can always take a look at my source code. To do that, clone my GitHub repository and switch to the dev branch. Then simply follow my instructions. Let's begin.

Install Redpanda

This step is not required. However, it is worth installing Redpanda, since it provides a useful CLI called Redpanda Keeper (rpk) for managing a cluster. To install Redpanda on macOS, just run the following command:

$ brew install redpanda-data/tap/redpanda

Now, we can easily create and run a new cluster. For development purposes, we only need a single-node Redpanda cluster. In order to run it, you need to have Docker on your laptop.

$ rpk container start

Before proceeding to the next steps, let's remove the current cluster. Quarkus will create everything for us automatically.

$ rpk container purge

Quarkus with Kafka and Postgres

Let’s begin with the stock-service. It consumes streams from Kafka topics and connects to the PostgreSQL database, as I mentioned before. So, the first step is to include the following dependencies:

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-kafka-streams</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-hibernate-orm-panache</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-jdbc-postgresql</artifactId>
</dependency>

Now, we may proceed to the implementation. The topology for all the streams is provided inside the following method:

@Produces
public Topology buildTopology() {
   ...
}

There are several different streams defined there, but let's just take a look at the fragment of the topology responsible for creating transactions from incoming orders:

final String ORDERS_BUY_TOPIC = "orders.buy";
final String ORDERS_SELL_TOPIC = "orders.sell";
final String TRANSACTIONS_TOPIC = "transactions";

// ... other streams

KStream<Long, Order> orders = builder.stream(
   ORDERS_SELL_TOPIC,
   Consumed.with(Serdes.Long(), orderSerde));

builder.stream(ORDERS_BUY_TOPIC, Consumed.with(Serdes.Long(), orderSerde))
   .merge(orders)
   .peek((k, v) -> {
      log.infof("New: %s", v);
      logic.add(v);
   });

builder.stream(ORDERS_BUY_TOPIC, Consumed.with(Serdes.Long(), orderSerde))
   .selectKey((k, v) -> v.getProductId())
   .join(orders.selectKey((k, v) -> v.getProductId()),
      this::execute,
      JoinWindows.of(Duration.ofSeconds(10)),
      StreamJoined.with(Serdes.Integer(), orderSerde, orderSerde))
   .filterNot((k, v) -> v == null)
   .map((k, v) -> new KeyValue<>(v.getId(), v))
   .peek((k, v) -> log.infof("Done -> %s", v))
   .to(TRANSACTIONS_TOPIC, Produced.with(Serdes.Long(), transactionSerde));

The whole implementation is more advanced; for the details, you may refer to the article mentioned in the introduction. Now, let's imagine we are still developing our stock market app. Firstly, we would have to run PostgreSQL and a local Kafka cluster (we use Redpanda, which is easy to run locally). After that, we would typically provide the addresses of both the database and the broker in application.properties. But thanks to a feature called Quarkus Dev Services, the only things we need to configure are the names of the topics consumed by Kafka Streams and the application id. Both of these are required by Kafka Streams.

Now, the most important thing: you just need to start the Quarkus app. Nothing more. Do NOT run any external tools yourself and do NOT provide any addresses for them in the configuration settings. Just add the two lines you see below:

quarkus.kafka-streams.application-id = stock
quarkus.kafka-streams.topics = orders.buy,orders.sell

Run Quarkus in dev mode with Redpanda

Before you run the Quarkus app, make sure you have Docker running on your laptop. When you do, the only thing you need is to start both test apps. Let’s begin with the stock-service since it receives orders generated by the order-service. Go to the stock-service directory and run the following command:

$ cd stock-service
$ mvn quarkus:dev

If you see the following logs, everything went well. Our application started in 13 seconds. During this time, Quarkus also started Kafka and PostgreSQL on Docker and built the Kafka Streams topology. All of that in 13 seconds, with a single command and without any additional configuration. Nice, right? Let's check out what happened in the background.

Firstly, let's find the line of logs beginning with "Dev Services for Kafka started". It describes the Quarkus feature called Dev Services. Our Kafka instance has been started as a Docker container and is available on a dynamically generated port, and the application connects to it automatically. All other Quarkus apps you run now will share the same broker instance. You can disable that sharing by setting the property quarkus.kafka.devservices.shared to false.

It may be a little surprising, but Quarkus Dev Services for Kafka uses Redpanda to run a broker. Of course, Redpanda is a Kafka-compatible solution. Since it starts in ~one second and does not require Zookeeper, it is a great choice for local development.

In order to run tools like brokers or databases on Docker, Quarkus uses Testcontainers. If you are interested in more details about Quarkus Dev Services for Kafka, read the following documentation. For now, let's display a list of running containers using the docker ps command. There are containers for Redpanda, PostgreSQL, and Testcontainers.

(Figure: docker ps output, quarkus-redpanda-containers)

Manage Kafka Streams with Redpanda and Quarkus

Let's verify how everything works on the application side. After running the application, we can take advantage of another useful Quarkus feature called Dev UI. The console is available at http://localhost:8080/q/dev/. After accessing it, you can display the topology of our Kafka Streams by clicking the button inside the Apache Kafka Streams tile.

Here you will see a summary of the available streams; in my case, 12 topics and 15 state stores. You can also see a visualization of the Kafka Streams topology. The following picture shows a fragment of it. You can download the full image by clicking the green download button visible on the right side of the screen.

(Figure: Dev UI topology view, quarkus-redpanda-dev)

Now, let's use the Redpanda CLI to display the list of created topics. In my case, Redpanda is available locally on port 55001. All the topics are created automatically during application startup. We only need to define the names of the topics used in communication between our two test apps: orders.buy, orders.sell and transactions. They are configured and created by the order-service. The stock-service creates all the other topics visible below, which are responsible for handling streams.

$ rpk topic list --brokers localhost:55001
NAME                                                    PARTITIONS  REPLICAS
orders.buy                                              1           1
orders.sell                                             1           1
stock-KSTREAM-JOINOTHER-0000000016-store-changelog      1           1
stock-KSTREAM-JOINOTHER-0000000043-store-changelog      1           1
stock-KSTREAM-JOINOTHER-0000000065-store-changelog      1           1
stock-KSTREAM-JOINTHIS-0000000015-store-changelog       1           1
stock-KSTREAM-JOINTHIS-0000000042-store-changelog       1           1
stock-KSTREAM-JOINTHIS-0000000064-store-changelog       1           1
stock-KSTREAM-KEY-SELECT-0000000005-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000006-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000032-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000033-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000054-repartition         1           1
stock-KSTREAM-KEY-SELECT-0000000055-repartition         1           1
stock-transactions-all-summary-changelog                1           1
stock-transactions-all-summary-repartition              1           1
stock-transactions-per-product-summary-30s-changelog    1           1
stock-transactions-per-product-summary-30s-repartition  1           1
stock-transactions-per-product-summary-changelog        1           1
stock-transactions-per-product-summary-repartition      1           1
transactions                                            1           1
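
The long list of stock-* topics is not accidental: Kafka Streams derives the names of its internal topics from the application id (stock in our case) plus the name of the state store or operation, with a -changelog or -repartition suffix. A trivial sketch of the naming scheme (the helper methods are mine, not a Kafka API):

```java
public class InternalTopicNames {

    // Kafka Streams prefixes internal topics with the application id,
    // which is why every entry in the list above starts with "stock-".
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    static String repartitionTopic(String applicationId, String operationName) {
        return applicationId + "-" + operationName + "-repartition";
    }

    public static void main(String[] args) {
        // Matches one of the topics listed above.
        System.out.println(changelogTopic("stock", "transactions-all-summary"));
        // -> stock-transactions-all-summary-changelog
    }
}
```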

In order to do a full test, we also need to run the order-service. It generates orders continuously and sends them to the orders.buy and orders.sell topics. Let's do that.

Send messages to Redpanda with Quarkus

Before we run order-service, let’s see some implementation details. On the producer side, we need to include a single dependency responsible for integration with a Kafka broker:

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

Our application generates and sends random orders to the orders.buy and orders.sell topics. There are two methods for that, each dedicated to a single topic. Let's look at the method generating BUY orders. We need to annotate it with @Outgoing and set the channel name (orders-buy). The method emits a single order every 500 milliseconds.

@Outgoing("orders-buy")
public Multi<Record<Long, Order>> buyOrdersGenerator() {
   return Multi.createFrom().ticks().every(Duration.ofMillis(500))
      .map(order -> {
         Integer productId = random.nextInt(10) + 1;
         int price = prices.get(productId) + random.nextInt(200);
         Order o = new Order(
            incrementOrderId(),
            random.nextInt(1000) + 1,
            productId,
            100 * (random.nextInt(5) + 1),
            LocalDateTime.now(),
            OrderType.BUY,
            price);
         log.infof("Sent: %s", o);
         return Record.of(o.getId(), o);
      });
}

After that, we need to map the channel name to the target topic name. We also need to set the serializers for the message key and value.

mp.messaging.outgoing.orders-buy.connector = smallrye-kafka
mp.messaging.outgoing.orders-buy.topic = orders.buy
mp.messaging.outgoing.orders-buy.key.serializer = org.apache.kafka.common.serialization.LongSerializer
mp.messaging.outgoing.orders-buy.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Finally, go to the order-service directory and run the application.

$ cd order-service
$ mvn quarkus:dev

Once you start the order-service, it will create the topics and start sending orders. It uses the same Redpanda instance as the stock-service. You can run the docker ps command once again to verify that.

Now, make a simple change in the stock-service to reload the application. It will also reload the Kafka Streams topology. After that, the app starts receiving orders from the topics created by the order-service. Finally, it creates transactions from incoming orders and stores them in the transactions topic.

Use Testcontainers Cloud

In the development process described so far, we needed a locally installed Docker ecosystem. But what if we don't have one? That's where Testcontainers Cloud comes in. Testcontainers Cloud is a developer-first SaaS platform for modern integration testing with real databases, message brokers, cloud services, or any other component of application infrastructure. To put it simply, we will do the same thing as before, but our instances of Redpanda and PostgreSQL will run not on the local Docker, but on the remote Testcontainers platform.

What do you need to do to enable Testcontainers Cloud? Firstly, download the agent from the following site. You also need to be a beta tester to obtain an authorization token. Finally, just run the agent and stop your local Docker daemon. You should see the Testcontainers icon among the running apps with information about the connection to the cloud.

(Figure: Testcontainers Cloud agent, quarkus-redpanda-testcontainers)

Docker should not run locally.

Just as before, run both applications with the quarkus:dev command. Your Redpanda broker is now running on Testcontainers Cloud but, thanks to the agent, you can still access it over localhost.

Once again, you can verify the list of topics using the following command against the new broker:

$ rpk topic list --brokers localhost:59779

Final Thoughts

In this article, I focused on showing you how new and exciting technologies like Quarkus, Redpanda, and Testcontainers can work together. Local development is one of the use cases, but you may as well use them to write integration tests.

Kafka Streams with Quarkus
https://piotrminkowski.com/2021/11/24/kafka-streams-with-quarkus/
Wed, 24 Nov 2021 08:24:53 +0000
In this article, you will learn how to use Kafka Streams with Quarkus. As in my previous article, we will create a simple application that simulates the stock market. But this time, we are going to use Quarkus instead of Spring Cloud. If you would like to figure out what a streaming platform is and how it differs from a traditional message broker, this article is for you. Moreover, we will study useful improvements related to Apache Kafka provided by Quarkus.

Source Code

If you would like to try it yourself, you may always take a look at my source code. To do that, clone my GitHub repository. After that, just follow my instructions. Let's begin.

Architecture

In our case, there are two incoming streams of events, both representing incoming orders. These orders are generated by the order-service application: it sends buy orders to the orders.buy topic and sell orders to the orders.sell topic. Then the stock-service application receives and handles incoming events. In the first step, it changes the key of each message from the orderId to the productId, because it has to join orders from different topics related to the same product in order to execute transactions. Finally, the transaction price is the average of the sale and buy prices.

(Figure: architecture diagram, quarkus-kafka-streams-arch)
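
The re-keying step can be illustrated in plain Java. This is only a conceptual sketch (the real application uses KStream.selectKey, and the Order fields here are trimmed down): once orders are keyed by productId, orders for the same product end up together and can be matched.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class RekeySketch {

    // Minimal stand-in for the Order event (only the fields needed here).
    record Order(long id, int productId, int price) {}

    // Mimics selectKey((k, v) -> v.getProductId()) followed by grouping:
    // the message key changes from orderId to productId.
    static Map<Integer, List<Order>> rekeyByProduct(List<Order> orders) {
        return orders.stream().collect(Collectors.groupingBy(Order::productId));
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
            new Order(1, 7, 100),
            new Order(2, 7, 110),
            new Order(3, 3, 90));
        // Orders 1 and 2 concern the same product, so they now share a key.
        System.out.println(rekeyByProduct(orders).get(7).size()); // -> 2
    }
}
```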

We are building a simplified version of a stock market platform. Each buy order contains the maximum price at which a customer is willing to buy a product. On the other hand, each sell order contains the minimum price at which a customer is ready to sell their product. If the sell order price is not greater than the buy order price for a particular product, we perform a transaction.

Each order is valid for 10 seconds. After that time the stock-service application will not handle it, since it is considered expired. Each order contains a number of products for a transaction; for example, we may sell 100 products at a minimum price of 10, or buy 200 at a maximum price of 11. Therefore, an order may be fully or partially realized. The stock-service application tries to join partially realized orders with other new or partially realized orders. You can see a visualization of that process in the picture below.

(Figure: order matching visualization, quarkus-kafka-streams-app)
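
The matching rules just described boil down to a few lines of arithmetic. Here is a stand-alone restatement in plain Java (method names are mine; the real logic lives in the stock-service stream topology shown later):

```java
public class MatchingSketch {

    // A buy order matches a sell order when the seller's minimum price
    // does not exceed the buyer's maximum price.
    static boolean matches(int buyMaxPrice, int sellMinPrice) {
        return sellMinPrice <= buyMaxPrice;
    }

    // The transaction price is the average of the two limit prices.
    static int transactionPrice(int buyMaxPrice, int sellMinPrice) {
        return (buyMaxPrice + sellMinPrice) / 2;
    }

    // An order may be only partially realized: the tradable count is
    // limited by whatever remains unrealized on both sides.
    static int tradableCount(int buyRemaining, int sellRemaining) {
        return Math.min(buyRemaining, sellRemaining);
    }

    public static void main(String[] args) {
        // Example from the text: sell 100 at min price 10, buy 200 at max 11.
        if (matches(11, 10)) {
            System.out.println(tradableCount(200, 100));  // -> 100
            System.out.println(transactionPrice(11, 10)); // -> 10
        }
    }
}
```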

Run Apache Kafka locally

Before we jump to the implementation, we need to run a local instance of Apache Kafka. If you don't want to install it on your laptop, the best way to run it is with Redpanda. Redpanda is a Kafka API-compatible streaming platform that, in comparison to Kafka, is relatively easy to run locally. Normally, you would have to install Redpanda on your laptop and then create a cluster using its CLI. But with Quarkus you don't need to do that! The only requirement is to have Docker installed. Thanks to the Quarkus Kafka extension and a feature called Dev Services, Quarkus automatically starts a Kafka broker in dev mode and when running tests. Moreover, the application is configured automatically.

The only thing you need to do to enable that feature is NOT to provide any Kafka address in the configuration properties. Dev Services uses Testcontainers to run Kafka, so if you have Docker or any other environment supporting Testcontainers, you get a containerized Kafka instance out of the box. One more important thing: first start the order-service application, which automatically creates all the required topics in Kafka. Then run the stock-service application, which uses the Quarkus Kafka Streams extension and verifies that the required topics exist. Let's visualize it.

(Figure: startup order of the applications, quarkus-kafka-streams-run)

Send events to Kafka with Quarkus

There are several ways to send events to Kafka with Quarkus. Because we need to send key/value pairs, we will use the io.smallrye.reactive.messaging.kafka.Record object. Quarkus is able to generate and send data continuously. In the fragment of code visible below, we send a single Order event every 500 ms. Each Order contains a random productId, price and productCount.

@Outgoing("orders-buy")
public Multi<Record<Long, Order>> buyOrdersGenerator() {
   return Multi.createFrom().ticks().every(Duration.ofMillis(500))
      .map(order -> {
         Integer productId = random.nextInt(10) + 1;
         int price = prices.get(productId) + random.nextInt(200);
         Order o = new Order(
             incrementOrderId(),
             random.nextInt(1000) + 1,
             productId,
             100 * (random.nextInt(5) + 1),
             LocalDateTime.now(),
             OrderType.BUY,
             price);
         log.infof("Sent: %s", o);
         return Record.of(o.getId(), o);
   });
}

@Outgoing("orders-sell")
public Multi<Record<Long, Order>> sellOrdersGenerator() {
   return Multi.createFrom().ticks().every(Duration.ofMillis(500))
      .map(order -> {
         Integer productId = random.nextInt(10) + 1;
         int price = prices.get(productId) + random.nextInt(200);
         Order o = new Order(
             incrementOrderId(),
             random.nextInt(1000) + 1,
             productId,
             100 * (random.nextInt(5) + 1),
             LocalDateTime.now(),
             OrderType.SELL,
             price);
         log.infof("Sent: %s", o);
         return Record.of(o.getId(), o);
   });
}

We will also define a single @Incoming channel in order to receive transactions produced by the stock-service. Thanks to that, Quarkus automatically creates the transactions topic used by Kafka Streams in the stock-service. To be honest, I was not able to force the Quarkus Kafka Streams extension to create the topic automatically; it seems we need to use the SmallRye Reactive Messaging extension for that.

@Incoming("transactions")
public void transactions(Transaction transaction) {
   log.infof("New: %s", transaction);
}

Of course, we need to include the SmallRye Reactive Messaging dependency to the Maven pom.xml.

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

Finally, let's provide the configuration settings. We have two outgoing channels and a single incoming one. We can set the topic names explicitly; otherwise, Quarkus uses the channel name as the topic name. The names of our topics are orders.buy, orders.sell and transactions.

mp.messaging.outgoing.orders-buy.connector = smallrye-kafka
mp.messaging.outgoing.orders-buy.topic = orders.buy
mp.messaging.outgoing.orders-buy.key.serializer = org.apache.kafka.common.serialization.LongSerializer
mp.messaging.outgoing.orders-buy.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

mp.messaging.outgoing.orders-sell.connector = smallrye-kafka
mp.messaging.outgoing.orders-sell.topic = orders.sell
mp.messaging.outgoing.orders-sell.key.serializer = org.apache.kafka.common.serialization.LongSerializer
mp.messaging.outgoing.orders-sell.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

mp.messaging.incoming.transactions.connector = smallrye-kafka
mp.messaging.incoming.transactions.topic = transactions
mp.messaging.incoming.transactions.value.deserializer = pl.piomin.samples.streams.order.model.deserializer.TransactionDeserializer

That's all, our orders generator is ready. If you run the order-service application, Quarkus will also start a Kafka (Redpanda) instance. But first, let's switch to the second sample application: the stock-service.

Consume Kafka Streams with Quarkus

In the previous section, we were sending messages to the Kafka broker. Therefore, we used a standard Quarkus library for integration with Kafka based on the SmallRye Reactive Messaging framework. The stock-service application consumes messages as streams, so now we will use a module for Kafka Streams integration.

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-kafka-streams</artifactId>
</dependency>

Our application also uses a database, an ORM layer and includes some other useful modules.

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-hibernate-orm-panache</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-jdbc-h2</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-openapi</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-resteasy-jackson</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-health</artifactId>
</dependency>

In the first step, we are going to merge both streams of orders (buy and sell), insert each Order into the database, and print the event message. You could ask: why do I use a database and an ORM layer here, since I have a Kafka KTable? Well, I need transactions with lock support in order to coordinate the status of order realization (refer to the description in the introduction about fully and partially realized orders). I will give you more details about it in the next sections.

In order to process streams with Quarkus, we need to declare an org.apache.kafka.streams.Topology bean. It contains all the KStream and KTable definitions. Let's start with the part responsible for creating and emitting transactions from incoming orders. There are two KStream definitions here. The first of them merges the two order streams into a single one and inserts each new Order into the database. The second creates and executes transactions by joining two streams using the productId key. More about that in the next section.

@Produces
public Topology buildTopology() {
   ObjectMapperSerde<Order> orderSerde = 
      new ObjectMapperSerde<>(Order.class);
   ObjectMapperSerde<Transaction> transactionSerde = 
      new ObjectMapperSerde<>(Transaction.class);

   StreamsBuilder builder = new StreamsBuilder();

   KStream<Long, Order> orders = builder.stream(
      ORDERS_SELL_TOPIC,
      Consumed.with(Serdes.Long(), orderSerde));

   builder.stream(ORDERS_BUY_TOPIC, 
         Consumed.with(Serdes.Long(), orderSerde))
      .merge(orders)
      .peek((k, v) -> {
         log.infof("New: %s", v);
         logic.add(v);
      });

   builder.stream(ORDERS_BUY_TOPIC, 
         Consumed.with(Serdes.Long(), orderSerde))
      .selectKey((k, v) -> v.getProductId())
      .join(orders.selectKey((k, v) -> v.getProductId()),
         this::execute,
         JoinWindows.of(Duration.ofSeconds(10)),
         StreamJoined.with(Serdes.Integer(), orderSerde, orderSerde))
      .filterNot((k, v) -> v == null)
      .map((k, v) -> new KeyValue<>(v.getId(), v))
      .peek((k, v) -> log.infof("Done -> %s", v))
      .to(TRANSACTIONS_TOPIC, Produced.with(Serdes.Long(), transactionSerde));

   return builder.build();
}

To process the streams, we need to add some configuration properties. A list of input topics is required. We can also override the default application id and enable the Kafka health check.

quarkus.kafka-streams.application-id = stock
quarkus.kafka-streams.topics = orders.buy,orders.sell
quarkus.kafka.health.enabled = true

Operations on Kafka Streams

Now we may use some more advanced operations on Kafka Streams than just merging two different streams. In fact, this is the key logic in our application. We need to join two different order streams into a single one using the productId as the joining key. Since the producer sets orderId as the message key, we first need to invoke the selectKey method on both the orders.sell and orders.buy streams. In our case, joining buy and sell orders related to the same product is just the first step. Then we need to verify that the minimum price in the sell order is not greater than the maximum price in the buy order.

The next step is to verify that neither of these orders has already been fully realized, since they may also have been paired with other orders in the stream. If all the conditions are met, we may create a new transaction. Finally, we change the stream key from productId to the transactionId and send the result to the dedicated transactions topic.

Each time we successfully join two orders, we try to create a transaction. The execute(...) method is called within the KStream join method. Firstly, we compare the prices of both orders. Then we verify the realization status of both orders by accessing the H2 database. If the orders are still not fully realized, we create a transaction and update the order records in the database.

private Transaction execute(Order orderBuy, Order orderSell) {
   if (orderBuy.getAmount() >= orderSell.getAmount()) {
      int count = Math.min(orderBuy.getProductCount(),
                           orderSell.getProductCount());
      boolean allowed = logic
         .performUpdate(orderBuy.getId(), orderSell.getId(), count);
      if (!allowed)
         return null;
      else
         return new Transaction(
            ++transactionId,
            orderBuy.getId(),
            orderSell.getId(),
            count,
            (orderBuy.getAmount() + orderSell.getAmount()) / 2,
            LocalDateTime.now(),
            "NEW");
   } else {
      return null;
   }
}

Let's take a closer look at the performUpdate() method called inside execute(). It starts a transaction and locks both Order entities. Then it verifies the realization status of each order and, if possible, updates it with the current values. Only if the performUpdate() method finishes successfully does the stock-service application create a new transaction.

@ApplicationScoped
public class OrderLogic {

    @Inject
    Logger log;
    @Inject
    OrderRepository repository;

    @Transactional
    public Order add(Order order) {
        repository.persist(order);
        return order;
    }

    @Transactional
    public boolean performUpdate(Long buyOrderId, Long sellOrderId, int amount) {
        Order buyOrder = repository.findById(buyOrderId, 
           LockModeType.PESSIMISTIC_WRITE);
        Order sellOrder = repository.findById(sellOrderId, 
           LockModeType.PESSIMISTIC_WRITE);
        if (buyOrder == null || sellOrder == null)
            return false;
        int buyAvailableCount = 
           buyOrder.getProductCount() - buyOrder.getRealizedCount();
        int sellAvailableCount = 
           sellOrder.getProductCount() - sellOrder.getRealizedCount();
        if (buyAvailableCount >= amount && sellAvailableCount >= amount) {
            buyOrder.setRealizedCount(buyOrder.getRealizedCount() + amount);
            sellOrder.setRealizedCount(sellOrder.getRealizedCount() + amount);
            repository.persist(buyOrder);
            repository.persist(sellOrder);
            return true;
        } else {
            return false;
        }
    }
}
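
The core check inside performUpdate() is simple arithmetic: an update is allowed only when both orders still have at least amount unrealized products. A stand-alone restatement of that check (method names are mine):

```java
public class RealizationSketch {

    // An order with productCount products, of which realizedCount are already
    // traded, can take part in a trade of `amount` products only if enough
    // unrealized products remain.
    static boolean canRealize(int productCount, int realizedCount, int amount) {
        return productCount - realizedCount >= amount;
    }

    // Both sides must have capacity, mirroring the if-condition in performUpdate().
    static boolean updateAllowed(int buyCount, int buyRealized,
                                 int sellCount, int sellRealized, int amount) {
        return canRealize(buyCount, buyRealized, amount)
            && canRealize(sellCount, sellRealized, amount);
    }

    public static void main(String[] args) {
        // buy order: 500 products with 400 already realized -> only 100 left
        System.out.println(updateAllowed(500, 400, 300, 0, 100)); // -> true
        System.out.println(updateAllowed(500, 400, 300, 0, 200)); // -> false
    }
}
```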

Nice 🙂 That’s all that we need to do in the first part of our exercise. Now we can run both our sample applications.

Run and manage Kafka Streams application with Quarkus

As I mentioned before, we first need to start the order-service. It runs a new Kafka instance and creates all the required topics. Immediately after startup, it is ready to send new orders. To run the Quarkus app locally, just go to the order-service directory and execute the following command:

$ mvn quarkus:dev

Just to verify, you can display the list of running Docker containers with the docker ps command. Here's my result: an instance of Redpanda is running, available on a random port (49724 in my case). Quarkus did that for us. If you have Redpanda installed on your laptop, you can also check out the list of created topics with its CLI, rpk:

$ rpk topic list --brokers=127.0.0.1:49724

Then let's run the stock-service. Go to the stock-service directory and run mvn quarkus:dev once again. After startup, it just works: both applications share the same broker instance thanks to Quarkus Dev Services. Now let's access the Quarkus Dev UI console available at http://localhost:8080/q/dev/ and find the tile with the "Apache Kafka Streams" title.

You can check a visualization of our Kafka Streams topology. I will divide the image into two parts for better visibility.

Use Kafka KTable with Quarkus

We have already finished the implementation of the logic responsible for creating transactions from incoming orders. In the next step, we are going to perform analytical operations on the transactions stream. Our main goal is to calculate, per product, the total number of transactions, the total number of products sold/bought, and the total value of transactions (price * productCount). Here's the object class used in the calculations.

@RegisterForReflection
public class TransactionTotal {
   private int count;
   private int amount;
   private int productCount;

   // GETTERS AND SETTERS
}

Because the Transaction object does not contain information about the product, we first need to join the order to access it. Then we produce a KTable by grouping and aggregating per productId. The aggregate method allows us to perform more complex calculations. In this particular case, we are calculating the number of all executed transactions, the volume of products they cover, and their total value. The resulting KTable can be materialized as a state store. Thanks to that, we will be able to query it by the name defined in the TRANSACTIONS_PER_PRODUCT_SUMMARY variable.

KeyValueBytesStoreSupplier storePerProductSupplier = Stores.persistentKeyValueStore(
   TRANSACTIONS_PER_PRODUCT_SUMMARY);

builder.stream(TRANSACTIONS_TOPIC, Consumed.with(Serdes.Long(), transactionSerde))
   .selectKey((k, v) -> v.getSellOrderId())
   .join(orders.selectKey((k, v) -> v.getId()),
      (t, o) -> new TransactionWithProduct(t, o.getProductId()),
      JoinWindows.of(Duration.ofSeconds(10)),
      StreamJoined.with(Serdes.Long(), transactionSerde, orderSerde))
   .groupBy((k, v) -> v.getProductId(), Grouped.with(Serdes.Integer(), transactionWithProductSerde))
   .aggregate(
      TransactionTotal::new,
      (k, v, a) -> {
         a.setCount(a.getCount() + 1);
         a.setProductCount(a.getProductCount() + v.getTransaction().getAmount());
         a.setAmount(a.getAmount() +
            (v.getTransaction().getAmount() * v.getTransaction().getPrice()));
         return a;
      },
      Materialized.<Integer, TransactionTotal> as(storePerProductSupplier)
         .withKeySerde(Serdes.Integer())
         .withValueSerde(transactionTotalSerde))
   .toStream()
   .peek((k, v) -> log.infof("Total per product(%d): %s", k, v))
   .to(TRANSACTIONS_PER_PRODUCT_AGGREGATED_TOPIC, 
      Produced.with(Serdes.Integer(), transactionTotalSerde));
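Since the aggregate() callback is pure accumulation logic, it can be sanity-checked outside Kafka Streams. Here's a plain-Java sketch of the per-product totals it is meant to produce: one count increment per transaction, the product volume summed, and the value summed as products times price. The int[] pairs and the sample values are made up for illustration; they are not part of the original project.

```java
import java.util.List;

public class AggregatorSketch {

    // Stand-in for TransactionTotal, returned as {count, productCount, amount}.
    // Each transaction is a pair: {number of products, unit price}.
    static int[] aggregate(List<int[]> transactions) {
        int count = 0, productCount = 0, amount = 0;
        for (int[] v : transactions) {  // mirrors the (k, v, a) -> { ... } lambda
            count++;                    // one more executed transaction
            productCount += v[0];       // total volume of products
            amount += v[0] * v[1];      // total value: products * price
        }
        return new int[]{count, productCount, amount};
    }

    public static void main(String[] args) {
        // Two sample transactions for one productId: 100 items at 10, 50 items at 12
        int[] totals = aggregate(List.of(new int[]{100, 10}, new int[]{50, 12}));
        // 2 transactions, 150 products, total value 1600
        System.out.println(totals[0] + " " + totals[1] + " " + totals[2]);
    }
}
```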

Here's the class responsible for the interactive queries implementation. It injects the KafkaStreams bean and obtains the persistent store based on the StockService.TRANSACTIONS_PER_PRODUCT_SUMMARY variable. As a result, we get a ReadOnlyKeyValueStore with Integer keys and TransactionTotal values. We may return a single value related to a particular productId (getTransactionsPerProductData) or a map with the results for all available products (getAllTransactionsPerProductData).

@ApplicationScoped
public class InteractiveQueries {

   @Inject
   KafkaStreams streams;

   public TransactionTotal getTransactionsPerProductData(Integer productId) {
      return getTransactionsPerProductStore().get(productId);
   }

   public Map<Integer, TransactionTotal> getAllTransactionsPerProductData() {
      Map<Integer, TransactionTotal> m = new HashMap<>();
      KeyValueIterator<Integer, TransactionTotal> it = getTransactionsPerProductStore().all();
      while (it.hasNext()) {
         KeyValue<Integer, TransactionTotal> kv = it.next();
         m.put(kv.key, kv.value);
      }
      return m;
   }

   private ReadOnlyKeyValueStore<Integer, TransactionTotal> getTransactionsPerProductStore() {
      return streams.store(
         StoreQueryParameters
            .fromNameAndType(StockService.TRANSACTIONS_PER_PRODUCT_SUMMARY, QueryableStoreTypes.keyValueStore()));
   }

}

Finally, we can create a REST controller responsible for exposing data retrieved by the interactive queries.

@ApplicationScoped
@Path("/transactions")
public class TransactionResource {

    @Inject
    InteractiveQueries interactiveQueries;

    @GET
    @Path("/products/{id}")
    public TransactionTotal getByProductId(@PathParam("id") Integer productId) {
        return interactiveQueries.getTransactionsPerProductData(productId);
    }

    @GET
    @Path("/products")
    public Map<Integer, TransactionTotal> getAllPerProductId() {
        return interactiveQueries.getAllTransactionsPerProductData();
    }

}

Now you can easily check out statistics related to the transactions created by the stock-service. You just need to call the following REST endpoints e.g.:

$ curl http://localhost:8080/transactions/products
$ curl http://localhost:8080/transactions/products/3
$ curl http://localhost:8080/transactions/products/5

Final Thoughts

Quarkus simplifies working with Kafka Streams and interactive queries. It provides useful improvements for developers, like the auto-start of Kafka in dev and test modes or the Kafka Streams visualization in the dev UI console. You can easily compare the Quarkus approach with the Spring Cloud Stream Kafka support, since I implemented the same logic for both frameworks. Here's the GitHub repository with the Spring Cloud Stream Kafka Streams example.

The post Kafka Streams with Quarkus appeared first on Piotr's TechBlog.

Knative Eventing with Quarkus, Kafka and Camel https://piotrminkowski.com/2021/06/14/knative-eventing-with-quarkus-kafka-and-camel/ https://piotrminkowski.com/2021/06/14/knative-eventing-with-quarkus-kafka-and-camel/#comments Mon, 14 Jun 2021 07:47:54 +0000 https://piotrminkowski.com/?p=9797 In this article, you will learn how to use Quarkus with Camel to create applications that send messages to Kafka and receive CloudEvent from Knative Eventing. We will build a very similar system to the system described in my previous article Knative Eventing with Kafka and Quarkus. However, this time we will use Apache Camel […]

The post Knative Eventing with Quarkus, Kafka and Camel appeared first on Piotr's TechBlog.

In this article, you will learn how to use Quarkus with Camel to create applications that send messages to Kafka and receive CloudEvents from Knative Eventing. We will build a system very similar to the one described in my previous article, Knative Eventing with Kafka and Quarkus. However, this time we will use Apache Camel instead of several Quarkus extensions, including Kafka support.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions.

First, you should go to the saga directory. It contains two applications built on top of Quarkus and Apache Camel. Today we will implement an eventual consistency pattern (also known as a SAGA pattern). It will use the Knative Eventing model for exchanging events between our applications.

1. Prerequisites

Before we start, we need to configure some components like Kafka, Knative, or the Kafka Eventing broker. Let's go through these steps.

1.1. Install Apache Kafka cluster

Firstly, let’s create our kafka namespace.

$ kubectl create namespace kafka

Then we apply the installation files, including ClusterRoles, ClusterRoleBindings, and other required Kubernetes CustomResourceDefinitions (CRDs).

$ kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

After that, we can create a single-node persistent Apache Kafka cluster. We use an example custom resource for the Strimzi operator.

$ kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka

Finally, we may verify our installation. You should see the same result as shown below.

1.2. Install Knative Serving and Eventing

You can install Knative on your Kubernetes cluster using YAML manifests or the operator. The current version of Knative is 0.23. The minimal list of steps for a YAML-based installation is visible below. For more details, you may refer to the documentation. I placed all the required commands here to simplify the process for you. Let's install Knative Serving.

$ kubectl apply -f https://github.com/knative/serving/releases/download/v0.23.0/serving-crds.yaml
$ kubectl apply -f https://github.com/knative/serving/releases/download/v0.23.0/serving-core.yaml
$ kubectl apply -f https://github.com/knative/net-kourier/releases/download/v0.23.0/kourier.yaml
$ kubectl patch configmap/config-network \
  --namespace knative-serving \
  --type merge \
  --patch '{"data":{"ingress.class":"kourier.ingress.networking.knative.dev"}}'

Then let’s install Knative Eventing.

$ kubectl apply -f https://github.com/knative/eventing/releases/download/v0.23.0/eventing-crds.yaml
$ kubectl apply -f https://github.com/knative/eventing/releases/download/v0.23.0/eventing-core.yaml

1.3. Install Knative Kafka Eventing

The following commands install the Apache Kafka broker, and run event routing in a system namespace, knative-eventing, by default.

$ kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.23.0/eventing-kafka-controller.yaml
$ kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.23.0/eventing-kafka-broker.yaml

Then, we should install the CRDs with KafkaBinding and KafkaSource.

$ kubectl apply -f https://storage.googleapis.com/knative-releases/eventing-contrib/latest/kafka-source.yaml

Finally, let’s just create a broker.

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: default
  namespace: default
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
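The Broker above points at a kafka-broker-config ConfigMap in the knative-eventing namespace. If it is not present in your installation, you can create one yourself. The bootstrap address below assumes the Strimzi cluster from section 1.1, and the partition and replication values are just sensible single-node defaults, not taken from the original setup:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
  default.topic.partitions: "10"
  default.topic.replication.factor: "1"
```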

1.4. Install Apache Camel K operator (optional)

If you would like to use Apache Camel K to run the Quarkus application on Knative, you first need to install its operator. After downloading the Camel K CLI, you just need to run the following command.

$ kamel install

2. Run the application with Apache Camel K

In order to deploy and run an application on Knative, we may execute the kamel run command. Some parameters might be set in the source file. In the command visible below, I'm setting the name of the Knative service, the source file location, and Quarkus properties.

$ kamel run --name order-saga --dev OrderRoute.java \
    -p quarkus.datasource.db-kind=h2 \
    -p quarkus.datasource.jdbc.url=jdbc:h2:mem:testdb \
    -p quarkus.hibernate-orm.packages=com.github.piomin.entity.model.order

For more details about deploying Quarkus with Camel K on Kubernetes you may refer to my article Apache Camel K and Quarkus on Kubernetes.

In the end, I was not able to deploy exactly this application on Knative with Camel K, because it did not detect the JPA entities included in the application from an external library. However, the application is still prepared for deployment with Camel K: the whole source code is a single Java file, and it contains some Camel K modeline hooks.

3. Integrate Quarkus with Apache Camel

We can easily integrate Apache Camel routes with Quarkus. Camel Quarkus provides extensions for many of the Camel components. We need to include those components in our Maven pom.xml. What type of components do we need? The full list is visible below. However, first, let’s discuss them a little bit more.

Our application uses JPA to store entities in the H2 database. So we will include the Camel Quarkus JPA extension to provide the JPA implementation with Hibernate. For a single persistence unit, this extension automatically creates EntityManagerFactory and TransactionManager. In order to integrate with Apache Kafka, we need to include the Camel Quarkus Kafka extension. In order to receive events from Knative, we should expose the HTTP POST endpoint. That’s why we need several extensions like Platform HTTP or Jackson. Here’s my list of Maven dependencies.

<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-core</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-platform-http</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-bean</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-timer</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-jpa</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-rest</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-jackson</artifactId>
</dependency>

Then, we just need to create a class that extends RouteBuilder. The routes have to be defined inside the configure() method. Before we get into the details let’s analyze our domain model classes.

public class CustomerRoute extends RouteBuilder {
   @Override
   public void configure() throws Exception { 
      ...
   }
}

4. Domain model for Quarkus JPA and Kafka

I created a separate project for the entity model classes. The repository is available on GitHub: https://github.com/piomin/entity-model.git. Thanks to that, I have a typical serverless application which consists of a single class. It is also possible to easily deploy it on Knative with Camel K. Here's the model entity for order-saga.

@Entity
@Table(name = "orders")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Order implements Serializable {
    @Id
    @GeneratedValue
    private Long id;
    private Integer customerId;
    private Integer productId;
    private int amount;
    private int productCount;
    @Enumerated
    private OrderStatus status = OrderStatus.NEW;
}

Just to simplify, I’m using the same class when sending events to Kafka. We can also take a look at a model entity for customer-saga.

@Entity
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Customer implements Serializable {
    @Id
    @GeneratedValue
    private Long id;
    private String name;
    private int amountAvailable;
    private int amountReserved;
}

5. Building Camel routes with Quarkus and Kafka extension

In the first step, we are going to generate orders and send them to the Kafka topic. Before that, we will store them in the H2 database using the Camel JPA extension. This part of logic is implemented inside order-saga.

from("timer:tick?period=10000")
   .setBody(exchange -> 
      new Order(null, r.nextInt(10) + 1, r.nextInt(10) + 1, 100, 1, OrderStatus.NEW))
   .to("jpa:" + Order.class.getName())
   .marshal().json(JsonLibrary.Jackson)
   .log("New Order: ${body}")
   .toD("kafka:order-events?brokers=${env.KAFKA_BOOTSTRAP_SERVERS}");

Some things need to be clarified here. Before sending a message to the Kafka topic, we need to serialize it to the JSON format. The application does not know anything about the Kafka address. This address has been injected into the container by the KafkaBinding object. It is available to the Camel route as the environment variable KAFKA_BOOTSTRAP_SERVERS.

Now, let's switch to the customer-saga application. In order to receive an event from the Knative broker, we should expose an HTTP POST endpoint. This endpoint takes an Order as input. Then, if the order's status equals NEW, it performs a reservation on the customer account. Before that, it sends back a response to the reserve-events topic.

Also, let's take a look at the fragment responsible for searching for the customer in the database and performing an update. We use the Camel Quarkus JPA extension. First, we need to define a JPQL query to retrieve the entity. Then, we update the Customer entity depending on the order status.

rest("/customers")
   .post("/reserve").consumes("application/json")
   .route()
      .log("Order received: ${body}")
      .unmarshal().json(JsonLibrary.Jackson, Order.class)
      .choice()
         .when().simple("${body.status.toString()} == 'NEW'")
            .setBody(exchange -> {
               Order order = exchange.getIn().getBody(Order.class);
               order.setStatus(OrderStatus.IN_PROGRESS);
               return order;
            })
            .marshal().json(JsonLibrary.Jackson)
            .log("Reservation sent: ${body}")
            .toD("kafka:reserve-events?brokers=${env.KAFKA_BOOTSTRAP_SERVERS}")
      .end()
      .unmarshal().json(JsonLibrary.Jackson, Order.class)
      .setProperty("orderAmount", simple("${body.amount}", Integer.class))
      .setProperty("orderStatus", simple("${body.status}", OrderStatus.class))
      .toD("jpa:" + Customer.class.getName() + 
         "?query=select c from Customer c where c.id= ${body.customerId}")
      .choice()
         .when().simple("${exchangeProperty.orderStatus} == 'IN_PROGRESS'")
            .setBody(exchange -> {
               Customer customer = (Customer) exchange.getIn().getBody(List.class).get(0);
               customer.setAmountReserved(customer.getAmountReserved() + 
                  exchange.getProperty("orderAmount", Integer.class));
               customer.setAmountAvailable(customer.getAmountAvailable() - 
                  exchange.getProperty("orderAmount", Integer.class));
               return customer;
            })
            .otherwise()
               .setBody(exchange -> {
                  Customer customer = (Customer) exchange.getIn().getBody(List.class).get(0);
                  customer.setAmountReserved(customer.getAmountReserved() - 
                     exchange.getProperty("orderAmount", Integer.class));
                  return customer;
               })
      .end()
      .log("Current customer: ${body}")
      .to("jpa:" + Customer.class.getName() + "?useExecuteUpdate=true")
      .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(201)).setBody(constant(null))
.endRest();
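The account arithmetic buried in those route branches is easy to get wrong, so here's a plain-Java sketch of it, outside Camel: the IN_PROGRESS branch moves the order amount from available to reserved, while the otherwise() branch only releases the reservation. The field names mirror the Customer entity; the helper methods and sample numbers are my own, made up for illustration.

```java
public class ReservationSketch {

    // Minimal stand-in for the Customer entity fields used by the route
    static class Customer {
        int amountAvailable;
        int amountReserved;
        Customer(int available, int reserved) {
            amountAvailable = available;
            amountReserved = reserved;
        }
    }

    // Branch taken when the order status is IN_PROGRESS: reserve the funds
    static void reserve(Customer c, int orderAmount) {
        c.amountReserved += orderAmount;
        c.amountAvailable -= orderAmount;
    }

    // The otherwise() branch (order already settled): release the reservation only
    static void settle(Customer c, int orderAmount) {
        c.amountReserved -= orderAmount;
    }

    public static void main(String[] args) {
        Customer c = new Customer(1000, 0); // made-up starting balance
        reserve(c, 100); // available: 900, reserved: 100
        settle(c, 100);  // available: 900, reserved: 0
        System.out.println(c.amountAvailable + " " + c.amountReserved);
    }
}
```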

We can also generate some test data in customer-saga using a Camel route. It runs once, just after the Quarkus application starts.

from("timer://runOnce?repeatCount=1&delay=100")
   .loop(10)
      .setBody(exchange -> new Customer(null, "Test"+(++i), r.nextInt(50000), 0))
      .to("jpa:" + Customer.class.getName())
      .log("Add: ${body}")
   .end();

6. Configure Knative Eventing with Kafka broker

6.1. Architecture with Quarkus, Camel and Kafka

We have already created two applications built on top of Quarkus and Apache Camel. Both of them expose HTTP POST endpoints and send events to Kafka topics. Now, we need to create some Kubernetes objects to orchestrate the process. So far, we have just sent the events to topics; they have not been routed to the target applications. Let's take a look at the architecture of our system. There are two Kafka topics that receive events: order-events and reserve-events. The messages from those topics are not automatically sent to the Knative broker. So, first, we need to create KafkaSource objects that get messages from these topics and send them to the broker.

quarkus-camel-kafka-arch

6.2. Configure Knative eventing

The KafkaSource object takes a list of input topics and a target. In this case, the target is the Knative broker. We may create a single KafkaSource for both topics or a separate source per topic. Here's the KafkaSource definition that takes messages from the reserve-events topic.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-reserve-order
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - reserve-events
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default

Let’s create both sources. After creating them, we may verify if everything went well by executing the command kubectl get sources.

quarkus-camel-kafka-sources

Before running our applications on Knative, we should create KafkaBinding objects. These objects are responsible for injecting the address of the Kafka cluster into the application containers. The broker address will be available to the application under the KAFKA_BOOTSTRAP_SERVERS environment variable.

apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
  name: kafka-binding-customer-saga
spec:
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: customer-saga
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092

Let’s create both KafkaBinding objects. Here’s the list of available bindings after running the kubectl get bindings command.

Finally, we may proceed to the last step of our configuration. We will create triggers. A trigger represents a desire to subscribe to events from a specific broker. Moreover, we may apply a simple filtering mechanism using the Trigger object. For example, if we want to send only the events from the order-events topic and with the type dev.knative.kafka.event we should create a definition as shown below.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: customer-saga-trigger
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/kafka-source-orders-customer#order-events
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /customers/reserve

Similarly, we should create a trigger that sends messages to the order-saga POST endpoint. It gets messages from the reserve-events source and sends them to the /orders/confirm endpoint.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: order-saga-trigger
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/kafka-source-reserve-order#reserve-events
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: order-saga
    uri: /orders/confirm

Finally, we can display a list of active triggers by executing the command kubectl get trigger.

quarkus-camel-kafka-triggers

7. Deploy Quarkus application on Knative

Once we have finished the development of our sample applications, we may deploy them on Knative. One possible deployment option is Apache Camel K. In case of any problems with that type of deployment, we may also use the Quarkus Kubernetes module. First, let's include the two required modules. We will also leverage the Jib Maven plugin.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-kubernetes</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-container-image-jib</artifactId>
</dependency>

The rest of the configuration should be provided inside the application properties file. In the first step, we need to enable automatic deployment on Kubernetes by setting the property quarkus.kubernetes.deploy to true. By default, Quarkus creates a standard Kubernetes Deployment. Therefore, we should set quarkus.kubernetes.deployment-target to knative. In that case, it will generate a Knative Service YAML. Finally, we have to change the name of the image group to dev.local. Of course, this is required only if you run the applications on a local Kubernetes cluster, as I do.

quarkus.kubernetes.deploy = true
quarkus.kubernetes.deployment-target = knative
quarkus.container-image.group = dev.local

Now, if we run the build with the mvn clean package command, our application will be automatically deployed on Knative. After that, let's verify the list of Knative services.

Once the order-saga application has started, it generates one order every 10 seconds and sends it to the Kafka topic order-events. We can easily verify that it works properly by checking out the list of active topics as shown below.

We can also verify a list of Knative events exchanged by the applications.

$ kubectl get eventtype

Final Thoughts

Quarkus and Apache Camel seem to be a perfect combination when creating serverless applications on Knative. We can easily implement the whole logic within a single source code file. We can also use Camel K to deploy our applications on Kubernetes or Knative. You can compare the approach described in this article with the one based on Quarkus and its extensions to Kafka or Knative available in this repository.

The post Knative Eventing with Quarkus, Kafka and Camel appeared first on Piotr's TechBlog.

Knative Eventing with Kafka and Quarkus https://piotrminkowski.com/2021/03/31/knative-eventing-with-kafka-and-quarkus/ https://piotrminkowski.com/2021/03/31/knative-eventing-with-kafka-and-quarkus/#respond Wed, 31 Mar 2021 12:55:26 +0000 https://piotrminkowski.com/?p=9620 In this article, you will learn how to run eventing applications on Knative using Kafka and Quarkus. Previously I described the same approach for Kafka and Spring Cloud. If you want to compare both of them read my article Knative Eventing with Kafka and Spring Cloud. We will deploy exactly the same architecture. However, instead […]

The post Knative Eventing with Kafka and Quarkus appeared first on Piotr's TechBlog.

In this article, you will learn how to run eventing applications on Knative using Kafka and Quarkus. Previously I described the same approach for Kafka and Spring Cloud. If you want to compare both of them read my article Knative Eventing with Kafka and Spring Cloud. We will deploy exactly the same architecture. However, instead of Spring Cloud Functions we will use Quarkus Funqy. Also, Spring Cloud Stream may be replaced with Quarkus Kafka. Before we start, let’s clarify some things.

Concept over Knative and Quarkus

Quarkus supports Knative in several ways. First of all, we may use the Quarkus Kubernetes module to simplify deployment on Knative. We can also use the Quarkus Funqy Knative Event extension to route and process cloud events within functions. That’s not all. Quarkus supports a serverless functional style. With the Quarkus Funqy module, we can write functions deployable to various FaaS (including Knative). These functions can be invoked through HTTP. Finally, we may integrate our application with Kafka topics using annotations from the Quarkus Kafka extension.

The Quarkus Funqy Knative Events module is based on the Knative broker and triggers. Since we will use KafkaSource instead of a broker and trigger, we won't include that module. However, we can still take advantage of Quarkus Funqy and its HTTP binding.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions.

As I mentioned before, we will use the same architecture and scenario as in my previous article about Knative eventing. Let's briefly describe it.

Today we will implement an eventual consistency pattern (also known as the SAGA pattern). How does it work? The sample system consists of three services. The order-service creates a new order related to customers and products. That order is sent to a Kafka topic. Then, our two other applications, customer-service and product-service, receive the order event. After that, they perform a reservation: the customer-service reserves the order's amount on the customer's account, while the product-service reserves the number of products specified in the order. Both services send a response to the order-service through a Kafka topic. If the order-service receives positive reservations from both services, it confirms the order and sends an event with that information. Both customer-service and product-service receive the event and confirm their reservations. You can verify it in the picture below.

quarkus-knative-eventing-arch

Prerequisites

There are several requirements we need to satisfy before starting. I described them all in my previous article about Knative Eventing. Here's just a brief reminder:

  1. A Kubernetes cluster, at least version 1.17. I'm using a local cluster. If you use a remote cluster, replace dev.local in the image name with your Docker account name
  2. Install Knative Serving and Eventing on your cluster. You may find the detailed installation instructions here.
  3. Install Kafka Eventing Broker. Here’s the link to the releases site. You don’t need everything – we will use the KafkaSource and KafkaBinding CRDs
  4. Install Kafka cluster with the Strimzi operator. I installed it in the kafka namespace. The name of my cluster is my-cluster.

Step 1. Installing Kafka Knative components

Assuming you have already installed all the required elements to run Knative Eventing on your Kubernetes cluster, we may create the components dedicated to our applications. You may find YAML manifests with the object declarations in the k8s directory inside each application's directory. First, let's create a KafkaBinding. It is responsible for injecting the address of the Kafka cluster into the application container. Thanks to the KafkaBinding, that address is visible inside the container as the KAFKA_BOOTSTRAP_SERVERS environment variable. Here's an example of the YAML declaration for the customer-saga application. We should create similar objects for the two other applications.

apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
  name: kafka-binding-customer-saga
spec:
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: customer-saga
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
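For completeness, here's the analogous binding for order-saga; product-saga follows the same pattern. The object name is my own choice for illustration, not necessarily the one used in the repository:

```yaml
apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
  name: kafka-binding-order-saga
spec:
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: order-saga
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
```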

In the next step, we create the KafkaSource object. It reads events from a particular topic and passes them to the consumer by calling the HTTP POST endpoint exposed by the application. We can override the default context path of the HTTP endpoint. For customer-saga, the target URL is /reserve. It should receive events from the order-events topic. Because both customer-saga and product-saga listen for events from the order-events topic, we need to create a similar KafkaSource object for product-saga as well.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-orders-customer
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /reserve
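The corresponding source for product-saga differs only in its name and sink. The name below is hypothetical; check the k8s directory in the repository for the actual manifest:

```yaml
apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-orders-product
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: product-saga
    uri: /reserve
```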

On the other hand, the order-saga listens for events on the reserve-events topic. If you want to verify our scenario once again please refer to the diagram in the Source Code section. This time the target URL is /confirm.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-orders-confirm
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - reserve-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: order-saga
    uri: /confirm

Let's verify the list of Kafka sources. In our case, there is a single KafkaSource per application. Until you deploy the Quarkus applications on Knative, the Kafka sources won't be ready.

quarkus-knative-eventing-kafkasource

Step 2. Integrating Quarkus with Kafka

In order to integrate Quarkus with Apache Kafka, we may use the SmallRye Reactive Messaging library. Thanks to it, we may define an input and output topic for each method using annotations. The messages are serialized to JSON. We can also automatically expose the Kafka connection status in a health check. Here's the list of dependencies we need to include in the Maven pom.xml.

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-jackson</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-health</artifactId>
</dependency>

Before we start with the source code, we need to provide some configuration settings in the application.properties file. Of course, Kafka requires a connection URL to the cluster. We use the environment variable injected by the KafkaBinding object. Also, the output topic name should be configured. Here’s a list of required properties for the order-saga application.

kafka.bootstrap.servers = ${KAFKA_BOOTSTRAP_SERVERS}

mp.messaging.outgoing.order-events.connector = smallrye-kafka
mp.messaging.outgoing.order-events.topic = order-events
mp.messaging.outgoing.order-events.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Finally, we may switch to the code. Let’s start with the order-saga. It continuously sends orders to the order-events topic. Those events are received by both the customer-saga and product-saga applications. The method responsible for generating and sending events returns a reactive stream using Mutiny’s Multi. It emits an event every second. We need to annotate the method with @Outgoing, passing the name of the channel defined in the application properties. The @Broadcast annotation indicates that the event is dispatched to all subscribers. Before sending, every order is persisted in a database (we use an H2 in-memory database).

@ApplicationScoped
@Slf4j
public class OrderPublisher {

   private static int num = 0;

   @Inject
   private OrderRepository repository;
   @Inject
   private UserTransaction transaction;

   @Outgoing("order-events")
   @Broadcast
   public Multi<Order> orderEventsPublish() {
      return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
           .map(tick -> {
              Order o = new Order(++num, num%10+1, num%10+1, 100, 1, OrderStatus.NEW);
              try {
                 transaction.begin();
                 repository.persist(o);
                 transaction.commit();
               } catch (Exception e) {
                  log.error("Error in transaction", e);
                  try {
                     transaction.rollback();
                  } catch (Exception re) {
                     log.error("Rollback failed", re);
                  }
               }

              log.info("Order published: {}", o);
              return o;
           });
   }

}

Step 3. Handling Knative events with Quarkus Funqy

Ok, in the previous step we implemented the part of the code responsible for sending events to the Kafka topic. We also have a KafkaSource that is responsible for dispatching events from the Kafka topic to the application’s HTTP endpoint. Now, we just need to handle them. It is very simple with Quarkus Funqy, which allows us to create functions according to the serverless FaaS approach. We can also easily bind each function to an HTTP endpoint with the Quarkus Funqy HTTP extension. Let’s include it in our dependencies.

 <dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-funqy-http</artifactId>
 </dependency>

In order to create a function with Quarkus Funqy, we just need to annotate a particular method with @Funq. The name of the method is reserve, so it is automatically bound to the HTTP endpoint POST /reserve. It takes a single input parameter, which represents the incoming order and is automatically deserialized from JSON.

In the fragment of code visible below, we implement order handling in the customer-saga application. Once it receives an order, it performs a reservation on the customer account. Then it needs to send a response to the order-saga. To do that, we may use Quarkus reactive messaging support once again. We define an Emitter object that allows us to send a single event to the topic. We may use it inside a method that does not return any output to be sent to a topic (as a method annotated with @Outgoing does). The Emitter bean should be annotated with @Channel, which works similarly to @Outgoing. We also need to define an output topic related to the name of the channel.

@Slf4j
public class OrderReserveFunction {

   @Inject
   private CustomerRepository repository;
   @Inject
   @Channel("reserve-events")
   Emitter<Order> orderEmitter;

   @Funq
   public void reserve(Order order) {
      log.info("Received order: {}", order);
      doReserve(order);
   }

   private void doReserve(Order order) {
      Customer customer = repository.findById(order.getCustomerId());
      log.info("Customer: {}", customer);
      if (order.getStatus() == OrderStatus.NEW) {
         customer.setAmountReserved(customer.getAmountReserved() + order.getAmount());
         customer.setAmountAvailable(customer.getAmountAvailable() - order.getAmount());
         order.setStatus(OrderStatus.IN_PROGRESS);
         log.info("Order reserved: {}", order);
         orderEmitter.send(order);
      } else if (order.getStatus() == OrderStatus.CONFIRMED) {
         customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
      }
      repository.persist(customer);
   }
}

Here are the configuration properties for the integration between Kafka and the Emitter. The same properties should be set in both customer-saga and product-saga.

kafka.bootstrap.servers = ${KAFKA_BOOTSTRAP_SERVERS}

mp.messaging.outgoing.reserve-events.connector = smallrye-kafka
mp.messaging.outgoing.reserve-events.topic = reserve-events
mp.messaging.outgoing.reserve-events.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Finally, let’s take a look at the implementation of the Quarkus function inside the product-saga application. It also sends a response to the reserve-events topic using the Emitter object. It handles incoming orders and performs a reservation for the requested number of products.

@Slf4j
public class OrderReserveFunction {

   @Inject
   private ProductRepository repository;

   @Inject
   @Channel("reserve-events")
   Emitter<Order> orderEmitter;

   @Funq
   public void reserve(Order order) {
      log.info("Received order: {}", order);
      doReserve(order);
   }

   private void doReserve(Order order) {
      Product product = repository.findById(order.getProductId());
      log.info("Product: {}", product);
      if (order.getStatus() == OrderStatus.NEW) {
         product.setReservedItems(product.getReservedItems() + order.getProductsCount());
         product.setAvailableItems(product.getAvailableItems() - order.getProductsCount());
         order.setStatus(OrderStatus.IN_PROGRESS);
         orderEmitter.send(order);
      } else if (order.getStatus() == OrderStatus.CONFIRMED) {
         product.setReservedItems(product.getReservedItems() - order.getProductsCount());
      }
      repository.persist(product);
   }
}
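The /confirm function on the order-saga side is not shown in this article. Its core decision, confirming an order only once both customer-saga and product-saga have reported a successful reservation, can be sketched framework-free in plain Java. Note that the participant label below is a hypothetical assumption, as the Order events shown above do not carry a field identifying the sender:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: track which saga participants have reserved each order
// and report when an order may transition to CONFIRMED.
public class ConfirmDecision {

    private static final Set<String> REQUIRED = Set.of("customer", "product");

    // orderId -> participants that have reported a reservation so far
    private final Map<Integer, Set<String>> reservations = new HashMap<>();

    // Returns true when all required participants have reserved the order,
    // i.e. the order-saga may mark it CONFIRMED.
    public boolean onReserveEvent(int orderId, String participant) {
        Set<String> done = reservations.computeIfAbsent(orderId, id -> new HashSet<>());
        done.add(participant);
        return done.containsAll(REQUIRED);
    }

    public static void main(String[] args) {
        ConfirmDecision decision = new ConfirmDecision();
        System.out.println(decision.onReserveEvent(1, "customer")); // false
        System.out.println(decision.onReserveEvent(1, "product"));  // true
    }
}
```

In the real application this logic would live behind a @Funq-annotated confirm(Order order) method, analogous to the reserve functions shown above.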

Step 4. Deploy Quarkus application on Knative

Finally, we can deploy all our applications on Knative. To simplify that process, we may use Quarkus Kubernetes support. It is able to automatically generate deployment manifests based on the source code and application properties. Quarkus also supports building images with Jib. So first, let’s add the following dependencies to the Maven pom.xml.

 <dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-kubernetes</artifactId>
 </dependency>
 <dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-container-image-jib</artifactId>
 </dependency>

In the next step, we need to add some configuration settings to the application.properties file. To enable automatic deployment on Kubernetes, the property quarkus.kubernetes.deploy must be set to true. Then we should change the deployment target to Knative. Thanks to that, Quarkus will generate a Knative Service instead of a standard Kubernetes Deployment. The last property, quarkus.container-image.group, sets the name of the image owner group. For local development with Knative, we should set it to dev.local.

quarkus.kubernetes.deploy = true
quarkus.kubernetes.deployment-target = knative
quarkus.container-image.group = dev.local

After setting all the values shown above, we just need to execute the Maven build to deploy the application.

$ mvn clean package

After running the Maven build for all the applications, let’s verify the list of Knative Services.
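Assuming the kn CLI is installed and configured against the cluster, the deployed services can be listed like this:

```shell
# List Knative Services created by the Maven builds
kn service list

# Alternatively, with plain kubectl
kubectl get ksvc
```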

Once the order-saga application starts, it begins sending orders continuously. It also receives the order events sent back by customer-saga and product-saga. Those events are processed by the Quarkus function. Here are the logs printed by order-saga.

Final Thoughts

As you can see, we can easily implement and deploy Quarkus applications on Knative. Quarkus provides several extensions that simplify integration with the Knative Eventing model and the Kafka broker. We can use Quarkus Funqy to implement the serverless FaaS approach or SmallRye Reactive Messaging to integrate with Apache Kafka. You can compare that Quarkus support with Spring Boot in my previous article: Knative Eventing with Kafka and Spring Cloud.

The post Knative Eventing with Kafka and Quarkus appeared first on Piotr's TechBlog.
