kafka Archives - Piotr's TechBlog https://piotrminkowski.com/tag/kafka/ Java, Spring, Kotlin, microservices, Kubernetes, containers Fri, 19 Jan 2024 09:25:53 +0000 en-US hourly 1 https://wordpress.org/?v=6.9.1 https://i0.wp.com/piotrminkowski.com/wp-content/uploads/2020/08/cropped-me-2-tr-x-1.png?fit=32%2C32&ssl=1 kafka Archives - Piotr's TechBlog https://piotrminkowski.com/tag/kafka/ 32 32 181738725 Serverless on Azure with Spring Cloud Function https://piotrminkowski.com/2024/01/19/serverless-on-azure-with-spring-cloud-function/ https://piotrminkowski.com/2024/01/19/serverless-on-azure-with-spring-cloud-function/#respond Fri, 19 Jan 2024 09:25:49 +0000 https://piotrminkowski.com/?p=14829 This article will teach you how to create and run serverless apps on Azure using the Spring Cloud Function and Spring Cloud Azure projects. We will integrate with the Azure Functions and Azure Event Hubs services. It is not my first article about Azure and Spring Cloud. As a preparation for that exercise, it is […]

The post Serverless on Azure with Spring Cloud Function appeared first on Piotr's TechBlog.

]]>
This article will teach you how to create and run serverless apps on Azure using the Spring Cloud Function and Spring Cloud Azure projects. We will integrate with the Azure Functions and Azure Event Hubs services.

It is not my first article about Azure and Spring Cloud. As a preparation for that exercise, it is worth reading the article to familiarize yourself with some interesting features of Spring Cloud Azure. It describes an integration with Azure Spring Apps, Cosmos DB, and App Configuration services. On the other hand, if you are interested in CI/CD for Spring Boot apps you can refer to the following article about Azure DevOps and Terraform.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. The Spring Boot apps used in the article are located in the serverless directory. After you go to that directory you should just follow my further instructions.

Architecture

In this exercise, we will prepare two sample Spring Boot apps (aka functions) account-function and customer-function. Then, we will deploy them on the Azure Function service. Our apps do not communicate with each other directly but through the Azure Event Hubs service. Event Hubs is a cloud-native data streaming service compatible with the Apache Kafka API. After adding a new customer the customer-function sends an event to the Azure Event Hubs using the Spring Cloud Stream binder. The account-function app receives the event through the Azure Event Hubs trigger. Also, the customer-function is exposed to the external client through the Azure HTTP trigger. Here’s the diagram of our architecture:

azure-serverless-spring-cloud-arch

Prerequisites

There are some prerequisites before you start the exercise. You need to install JDK17+ and Maven on your local machine. You also need to have an account on Azure and az CLI to interact with that account. Once you install the az CLI and log in to Azure you can execute the following command for verification:

$ az account show

If you would like to test Azure Functions locally, you need to install Azure Functions Core Tools. You can find detailed installation instructions in Microsoft Docs here. For macOS, there are three required commands to run:

$ brew tap azure/functions
$ brew install azure-functions-core-tools@4
$ brew link --overwrite azure-functions-core-tools@4

Create Resources on Azure

Before we proceed with the source code, we need to create several required resources on the Azure cloud. In the first step, we will prepare a resource group for all required objects. The name of the group is spring-cloud-serverless. The location depends on your preferences. For me it is eastus.

$ az group create -l eastus -n spring-cloud-serverless

In the next step, we need to create a storage account. The Azure Function service requires it, but we will also use that account during the local development with Azure Functions Core Tools.

$ az storage account create -n pminkowsserverless \
     -g spring-cloud-serverless \
     -l eastus \
     --sku Standard_LRS

In order to run serverless apps on Azure with e.g. Spring Cloud Function, we need to create the Azure Function App instances. Of course, we use the previously created resource group and storage account. The name of my Function App instances are pminkows-account-function and pminkows-customer-function. We can also set a default OS type (Linux), functions version (4), and a runtime stack (Java) for each Function App.

$ az functionapp create -n pminkows-customer-function \
     -c eastus \
     --os-type Linux \
     --functions-version 4 \
     -g spring-cloud-serverless \
     --runtime java \
     --runtime-version 17.0 \
     -s pminkowsserverless

$ az functionapp create -n pminkows-account-function \
     -c eastus \
     --os-type Linux \
     --functions-version 4 \
     -g spring-cloud-serverless \
     --runtime java \
     --runtime-version 17.0 \
     -s pminkowsserverless

Then, we have to create the Azure Event Hubs namespace. The name of my namespace is spring-cloud-serverless. The same as before I choose the East US location and the spring-cloud-serverless resource group. We can also set the pricing tier (Standard) and the upper limit of throughput units when the AutoInflate option is enabled.

$ az eventhubs namespace create -n spring-cloud-serverless \
     -g spring-cloud-serverless \
     --location eastus \
     --sku Standard \
     --maximum-throughput-units 1 \
     --enable-auto-inflate true

Finally, we have to create topics on Event Hubs. Of course, they have to be assigned to the previously created spring-cloud-serverless Event Hubs namespace. The names of our topics are accounts and customers. The number of partitions is irrelevant in this exercise.

$ az eventhubs eventhub create -n accounts \
     -g spring-cloud-serverless \
     --namespace-name spring-cloud-serverless \
     --cleanup-policy Delete \
     --partition-count 3

$ az eventhubs eventhub create -n customers \
     -g spring-cloud-serverless \
     --namespace-name spring-cloud-serverless \
     --cleanup-policy Delete \
     --partition-count 3

Now, let’s switch to the Azure Portal. Find the spring-cloud-serverless resource group. You should have the same list of resources inside this group as shown below. It means that our environment is ready and we can proceed to the source code.

App Dependencies

Firstly, we need to declare the dependencyManagement section inside the Maven pom.xml for three projects used in the app implementation: Spring Boot, Spring Cloud, and Spring Cloud Azure.

<properties>
  <java.version>17</java.version>
  <spring-boot.version>3.1.4</spring-boot.version>
  <spring-cloud-azure.version>5.7.0</spring-cloud-azure.version>
  <spring-cloud.version>2022.0.4</spring-cloud.version>
  <maven.compiler.release>${java.version}</maven.compiler.release>
  <maven.compiler.source>${java.version}</maven.compiler.source>
  <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>${spring-cloud.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-dependencies</artifactId>
      <version>${spring-cloud-azure.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-dependencies</artifactId>
      <version>${spring-boot.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<build>
  <plugins>
    <plugin>
      <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-maven-plugin</artifactId>
      <executions>
       <execution>
          <goals>
           <goal>repackage</goal>
          </goals>
         </execution>
       </executions>
    </plugin>
  </plugins>
</build>

Here’s the list of required Maven dependencies. We need to include the spring-cloud-function-context library to enable Spring Cloud Functions. In order to integrate with the Azure Functions service, we need to include the spring-cloud-function-adapter-azure extension. Our apps also send messages to the Azure Event Hubs service through the dedicated Spring Cloud Stream binder.

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-function-adapter-azure</artifactId>
  </dependency>
  <dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-function-context</artifactId>
  </dependency>
</dependencies>

Create Azure Serverless Apps with Spring Cloud

Expose Azure Function as HTTP endpoint

Let’s begin with the customer-function. To simplify the app we will use an in-memory H2 for storing data. Each time a new customer is added, it is persisted in the in-memory database using Spring Data JPA extension. Here’s our entity class:

@Entity
public class Customer {
   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long id;
   private String name;
   private int age;
   private String status;

   // GETTERS AND SETTERS
}

Here’s the Spring Data repository interface for the Customer entity:

public interface CustomerRepository extends ListCrudRepository<Customer, Long> {
}

Let’s proceed to the Spring Cloud Functions implementation. The CustomerInternalFunctions bean defines two functions. The addConsumer function (1) persists a new customer in the database and sends the event about it to the Azure Event Hubs topic. It uses the Spring Cloud Stream StreamBridge bean for interacting with Event Hubs. It also returns the persisted Customer entity as a response. The second function changeStatus (2) doesn’t return any data, it just needs to react to the incoming event and change the status of the particular customer. That event is sent by the account-function app after generating an account for a newly created customer. The important information here is that those are just Spring Cloud functions. For now, they have nothing to do with Azure Functions.

@Service
public class CustomerInternalFunctions {

   private static final Logger LOG = LoggerFactory
      .getLogger(CustomerInternalFunctions.class);

   private StreamBridge streamBridge;
   private CustomerRepository repository;

   public CustomerInternalFunctions(StreamBridge streamBridge, 
                                    CustomerRepository repository) {
      this.streamBridge = streamBridge;
      this.repository = repository;
   }

   // (1)
   @Bean
   public Function<Customer, Customer> addCustomer() {
      return c -> {
         Customer newCustomer = repository.save(c);
         streamBridge.send("customers-out-0", newCustomer);
         LOG.info("New customer added: {}", c);
         return newCustomer;
      };
   }

   // (2)
   @Bean
   public Consumer<Account> changeStatus() {
      return account -> {
         Customer customer = repository.findById(account.getCustomerId())
                .orElseThrow();
         customer.setStatus(Customer.CUSTOMER_STATUS_ACC_ACTIVE);
         repository.save(customer);
         LOG.info("Customer activated: id={}", customer.getId());
      };
   }

}

We also need to provide several configuration properties in the Spring Boot application.properties file. We should set the Azure Event Hubs connection URL and the name of the target topic. Since our app uses Spring Cloud Stream only for sending events we should also turn off the autodiscovery of functional beans as messaging bindings.

spring.cloud.azure.eventhubs.connection-string = ${EVENT_HUBS_CONNECTION_STRING}
spring.cloud.stream.bindings.customers-out-0.destination = customers
spring.cloud.stream.function.autodetect = false

In order to expose the functions on Azure, we will take advantage of the Spring Cloud Function Azure Adapter. It includes several Azure libraries to our Maven dependencies. It can invoke the Spring Cloud Functions directly or through the lookup approach with the FunctionCatalog bean (1). The method has to be annotated with @FunctionName, which defines the name of the function in Azure (2). In order to expose that function over HTTP, we need to define the Azure @HttpTrigger (3). The trigger exposes the function as the POST endpoint and doesn’t require any authorization. Our method receives the request through the HttpRequestMessage object. Then, it invokes the Spring Cloud function addCustomer by name using the FunctionCatalog bean (4).

// (1)
@Autowired
private FunctionCatalog functionCatalog;

@FunctionName("add-customer") // (2)
public Customer addCustomerFunc(
        // (3)
        @HttpTrigger(name = "req",
                     methods = { HttpMethod.POST },
                     authLevel = AuthorizationLevel.ANONYMOUS)
        HttpRequestMessage<Optional<Customer>> request,
        ExecutionContext context) {
   Customer c = request.getBody().orElseThrow();
   context.getLogger().info("Request: {}" + c);
   // (4)
   Function<Customer, Customer> function = functionCatalog
      .lookup("addCustomer");
   return function.apply(c);
}

Integrate Function with Azure Data Hubs Trigger

Let’s switch to the account-function app directory inside our Git repository. The same as customer-function it uses an in-memory H2 database for storing customer accounts. Here’s our Account entity class:

@Entity
public class Account {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String number;
    private Long customerId;
    private int balance;

   // GETTERS AND SETTERS ...
}

Inside the account-function app, there is a single Spring Cloud Function addAccount. It generates a new 16-digit account number for each new customer. Then, it saves the account in the database and sends the event to the Azure Event Hubs with Spring Cloud Stream StreamBridge bean.

@Service
public class AccountInternalFunctions {

   private static final Logger LOG = LoggerFactory
      .getLogger(AccountInternalFunctions.class);

   private StreamBridge streamBridge;
   private AccountRepository repository;

   public AccountInternalFunctions(StreamBridge streamBridge, 
                                   AccountRepository repository) {
      this.streamBridge = streamBridge;
      this.repository = repository;
   }

   @Bean
   public Function<Customer, Account> addAccount() {
      return customer -> {
         String n = RandomStringUtils.random(16, false, true);
         Account a = new Account(n, customer.getId(), 0);
         a = repository.save(a);
         boolean b = streamBridge.send("accounts-out-0", a);
         LOG.info("New account added: {}", a);
         return a;
      };
   }

}

The same as for customer-function we need to provide some configuration settings for Spring Cloud Stream. However, instead of the customers topic, this time we are sending events to the accounts topic.

spring.cloud.azure.eventhubs.connection-string = ${EVENT_HUBS_CONNECTION_STRING}
spring.cloud.stream.bindings.accounts-out-0.destination = accounts
spring.cloud.stream.function.autodetect = false

The account-function app is not exposed as the HTTP endpoint. We want to trigger the function in reaction to the event delivered to the Azure Event Hubs topic. The name of our Azure function is new-customer (1). It receives the Customer event from the customers topic thanks to the @EventHubTrigger annotation (2). This annotation also defines a property name, that contains the address of the Azure Event Hubs namespace (EVENT_HUBS_CONNECTION_STRING). I’ll show you later how to set such a property for our function in Azure. Once, the new-customer function is triggered, it invokes the Spring Cloud Function addAccount (3).

@Autowired
private FunctionCatalog functionCatalog;

// (1)
@FunctionName("new-customer")
public void newAccountEventFunc(
        // (2)
        @EventHubTrigger(eventHubName = "customers",
                         name = "newAccountTrigger",
                         connection = "EVENT_HUBS_CONNECTION_STRING",
                         cardinality = Cardinality.ONE)
        Customer event,
        ExecutionContext context) {
   context.getLogger().info("Event: " + event);
   // (3)
   Function<Customer, Account> function = functionCatalog
      .lookup("addAccount");
   function.apply(event);
}

At the end of this section, let’s switch back once again to the customer-function app. It fires on the event sent by the new-customer function to the accounts topic. Therefore we are using the @EventHubTrigger annotation once again.

@FunctionName("activate-customer")
public void activateCustomerEventFunc(
          @EventHubTrigger(eventHubName = "accounts",
                 name = "changeStatusTrigger",
                 connection = "EVENT_HUBS_CONNECTION_STRING",
                 cardinality = Cardinality.ONE)
          Account event,
          ExecutionContext context) {
   context.getLogger().info("Event: " + event);
   Consumer<Account> consumer = functionCatalog.lookup("changeStatus");
   consumer.accept(event);
}

All our functions are ready. Now, we can proceed to the deployment phase.

Running Azure Functions Locally with Maven

Before we deploy our functions on Azure, we can run and test them locally. I assume you have already the Azure Functions Core Tools according to the “Prerequisites” section. Our function still needs to connect with some services on the cloud like Azure Event Hubs or the storage account. The address to the Azure Event Hubs should be set as the EVENT_HUBS_CONNECTION_STRING app property or environment variable. In order to find the Event Hubs connection string, we should switch to the Azure Portal and find the spring-cloud-serverless namespace. Then, we need to go to the “Shared access policies” menu item and click the “RootManagedSharedAccessKey” policy. The connection string value is available in the “Connection string-primary key” field.

We also need to obtain the connection credentials to the Azure storage account. In your storage account, you should find the “Access keys” section and copy the value from the “Connection string” field.

After that, we can create the local.settings.json file in each app’s root directory. Alternatively, we can just set the environment variables AzureWebJobsStorage and EVENT_HUBS_CONNECTION_STRING.

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": <YOUR_ACCOUNT_STORAGE_CONNECTION_STRING>,
    "FUNCTIONS_WORKER_RUNTIME": "java",
    "EVENT_HUBS_CONNECTION_STRING": <YOUR_EVENT_HUBS_CONNECTION_STRING>
  }
}

Once you place your credentials in the local.settings.json file, you can build the app with Maven.

$ mvn clean package

After that, you can use the Maven plugin included in the spring-cloud-function-adapter-azure module. In order to run the function, you need to execute the following command:

$ mvn azure-functions:run 

Here’s the command output for the customer-function app. As you see, it contains two Azure functions: add-customer (HTTP) and activate-customer (Event Hub Trigger). You can test the function by invoking the http://localhost:7071/api/add-customer URL.

Here’s the command output for the account-function app. As you see, it contains a single function new-customer activated through the Event Hub trigger.

Deploy Spring Cloud Serverless on Azure Functions

Let’s take a look at the pminkows-customer-function Azure Function App before we deploy our first app there. The http://pminkows-customer-function.azurewebsites.net base URL will precede all the HTTP endpoint URLs for our functions. We should remember the name of the automatically generated service plan (EastUSLinuxDynamicPlan).

Before deploying our Spring Boot app we should create the host.json file with the following content:

{
  "version": "2.0",
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  }
}

Then, we should add the azure-functions-maven-plugin Maven plugin to our pom.xml. In the configuration section, we have to set the name of the Azure Function App instance (pminkows-customer-function), the target resource group (spring-cloud-serverless), the region (eastus), the service plan (EastUSLinuxDynamicPlan), and the location of the host.json file. We also need to set the connection string to the Azure Event Hubs inside the EVENT_HUBS_CONNECTION_STRING app property. So before running the build, you should export the value of that environment variable.

<plugin>
  <groupId>com.microsoft.azure</groupId>
  <artifactId>azure-functions-maven-plugin</artifactId>
  <version>1.30.0</version>
  <configuration>
    <appName>pminkows-customer-function</appName>
    <resourceGroup>spring-cloud-serverless</resourceGroup>
    <region>eastus</region>
    <appServicePlanName>EastUSLinuxDynamicPlan</appServicePlanName>
    <hostJson>${project.basedir}/src/main/resources/host.json</hostJson>
    <runtime>
      <os>linux</os>
      <javaVersion>17</javaVersion>
    </runtime>
    <appSettings>
      <property>
        <name>FUNCTIONS_EXTENSION_VERSION</name>
        <value>~4</value>
      </property>
      <property>
        <name>EVENT_HUBS_CONNECTION_STRING</name>
        <value>${EVENT_HUBS_CONNECTION_STRING}</value>
      </property>
    </appSettings>
  </configuration>
  <executions>
    <execution>
      <id>package-functions</id>
      <goals>
        <goal>package</goal>
      </goals>
    </execution>
  </executions>
</plugin>

Let’s begin with the customer-function app. Before deploying the app we need to build it first with the mvn clean package command. Once you do it you can run your first function Azure with the following command:

$ mvn azure-functions:deploy

Here’s my output after running that command. As you see, the function add-customer is exposed under the http://pminkows-customer-function.azurewebsites.net/api/add-customer URL.

azure-serverless-spring-cloud-deploy-mvn

We can come back to the pminkows-customer-function Azure Function App in the portal. According to the expectations, two functions are running there. The activate-customer function is triggered by the Azure Event Hub.

azure-serverless-spring-cloud-functions

Let’s switch to the account-function directory. We will deploy it to the Azure pminkows-account-function function. Once again we need to build the app with mvn clean package command, and then deploy it using the mvn azure-functions:deploy command. Here’s the output. There are no HTTP triggers defined, but just a single function triggered by the Azure Event Hub.

Here are the details of the pminkows-account-function Function App in the Azure Portal.

Invoke Azure Functions

Finally, we can test our functions by calling the following endpoint using e.g. curl. Let’s repeat the similar command several times with different data:

$ curl https://pminkows-customer-function.azurewebsites.net/api/add-customer \ 
    -d "{\"name\":\"Test\",\"age\":33}" \
    -H "Content-Type: application/json"

After that, we should switch to the Azure Portal. Go to the pminkows-customer-function details and click the link “Invocation and more” on the add-customer function.

You will be redirected to the Azure Monitor statistics for that function. Azure Monitor displays a list of invocations with statuses.

azure-serverless-spring-cloud-functions-invocations

We can click one of the records from the invocation history to see the details.

As you probably remember, the add-customer function sends messages to Azure Event Hubs. On the other hand, we can also verify how the new-customer function in the pminkows-account-function consumes and handles those events.

azure-serverless-spring-cloud-functions-logs

Final Thoughts

This article gives you a comprehensive guide on how to build and run Spring Cloud serverless apps on Azure Functions. It explains the concept of the triggers in Azure Functions and shows the integration with Azure Event Hubs. Finally, it shows how to run such functions locally and then monitor them on Azure after deployment.

The post Serverless on Azure with Spring Cloud Function appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2024/01/19/serverless-on-azure-with-spring-cloud-function/feed/ 0 14829
Kafka Tracing with Spring Boot and Open Telemetry https://piotrminkowski.com/2023/11/15/kafka-tracing-with-spring-boot-and-open-telemetry/ https://piotrminkowski.com/2023/11/15/kafka-tracing-with-spring-boot-and-open-telemetry/#comments Wed, 15 Nov 2023 11:26:33 +0000 https://piotrminkowski.com/?p=14669 In this article, you will learn how to configure tracing for Kafka producer and consumer with Spring Boot and Open Telemetry. We will use the Micrometer library for sending traces and Jaeger for storing and visualizing them. Spring Kafka comes with built-in integration with Micrometer for the KafkaTemplate and listener containers. You will also see […]

The post Kafka Tracing with Spring Boot and Open Telemetry appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to configure tracing for Kafka producer and consumer with Spring Boot and Open Telemetry. We will use the Micrometer library for sending traces and Jaeger for storing and visualizing them. Spring Kafka comes with built-in integration with Micrometer for the KafkaTemplate and listener containers. You will also see how to configure the Spring Kafka observability to add our custom tags to traces.

If you are interested in Kafka and Spring Boot, you may find several articles on my blog about it. To read about concurrency with Kafka and Spring Boot read the following post. For example, there is also an interesting article about Kafka transactions here.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should go to the kafka directory. After that, you should just follow my instructions. Let’s begin.

Dependencies

Let’s take a look at the list of required Maven dependencies. It is the same for both of our sample Spring Boot apps. Of course, we need to add the Spring Boot starter and the Spring Kafka for sending or receiving messages. In order to automatically generate traces related to each message, we are including the Spring Boot Actuator and the Micrometer Tracing Open Telemetry bridge. Finally, we need to include the opentelemetry-exporter-otlp library to export traces outside the app.

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
  </dependency>
  <dependency>
    <groupId>io.micrometer</groupId>
    <artifactId>micrometer-tracing-bridge-otel</artifactId>
  </dependency>
  <dependency>
    <groupId>io.opentelemetry</groupId>
    <artifactId>opentelemetry-exporter-otlp</artifactId>
  </dependency>
</dependencies>

Spring Boot Kafka Tracing for Producer

Our apps don’t do anything complicated. They are just sending and receiving messages. Here’s the class representing the message exchanged between both apps.

public class Info {

    private Long id;
    private String source;
    private String space;
    private String cluster;
    private String message;

    public Info(Long id, String source, String space, String cluster, 
                String message) {
       this.id = id;
       this.source = source;
       this.space = space;
       this.cluster = cluster;
       this.message = message;
    }

   // GETTERS AND SETTERS
}

Let’s begin with the producer app. It generates and sends one message per second. Here’s the implementation of a @Service bean responsible for producing messages. It injects and uses the KafkaTemplate bean for that.

@Service
public class SenderService {

   private static final Logger LOG = LoggerFactory
      .getLogger(SenderService.class);

   AtomicLong id = new AtomicLong();
   @Autowired
   KafkaTemplate<Long, Info> template;

   @Value("${POD:kafka-producer}")
   private String pod;
   @Value("${NAMESPACE:empty}")
   private String namespace;
   @Value("${CLUSTER:localhost}")
   private String cluster;
   @Value("${TOPIC:info}")
   private String topic;

   @Scheduled(fixedRate = 1000)
   public void send() {
      Info info = new Info(id.incrementAndGet(), 
                           pod, 
                           namespace, 
                           cluster, 
                           "HELLO");
      CompletableFuture<SendResult<Long, Info>> result = template
         .send(topic, info.getId(), info);
      result.whenComplete((sr, ex) ->
                LOG.info("Sent({}): {}", sr.getProducerRecord().key(), 
                         sr.getProducerRecord().value()));
    }

}

Spring Boot provides an auto-configured instance of KafkaTemplate. However, to enable Kafka tracing with Spring Boot we need to customize that instance. Here’s the implementation of the KafkaTemplate bean inside the producer app’s main class. In order to enable tracing, we need to invoke the setObservationEnabled method. By default, the Micrometer module generates some generic tags. We want to add at least the name of the target topic and the Kafka message key. Therefore we are creating our custom implementation of the KafkaTemplateObservationConvention interface. It uses the KafkaRecordSenderContext to retrieve the topic name and the message key from the ProducerRecord object.

@SpringBootApplication
@EnableScheduling
public class KafkaProducer {

   private static final Logger LOG = LoggerFactory
      .getLogger(KafkaProducer.class);

   public static void main(String[] args) {
      SpringApplication.run(KafkaProducer.class, args);
   }

   @Bean
   public NewTopic infoTopic() {
      return TopicBuilder.name("info")
             .partitions(1)
             .replicas(1)
             .build();
   }

   @Bean
   public KafkaTemplate<Long, Info> kafkaTemplate(ProducerFactory<Long, Info> producerFactory) {
      KafkaTemplate<Long, Info> t = new KafkaTemplate<>(producerFactory);
      t.setObservationEnabled(true);
      t.setObservationConvention(new KafkaTemplateObservationConvention() {
         @Override
         public KeyValues getLowCardinalityKeyValues(KafkaRecordSenderContext context) {
            return KeyValues.of("topic", context.getDestination(),
                    "id", String.valueOf(context.getRecord().key()));
         }
      });
      return t;
   }

}

We also need to set the address of the Jaeger instance and decide which percentage of spans will be exported. Here’s the application.yml file with the required properties:

spring:
  application.name: kafka-producer
  kafka:
    bootstrap-servers: ${KAFKA_URL:localhost}:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

management:
  tracing:
    enabled: true
    sampling:
      probability: 1.0
  otlp:
    tracing:
      endpoint: http://jaeger:4318/v1/traces

Spring Boot Kafka Tracing for Consumer

Let’s switch to the consumer app. It just receives and prints messages coming to the Kafka topic. Here’s the implementation of the listener @Service. Besides the whole message content, it also prints the message key and a topic partition number.

@Service
public class ListenerService {

   private static final Logger LOG = LoggerFactory
      .getLogger(ListenerService.class);

   @KafkaListener(id = "info", topics = "${app.in.topic}")
   public void onMessage(@Payload Info info,
                         @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Long key,
                         @Header(KafkaHeaders.RECEIVED_PARTITION) int partition) {
      LOG.info("Received(key={}, partition={}): {}", key, partition, info);
   }

}

In order to generate and export traces on the consumer side we need to override the ConcurrentKafkaListenerContainerFactory bean. For the container listener factory, we should obtain the ContainerProperties instance and then invoke the setObservationEnabled method. The same as before we can create a custom implementation of the KafkaTemplateObservationConvention interface to include the additional tags (optionally).

@SpringBootApplication
@EnableKafka
public class KafkaConsumer {

   private static final Logger LOG = LoggerFactory
      .getLogger(KafkaConsumer.class);

    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumer.class, args);
    }

    @Value("${app.in.topic}")
    private String topic;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> listenerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setObservationEnabled(true);
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }

    @Bean
    public NewTopic infoTopic() {
        return TopicBuilder.name(topic)
                .partitions(10)
                .replicas(3)
                .build();
    }

}

Of course, we also need to set a Jaeger address in the application.yml file:

spring:
  application.name: kafka-consumer
  kafka:
    bootstrap-servers: ${KAFKA_URL:localhost}:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"

app.in.topic: ${TOPIC:info}

management:
  tracing:
    enabled: true
    sampling:
      probability: 1.0
  otlp:
    tracing:
      endpoint: http://jaeger:4318/v1/traces

Trying on Docker

Once we finish the implementation we can try out our solution. We will run both Kafka and Jaeger as Docker containers. Firstly, let’s build the project and container images for the producer and consumer apps. Spring Boot provides built-in tools for that. Therefore, we just need to execute the following command:

$ mvn clean package spring-boot:build-image

After that, we can define the docker-compose.yml file with a list of containers. It is possible to dynamically override Spring Boot properties using a style based on environment variables. Thanks to that, we can easily change the Kafka and Jaeger addresses for the containers. Here’s our docker-compose.yml:

version: "3.8"
services:
  broker:
    image: moeenz/docker-kafka-kraft:latest
    restart: always
    ports:
      - "9092:9092"
    environment:
      - KRAFT_CONTAINER_HOST_NAME=broker
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"
      - "4317:4317"
      - "4318:4318"
  producer:
    image: library/producer:1.0-SNAPSHOT
    links:
      - broker
      - jaeger
    environment:
      MANAGEMENT_OTLP_TRACING_ENDPOINT: http://jaeger:4318/v1/traces
      SPRING_KAFKA_BOOTSTRAP_SERVERS: broker:9092
  consumer:
    image: library/consumer:1.0-SNAPSHOT
    links:
      - broker
      - jaeger
    environment:
      MANAGEMENT_OTLP_TRACING_ENDPOINT: http://jaeger:4318/v1/traces
      SPRING_KAFKA_BOOTSTRAP_SERVERS: broker:9092

Let’s run all the defined containers with the following command:

$ docker compose up

Our apps are running and exchanging messages:

The Jaeger dashboard is available under the 16686 port. As you see, there are several traces with the kafka-producer and kafka-consumer spans.

spring-boot-kafka-tracing-jaeger

We can go into the details of each entry. The trace generated by the producer app is always correlated to the trace generated by the consumer app for every single message. There are also our two custom tags (id and topic) with values added by the KafkaTemplate bean.

spring-boot-kafka-tracing-details

Running on Kubernetes

Our sample apps are prepared for being deployed on Kubernetes. You can easily do it with the Skaffold CLI. Before that, we need to install Kafka and Jaeger on Kubernetes. I will not get into details about Kafka installation. You can find a detailed description of how to run Kafka on Kubernetes with the Strimzi operator in my article available here. After that, we can proceed to the Jaeger installation. In the first step, we need to add the following Helm repository:

$ helm repo add jaegertracing https://jaegertracing.github.io/helm-charts

By default, the Jaeger Helm chart doesn’t expose OTLP endpoints. In order to enable them, we need to override some default settings. Here’s our values YAML manifest:

collector:
  service:
    otlp:
      grpc:
        name: otlp-grpc
        port: 4317
      http:
        name: otlp-http
        port: 4318

Let’s install Jaeger in the jaeger namespace with the parameters from jaeger-values.yaml:

$ helm install jaeger jaegertracing/jaeger -n jaeger \
    --create-namespace \
    -f jaeger-values.yaml

Once we install Jaeger we can verify a list of Kubernetes Services. We will use the jaeger-collector service to send traces for the apps and the jaeger-query service to access the UI dashboard.

$ kubectl get svc -n jaeger
NAME               TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                                           AGE
jaeger-agent       ClusterIP   10.96.147.104   <none>        5775/UDP,6831/UDP,6832/UDP,5778/TCP,14271/TCP     14m
jaeger-cassandra   ClusterIP   None            <none>        7000/TCP,7001/TCP,7199/TCP,9042/TCP,9160/TCP      14m
jaeger-collector   ClusterIP   10.96.111.236   <none>        14250/TCP,14268/TCP,4317/TCP,4318/TCP,14269/TCP   14m
jaeger-query       ClusterIP   10.96.88.64     <none>        80/TCP,16685/TCP,16687/TCP                        14m

Finally, we can run our sample Spring Boot apps that connect to Kafka and Jaeger. Here’s the Deployment object for the producer app. It overrides the default Kafka and Jaeger addresses by defining the KAFKA_URL and MANAGEMENT_OTLP_TRACING_ENDPOINT environment variables.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
      - name: producer
        image: piomin/producer
        resources:
          requests:
            memory: 200Mi
            cpu: 100m
        ports:
        - containerPort: 8080
        env:
          - name: MANAGEMENT_OTLP_TRACING_ENDPOINT
            value: http://jaeger-collector.jaeger:4318/v1/traces
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap
          - name: CLUSTER
            value: c1
          - name: TOPIC
            value: test-1
          - name: POD
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
          - name: NAMESPACE
            valueFrom:
              fieldRef:
                fieldPath: metadata.namespace

Here’s a similar Deployment object for the consumer app:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-1
spec:
  selector:
    matchLabels:
      app: consumer-1
  template:
    metadata:
      labels:
        app: consumer-1
    spec:
      containers:
      - name: consumer
        image: piomin/consumer
        resources:
          requests:
            memory: 200Mi
            cpu: 100m
        ports:
        - containerPort: 8080
        env:
          - name: SPRING_APPLICATION_NAME
            value: kafka-consumer-1
          - name: TOPIC
            value: test-1
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap
          - name: MANAGEMENT_OTLP_TRACING_ENDPOINT
            value: http://jaeger-collector.jaeger:4318/v1/traces

Assuming that you are inside the kafka directory in the Git repository, you just need to run the following command to deploy both apps. By the way, I’ll create two deployments of the consumer app (consumer-1 and consumer-2) just for Jaeger visualization purposes.

$ skaffold run -n strimzi --tail

Once you run the apps, you can go to the Jaeger dashboard and verify the list of traces. In order to access the dashboard, we can enable port forwarding for the jaeger-query Service.

$ kubectl port-forward svc/jaeger-query 80:80

Final Thoughts

Integration between Spring Kafka and Micrometer Tracing is a relatively new feature available since the 3.0 version. It is possible, that it will be improved soon with some new features. Anyway, currently it gives a simple way to generate and send traces from Kafka producers and consumers.

The post Kafka Tracing with Spring Boot and Open Telemetry appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2023/11/15/kafka-tracing-with-spring-boot-and-open-telemetry/feed/ 11 14669
Spring Boot Development Mode with Testcontainers and Docker https://piotrminkowski.com/2023/05/26/spring-boot-development-mode-with-testcontainers-and-docker/ https://piotrminkowski.com/2023/05/26/spring-boot-development-mode-with-testcontainers-and-docker/#comments Fri, 26 May 2023 14:26:38 +0000 https://piotrminkowski.com/?p=14207 In this article, you will learn how to use Spring Boot built-in support for Testcontainers and Docker Compose to run external services in development mode. Spring Boot introduces those features in the current latest version 3.1. Of course, you can already take advantage of Testcontainers in your Spring Boot app tests. However, the ability to […]

The post Spring Boot Development Mode with Testcontainers and Docker appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to use Spring Boot built-in support for Testcontainers and Docker Compose to run external services in development mode. Spring Boot introduces those features in the current latest version 3.1. Of course, you can already take advantage of Testcontainers in your Spring Boot app tests. However, the ability to run external databases, message brokers, or other external services on app startup was something I was waiting for. Especially, since the competitive framework, Quarkus, already provides a similar feature called Dev Services, which is very useful during my development. Also, we should not forget about another exciting feature – integration with Docker Compose. Let’s begin.

If you are looking for more articles related to Spring Boot 3 you can refer to the following one, about microservices with Spring Cloud.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. Since I’m using Testcontainers often, you can find examples in my several repositories. Here’s a list of repositories we will use today:

You can clone them and then follow my instruction to see how to leverage Spring Boot built-in support for Testcontainers and Docker Compose in development mode.

Use Testcontainers in Tests

Let’s start with the standard usage example. The first repository has a single Spring Boot app that connects to the Mongo database. In order to build automated tests we have to include the following Maven dependencies:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>mongodb</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>junit-jupiter</artifactId>
  <scope>test</scope>
</dependency>

Now, we can create the tests. We need to annotate our test class with @Testcontainers. Then, we have to declare the MongoDBContainer bean. Before Spring Boot 3.1, we would have to use DynamicPropertyRegistry to set the Mongo address automatically generated by Testcontainers.

@SpringBootTest(webEnvironment = 
   SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class PersonControllerTest {

   @Container
   static MongoDBContainer mongodb = 
      new MongoDBContainer("mongo:5.0");

   @DynamicPropertySource
   static void registerMongoProperties(DynamicPropertyRegistry registry) {
      registry.add("spring.data.mongodb.uri", mongodb::getReplicaSetUrl);
   }

   // ... test methods

}

Fortunately, beginning from Spring Boot 3.1 we can simplify that notation with @ServiceConnection annotation. Here’s the full test implementation with the latest approach. It verifies some REST endpoints exposed by the app.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class PersonControllerTest {

    private static String id;

    @Container
    @ServiceConnection
    static MongoDBContainer mongodb = new MongoDBContainer("mongo:5.0");

    @Autowired
    TestRestTemplate restTemplate;

    @Test
    @Order(1)
    void add() {
        Person p = new Person(null, "Test", "Test", 100, Gender.FEMALE);
        Person personAdded = restTemplate
            .postForObject("/persons", p, Person.class);
        assertNotNull(personAdded);
        assertNotNull(personAdded.getId());
        assertEquals(p.getLastName(), personAdded.getLastName());
        id = personAdded.getId();
    }

    @Test
    @Order(2)
    void findById() {
        Person person = restTemplate
            .getForObject("/persons/{id}", Person.class, id);
        assertNotNull(person);
        assertNotNull(person.getId());
        assertEquals(id, person.getId());
    }

    @Test
    @Order(2)
    void findAll() {
        Person[] persons = restTemplate
            .getForObject("/persons", Person[].class);
        assertEquals(6, persons.length);
    }

}

Now, we can build the project with the standard Maven command. Then Testcontainers will automatically start the Mongo database before the test. Of course, we need to have Docker running on our machine.

$ mvn clean package

Tests run fine. But what will happen if we would like to run our app locally for development? We can do it by running the app main class directly from IDE or with the mvn spring-boot:run Maven command. Here’s our main class:

@SpringBootApplication
@EnableMongoRepositories
public class SpringBootOnKubernetesApp implements ApplicationListener<ApplicationReadyEvent> {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootOnKubernetesApp.class, args);
    }

    @Autowired
    PersonRepository repository;

    @Override
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        if (repository.count() == 0) {
            repository.save(new Person(null, "XXX", "FFF", 20, Gender.MALE));
            repository.save(new Person(null, "AAA", "EEE", 30, Gender.MALE));
            repository.save(new Person(null, "ZZZ", "DDD", 40, Gender.FEMALE));
            repository.save(new Person(null, "BBB", "CCC", 50, Gender.MALE));
            repository.save(new Person(null, "YYY", "JJJ", 60, Gender.FEMALE));
        }
    }
}

Of course, unless we start the Mongo database our app won’t be able to connect it. If we use Docker, we first need to execute the docker run command that runs MongoDB and exposes it on the local port.

spring-boot-testcontainers-logs

Use Testcontainers in Development Mode with Spring Boot

Fortunately, with Spring Boot 3.1 we can simplify that process. We don’t have to Mongo before starting the app. What we need to do – is to enable development mode with Testcontainers. Firstly, we should include the following Maven dependency in the test scope:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-testcontainers</artifactId>
  <scope>test</scope>
</dependency>

Then we need to prepare the @TestConfiguration class with the definition of containers we want to start together with the app. For me, it is just a single MongoDB container as shown below:

@TestConfiguration
public class MongoDBContainerDevMode {

    @Bean
    @ServiceConnection
    MongoDBContainer mongoDBContainer() {
        return new MongoDBContainer("mongo:5.0");
    }

}

After that, we have to “override” the Spring Boot main class. It should have the same name as the main class with the Test suffix. Then we pass the current main method inside the SpringApplication.from(...) method. We also need to set @TestConfiguration class using the with(...) method.

public class SpringBootOnKubernetesAppTest {

    public static void main(String[] args) {
        SpringApplication.from(SpringBootOnKubernetesApp::main)
                .with(MongoDBContainerDevMode.class)
                .run(args);
    }

}

Finally, we can start our “test” main class directly from the IDE or we can just execute the following Maven command:

$ mvn spring-boot:test-run

Once the app starts you will see that the Mongo container is up and running and connection to it is established.

Since we are in dev mode we will also include the Spring Devtools module to automatically restart the app after the source code change.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-devtools</artifactId>
  <optional>true</optional>
</dependency>

Let’s what happened. Once we provide a change in the source code Spring Devtools will restart the app and the Mongo container. You can verify it in the app logs and also on the list of running Docker containers. As you see the Testcontainer ryuk has been initially started a minute ago, while Mongo was restarted after the app restarted 9 seconds ago.

In order to prevent restarting the container on app restart with Devtools we need to annotate the MongoDBContainer bean with @RestartScope.

@TestConfiguration
public class MongoDBContainerDevMode {

    @Bean
    @ServiceConnection
    @RestartScope
    MongoDBContainer mongoDBContainer() {
        return new MongoDBContainer("mongo:5.0");
    }

}

Now, Devtools just restart the app without restarting the container.

spring-boot-testcontainers-containers

Sharing Container across Multiple Apps

In the previous example, we have a single app that connects to the database on a single container. Now, we will switch to the repository with some microservices that communicates with each other via the Kafka broker. Let’s say I want to develop and test all three apps simultaneously. Of course, our services need to have the following Maven dependencies:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-testcontainers</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>kafka</artifactId>
  <version>1.18.1</version>
  <scope>test</scope>
</dependency>

Then we need to do a very similar thing as before – declare the @TestConfiguration bean with a list of required containers. However, this time we need to make our Kafka container reusable between several apps. In order to do that, we will invoke the withReuse(true) on the KafkaContainer. By the way, it is also possible to use Kafka Raft mode instead of Zookeeper.

@TestConfiguration
public class KafkaContainerDevMode {

    @Bean
    @ServiceConnection
    public KafkaContainer kafka() {
        return new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
                .withKraft()
                .withReuse(true);
    }

}

The same as before we have to create a “test” main class that uses the @TestConfiguration bean. We will do the same thing for two other apps inside the repository: payment-service and stock-service.

public class OrderAppTest {

    public static void main(String[] args) {
        SpringApplication.from(OrderApp::main)
                .with(KafkaContainerDevMode.class)
                .run(args);
    }

}

Let’s run our three microservices. Just to remind you, it is possible to run the “test” main class directly from IDE or with the mvn spring-boot:test-run command. As you see, I run all three apps.

spring-boot-testcontainers-microservices

Now, if we display a list of running containers, there is only one Kafka broker shared between all the apps.

Use Spring Boot support for Docker Compose

Beginning from version 3.1 Spring Boot provides built-in support for Docker Compose. Let’s switch to our last sample repository. It consists of several microservices that connect to the Mongo database and the Netflix Eureka discovery server. We can go to the directory with one of the microservices, e.g. customer-service. Assuming we include the following Maven dependency, Spring Boot looks for a Docker Compose configuration file in the current working directory. Let’s activate that mechanism only for a specific Maven profile:

<profiles>
  <profile>
    <id>compose</id>
    <dependencies>
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-docker-compose</artifactId>
        <optional>true</optional>
      </dependency>
    </dependencies>
  </profile>
</profiles>

Our goal is to run all the required external services before running the customer-service app. The customer-service app connects to Mongo, Eureka, and calls endpoint exposed by the account-service. Here’s the implementation of the REST client that communicates to the account-service.

@FeignClient("account-service")
public interface AccountClient {

    @RequestMapping(method = RequestMethod.GET, value = "/accounts/customer/{customerId}")
    List<Account> getAccounts(@PathVariable("customerId") String customerId);

}

We need to prepare the docker-compose.yml with all required containers definition. As you see, there is the mongo service and two applications discovery-service and account-service, which uses local Docker images.

version: "3.8"
services:
  mongo:
    image: mongo:5.0
    ports:
      - "27017:27017"
  discovery-service:
    image: sample-spring-microservices-advanced/discovery-service:1.0-SNAPSHOT
    ports:
      - "8761:8761"
    healthcheck:
      test: curl --fail http://localhost:8761/eureka/v2/apps || exit 1
      interval: 4s
      timeout: 2s
      retries: 3
    environment:
      SPRING_PROFILES_ACTIVE: docker
  account-service:
    image: sample-spring-microservices-advanced/account-service:1.0-SNAPSHOT
    ports:
      - "8080"
    depends_on:
      discovery-service:
        condition: service_healthy
    links:
      - mongo
      - discovery-service
    environment:
      SPRING_PROFILES_ACTIVE: docker

Before we run the service, let’s build the images with our apps. We could as well use built-in Spring Boot mechanisms based on Buildpacks, but I’ve got some problems with it. Jib works fine in my case.

<profile>
  <id>build-image</id>
  <build>
    <plugins>
      <plugin>
        <groupId>com.google.cloud.tools</groupId>
        <artifactId>jib-maven-plugin</artifactId>
        <version>3.3.2</version>
        <configuration>
          <to>
            <image>sample-spring-microservices-advanced/${project.artifactId}:${project.version}</image>
          </to>
        </configuration>
        <executions>
          <execution>
            <goals>
              <goal>dockerBuild</goal>
            </goals>
            <phase>package</phase>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</profile>

Let’s execute the following command on the repository root directory:

$ mvn clean package -Pbuild-image -DskipTests

After a successful build, we can verify a list of available images with the docker images command. As you see, there are two images used in our docker-compose.yml file:

Finally, the only thing you need to do is to run the customer-service app. Let’s switch to the customer-service directory once again and execute the mvn spring-boot:run with a profile that includes the spring-boot-docker-compose dependency:

$ mvn spring-boot:run -Pcompose

As you see, our app locates docker-compose.yml.

spring-boot-testcontainers-docker-compose

Once we start our app, it also starts all required containers.

For example, we can take a look at the Eureka dashboard available at http://localhost:8761. There are two apps registered there. The account-service is running on Docker, while the customer-service has been started locally.

Final Thoughts

Spring Boot 3.1 comes with several improvements in the area of containerization. Especially the feature related to the ability to run Testcontainers in development together with the app was something that I was waiting for. I hope this article will clarify how you can take advantage of the latest Spring Boot features for better integration with Testcontainers and Docker Compose.

The post Spring Boot Development Mode with Testcontainers and Docker appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2023/05/26/spring-boot-development-mode-with-testcontainers-and-docker/feed/ 5 14207
Concurrency with Kafka and Spring Boot https://piotrminkowski.com/2023/04/30/concurrency-with-kafka-and-spring-boot/ https://piotrminkowski.com/2023/04/30/concurrency-with-kafka-and-spring-boot/#comments Sun, 30 Apr 2023 11:28:45 +0000 https://piotrminkowski.com/?p=14121 This article will teach you how to configure concurrency for Kafka consumers with Spring Boot and Spring for Kafka. Concurrency in Spring for Kafka is closely related to the Kafka partitions and consumer groups. Each consumer within a consumer group can receive messages from multiple partitions. While a consumer inside a group uses a single […]

The post Concurrency with Kafka and Spring Boot appeared first on Piotr's TechBlog.

]]>
This article will teach you how to configure concurrency for Kafka consumers with Spring Boot and Spring for Kafka. Concurrency in Spring for Kafka is closely related to the Kafka partitions and consumer groups. Each consumer within a consumer group can receive messages from multiple partitions. While a consumer inside a group uses a single thread, the group of consumers utilizes multiple threads to consume messages. Although each consumer is single-threaded, the processing of records can leverage multiple threads. We will analyze how to achieve it with Spring Boot and Spring for Kafka.

The topic described today, concurrency with Kafka and Spring Boot, rather deals with the basic issues. If you are looking for something more advanced in this area, you can read some of my other articles. In that article, you can find information about Kafka Streams and the Spring Cloud Stream project. You can also read more about Kafka transactions with Spring Boot in the following article.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should go to the no-transactions-service directory. After that, you should just follow my instructions. Let’s begin.

Prerequisites

We will use three different tools in the exercise today. Of course, we will create the Spring Boot consumer app using the latest version of Spring Boot 3 and Java 19. In order to run Kafka locally, we will use Redpanda – a platform compatible with Kafka API. You can easily start and manage Redpanda with their CLI tool – rpk. If you want to install rpk on your laptop please follow the installation instructions available here.

Finally, we need a tool for load testing. I’m using the k6 tool and its extension for integration with Kafka. Of course, it is just a proposition, you can use any other solution you like. With k6 I’m able to generate and send a lot of messages to Kafka quickly. In order to use k6 you need to install it on your laptop. Here are the installation instructions. After that, you need to install the xk6-kafka extension. In the following documentation, you have a full list of the k6 extensions.

Introduction

For the purpose of this exercise, we will create a simple Spring Boot application that connects to Kafka and receives messages from a single topic. From the business logic perspective, it handles transactions between the accounts and stores inside an in-memory database. Here’s a list of dependencies we need to include in the Maven pom.xml:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>

Then, let’s see the configuration settings. Our app connects to the Kafka broker using the address set in the KAFKA_URL environment variable. It expects messages in JSON format. Therefore we need to set JsonDeserializer as a value deserializer. The incoming message is serialized to the pl.piomin.services.common.model.Order object. To make it work, we need to set the spring.json.value.default.type and spring.json.trusted.packages properties. The k6 tool won’t set a header with information containing the JSON target type, so we need to disable that feature on Spring for Kafka with the spring.json.use.type.headers property.

spring:
  application.name: no-transactions-service
  kafka:
    bootstrap-servers: ${KAFKA_URL}
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        "[spring.json.value.default.type]": "pl.piomin.services.common.model.Order"
        "[spring.json.trusted.packages]": "pl.piomin.services.common.model"
        "[spring.json.use.type.headers]": false
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

Here’s the class representing incoming messages.

public class Order {

   private Long id;
   private Long sourceAccountId;
   private Long targetAccountId;
   private int amount;
   private String status;

   // GETTERS AND SETTERS...
}

The last thing we need to do is to enable Spring for Kafka and generate some test accounts for making transactions.

@SpringBootApplication
@EnableKafka
public class NoTransactionsService {

   public static void main(String[] args) {
      SpringApplication.run(NoTransactionsService.class, args);
   }

   private static final Logger LOG = LoggerFactory
      .getLogger(NoTransactionsService.class);

   Random r = new Random();

   @Autowired
   AccountRepository repository;

   @PostConstruct
   public void init() {
      for (int i = 0; i < 1000; i++) {
         repository.save(new Account(r.nextInt(1000, 10000)));
      }
   }

}

Running Kafka using Redpanda

Once we successfully installed the rpk CLI we can easily run a single-node Kafka broker by executing the following command:

$ rpk container start

Here’s the result. As you see the address of my broker is localhost:51961. The port number is generated automatically, so yours will probably be different. To simplify the next actions, let’s just set it as the REDPANDA_BROKERS environment variable.

Once we created a broker we can create a topic. We will use the topic with the transactions name in our tests. In the first step, we make tests with a single partition.

$ rpk topic create transactions -p 1

Prepare Load Tests for Kafka

Our load test will generate and send orders in JSON format with random values. The k6 tool allows us to write tests in JavaScript. We need to use the k6 Kafka extension library. The address of the Kafka broker is retrieved from the KAFKA_URL environment variable. We are incrementing the order’s id field each time we generate a new message.

import {
  Writer,
  SchemaRegistry,
  SCHEMA_TYPE_JSON,
} from "k6/x/kafka";
import { randomIntBetween } from 'https://jslib.k6.io/k6-utils/1.2.0/index.js';

const writer = new Writer({
  brokers: [`${__ENV.KAFKA_URL}`],
  topic: "transactions",
});

const schemaRegistry = new SchemaRegistry();

export function setup() {
  return { index: 1 };
}

export default function (data) {
  writer.produce({
    messages: [
      {
        value: schemaRegistry.serialize({
          data: {
            id: data.index++,
            sourceAccountId: randomIntBetween(1, 1000),
            targetAccountId: randomIntBetween(1, 1000),
            amount: randomIntBetween(10, 50),
            status: "NEW"
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
      },
    ],
  });
}

export function teardown(data) {
  writer.close();
}

Before running the test we need to set the KAFKA_URL environment variable. Then we can use the k6 run command to generate and send a lot of messages.

$ k6 run load-test.js -u 1 -d 30s 

Scenario 1: Single-partition Topic Listener

Let’s start with the defaults. Our topic has just a single partition. We are creating the @KafkaListener just with the topic and consumer group names. Once the listener receives an incoming message it invokes the AccountService bean to process the order.

@Inject
AccountService service;
    
@KafkaListener(
   id = "transactions",
   topics = "transactions",
   groupId = "a")
public void listen(Order order) {
   LOG.info("Received: {}", order);
   service.process(order);
}

Our Spring Boot Kafka app is prepared for concurrency. We will lock the Account entity during the transaction with the PESSIMISTIC_WRITE mode.

public interface AccountRepository extends CrudRepository<Account, Long> {

    @Lock(LockModeType.PESSIMISTIC_WRITE)
    Optional<Account> findById(Long id);
}

Here’s the implementation of our AccountService bean for handling incoming orders. The process(...) method is @Transactional. In the first step, we find the source (1) and target (2) Account entity. Then we perform a transfer between the account if there are sufficient funds on the source account (3). I’m also simulating a delay just for test purposes (4). Finally, we can send a response asynchronously to another topic using the KafkaTemplate bean (5).

@Service
public class AccountService {

    private static final Logger LOG = LoggerFactory
            .getLogger(AccountService.class);
    private final Random RAND = new Random();

    KafkaTemplate<Long, Order> kafkaTemplate;
    AccountRepository repository;

    public AccountService(KafkaTemplate<Long, Order> kafkaTemplate, 
                          AccountRepository repository) {
        this.kafkaTemplate = kafkaTemplate;
        this.repository = repository;
    }

    @Transactional
    public void process(Order order) {
        Account accountSource = repository
                .findById(order.getSourceAccountId())
                .orElseThrow(); // (1)

        Account accountTarget = repository
                .findById(order.getTargetAccountId())
                .orElseThrow(); // (2)

        if (accountSource.getBalance() >= order.getAmount()) { // (3)
            accountSource.setBalance(accountSource.getBalance() - order.getAmount());
            repository.save(accountSource);
            accountTarget.setBalance(accountTarget.getBalance() + order.getAmount());
            repository.save(accountTarget);
            order.setStatus("PROCESSED");
        } else {
            order.setStatus("FAILED");
        }

        try {
            Thread.sleep(RAND.nextLong(1, 20)); // (4)
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        LOG.info("Processed: order->{}", new OrderDTO(order, accountSource, accountTarget));

        // (5)
        CompletableFuture<SendResult<Long, Order>> result = kafkaTemplate
           .send("orders", order.getId(), order);
        result.whenComplete((sr, ex) ->
                LOG.debug("Sent(key={},partition={}): {}",
                        sr.getProducerRecord().partition(),
                        sr.getProducerRecord().key(),
                        sr.getProducerRecord().value()));
    }

}

Let’s set the address of the Kafka broker in the KAFKA_URL environment variable and then start the app.

$ export KAFKA_URL=127.0.0.1:51961
$ mvn spring-boot:run

Let’s analyze what happens. Our listener is connecting to the transactions topic. It establishes just a single connection since there is a single partition.

In that case, we have just a single instance of our app running and a single thread responsible for handling messages. Let’s verify the current lag on the partition for our consumer group. As you see, the messages are processed very slowly. At first glance, you may be quite surprised.

Scenario 2: Multiple-partitions Topic Listener

Let’s analyze the next scenario. Now, the transactions topic consists of 10 partitions. We won’t change anything in the app code and configuration. We will just remove the previously created topic and create a new one with 10 partitions using the following commands:

$ rpk topic delete transactions
$ rpk topic create transaction -p 10

Once again, we are starting the app using the following Maven command:

$ mvn spring-boot:run

Let’s analyze the app logs. As you see, although we have 10 partitions there is still a single thread listening on them.

spring-boot-kafka-concurrency-single-partition

So, our situation hasn’t changed anymore. The app performance is exactly the same. However, now we can run another instance of our Spring Boot app. Once you do it you can take a look at the app logs. A new instance of the app takes 5 partitions.

In that case, a rebalancing occurs. The first instance of our Spring Boot holds 5 other partitions. Now the overall performance is twice as good as before. 

Of course, we can run more app instances. In that case, we can scale up to 10 instances since there are 10 partitions on the topic.

Scenario 3: Consumer Concurrency with Multiple Partitions

Let’s analyze another scenario. Now, we are enabling concurrency at the Kafka listener level. In order to achieve it, we need to set the concurrency field inside the @KafkaListener annotation. This parameter is still related to Kafka partitions. So, there is no sense to set the value higher than the number of partitions. In our case, there are 10 partitions – the same as in the previous scenario. 

@KafkaListener(
   id = "transactions",
   topics = "transactions",
   groupId = "a",
   concurrency = "10")
public void listen(Order order) {
   LOG.info("Received: {}", order);
   service.process(order);
}

After that, we can start the Spring Boot app. Let’s see what happens. As you see, we have 10 concurrent connections – each bound to a single thread.

spring-boot-kafka-concurrency-multi

In that case, the app performance for a single instance is around 10 times better than before. There are 10 concurrent threads, which process incoming messages.

spring-boot-kafka-concurrency-processing

However, if we run our load tests the lag on partitions is still large. Here’s the result after sending ~25k messages in 10 seconds.

spring-boot-kafka-concurrency-lag

Theoretically, we can scale up the number of instances to improve the overall performance. However, that approach won’t change anything. Why? Let’s run another one and take a look at the logs. Now, only 5 threads are still bound to the partitions. Five other threads are in the idle state. The overall performance of the system is not changed. 

Scenario 4: Process in Multiple Threads

Finally the last scenario. We will create a thread pool with the Java ExecutorService. We may still use the custom thread pool with the Kafka consumer concurrency feature as shown below (through the concurrency parameter). Each time the listener receives new messages it processes them in a separate thread.

@Service
public class NoTransactionsListener {

    private static final Logger LOG = LoggerFactory
            .getLogger(NoTransactionsListener.class);

    AccountService service;
    ExecutorService executorService = Executors.newFixedThreadPool(30);

    public NoTransactionsListener(AccountService service) {
        this.service = service;
    }

    @KafkaListener(
            id = "transactions",
            topics = "transactions",
            groupId = "a",
            concurrency = "3")
    public void listen(Order order) {
        LOG.info("Received: {}", order);
        executorService.submit(() -> service.process(order));
    }

}

In that case, one thing should be clarified. With the custom thread pool at the app level, we are losing message ordering within the single partition. The previous model guaranteed ordering, since we have a thread per partition. For our Spring Boot app, it is not important, because we are just processing messages independently.

Let’s start the app. There are 3 concurrent threads that receive messages from the partitions.

There are 30 threads for processing messages and 3 threads for listening to the partitions. Once the message is received in the consumer thread, it is handled by the worker threads.

spring-boot-kafka-concurrency-multi-threads

We can run other instances of our Spring Boot Kafka concurrency apps. I’ll run another two. The first instance grabs 4 partitions, while the next two instances 3.

Now, we can run again load test. It generated and sent ~85k messages to our Kafka broker (around 2.7k per second).

spring-boot-kafka-concurrency-k6

Let’s verify the lag within the consumer group using the rpk group command. The lag on partitions is not large. In fact, there are 90 threads within the three app instances that simultaneously are processing the incoming messages. But wait… does it mean that with 90 threads we are able to process 2.7k orders per second? We should also remember about a custom delay between 1 and 20 ms we added before (with the Thread.sleep method).

The lag looks really fine, although the app is not able to process all requests without a delay. That’s because the default ack mode for commit offset in Spring Kafka is BATCH. If we change it to the RECORD mode, which commits the offset when the listener returns after processing the record, we should get a more precise lag value. In order to override that option, we need to define the ConcurrentKafkaListenerContainerFactory bean as shown below.

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String>
    kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
   ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
   factory.setConsumerFactory(consumerFactory);
   factory.setConcurrency(3);
   factory.getContainerProperties()
      .setAckMode(ContainerProperties.AckMode.RECORD);
   return factory;
}

Let’s restart the app and make a load test once again. Now, the lag value is much closer to the reality.

Final Thoughts

Concurrency and performance are one of the most important things to consider when working with Kafka and Spring Boot. In this article, I wanted to explain to you some basics with simple examples. I hope it clarifies some concerns over Spring Kafka project usage.

The post Concurrency with Kafka and Spring Boot appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2023/04/30/concurrency-with-kafka-and-spring-boot/feed/ 15 14121
Serverless on OpenShift with Knative, Quarkus and Kafka https://piotrminkowski.com/2023/04/18/serverless-on-openshift-with-knative-quarkus-and-kafka/ https://piotrminkowski.com/2023/04/18/serverless-on-openshift-with-knative-quarkus-and-kafka/#comments Tue, 18 Apr 2023 09:02:49 +0000 https://piotrminkowski.com/?p=14100 In this article, you will learn how to build and run Quarkus serverless apps on OpenShift and integrate them through Knative Eventing. We will use Kafka to exchange messages between the apps. However, Knative supports various event sources. Kafka is just one of the available options. You can check out a full list of supported […]

The post Serverless on OpenShift with Knative, Quarkus and Kafka appeared first on Piotr's TechBlog.

]]>

In this article, you will learn how to build and run Quarkus serverless apps on OpenShift and integrate them through Knative Eventing. We will use Kafka to exchange messages between the apps. However, Knative supports various event sources. Kafka is just one of the available options. You can check out a full list of supported solutions in the Knative Eventing docs.

I have already published several articles about Knative on my blog. If you want a brief start read my article about Knative basics and Spring Boot. There is also a similar article to the current one more focused on Kubernetes. Today, we will focus more on the OpenShift support for the serverless features. Also, Knative is changing dynamically, so there are some significant differences in comparison to the version described in my previous articles.

Although I’m running my example apps on OpenShift, I’ll give you a recipe for how to do the same thing on vanilla Kubernetes. In order to run them on Kubernetes, you need to activate the kubernetes Maven profile instead of openshift during the build.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions.

How it works

During the exercise, we will deploy three Quarkus apps on OpenShift: order-service, stock-service and payment-service. All these apps are exposing a single HTTP POST endpoint for incoming events. They are also using Quarkus REST client for sending events to Kafka through the Knative Eventing. The order-service app is sending a single event that both should receive stock-service and payment-service. Then they are processing the event and send a response back to the order-service. All those things happen asynchronously by leveraging Kafka and Knative Broker. However, that process is completely transparent for the apps, which just expose the HTTP endpoint and use the HTTP client to call the endpoint exposed by the KafkaSink object.

The diagram is visible below illustrates the architecture of our solution. There are several Knative objects: KafkaSink, KafkaSource, Trigger, and Broker. The KafkaSink object eliminates the need to use Kafka client on the app side. It receives HTTP requests in CloudEvent format and converts them to the Kafka message sent to the particular topic. The KafkaSource object receives messages from Kafka and sends them to the Knative Broker. Finally, we need to define Trigger. The Trigger object filters the events inside Broker and sends them to the target app by calling its HTTP endpoint.

openshift-serverless-arch

Prerequisites

Before we proceed to the Quarkus apps, we need to install and configure two operators on OpenShift: AMQ Streams (Kafka Strimzi) and OpenShift Serverless (Knative).

openshift-serverless-operators

As a configuration phase, I define the creation of four components: Kafka, KnativeServing, KnativeEventing and KnativeKafka. We can easily create them using OpenShift Console. In all cases, we can leave the default settings. I create the Kafka instance in the kafka namespace. Just in case, here’s the YAML manifest for creating a 3-node Kafka cluster:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: '3.3'
    storage:
      type: ephemeral
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    version: 3.3.1
    replicas: 3
  zookeeper:
    storage:
      type: ephemeral
    replicas: 3

The KnativeServing should be created in the knative-serving namespace, while KnativeEventing in the knative-eventing namespace.

kind: KnativeServing
apiVersion: operator.knative.dev/v1beta1
metadata:
  name: knative-serving
  namespace: knative-serving
spec: {}
---
kind: KnativeEventing
apiVersion: operator.knative.dev/v1beta1
metadata:
  name: knative-eventing
  namespace: knative-eventing
spec: {}

Or just “click” the create button in OpenShift Console.

Finally, the last required component – KnativeKafka. We should at least enable the sink and source to install KafkaSink and KafkaSource CRDs and controllers.

apiVersion: operator.serverless.openshift.io/v1alpha1
kind: KnativeKafka
metadata:
  name: knative-kafka
  namespace: knative-eventing
spec:
  logging:
    level: INFO
  sink:
    enabled: true
  source:
    enabled: true

Functions Support in Quarkus

Although we will implement event-driven architecture today, our Quarkus apps are just exposing and calling HTTP endpoints. In order to expose the method as the HTTP endpoint we need to include the Quarkus Funqy HTTP module. On the other hand, to call the HTTP endpoint exposed by another component we can leverage Quarkus declarative REST client. Our app is storing data in the in-memory H2 database and uses Panache ORM.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-funqy-http</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-rest-client-jackson</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-hibernate-orm-panache</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-jdbc-h2</artifactId>
</dependency>

In order to expose the method as the HTTP endpoint we just need to annotate it with @Funq. Here’s the function implementation from the payment-service. It receives two types of orders: reservation and confirmation. For the reservation order reservation (status=NEW) it reserves funds in the customer account. For the confirmation type, it accepts or rollbacks the transaction depending on the order’s status. By default, the method annotated with the @Funq annotation is exposed under the same path as its name – in our case the address of the endpoint is POST /reserve.

public class OrderReserveFunction {

   private static final String SOURCE = "payment";
    
   Logger log;
   OrderReserveService orderReserveService;
   OrderConfirmService orderConfirmService;

   public OrderReserveFunction(Logger log,
                               OrderReserveService orderReserveService,
                               OrderConfirmService orderConfirmService) {
      this.log = log;
      this.orderReserveService = orderReserveService;
      this.orderConfirmService = orderConfirmService;
   }

   @Funq
   public Customer reserve(Order order) {
      log.infof("Received order: %s", order);
      if (order.getStatus() == OrderStatus.NEW) {
         return orderReserveService.doReserve(order);
      } else {
         return orderConfirmService.doConfirm(order);
      }
   }

}

Let’s take a look at the payment reservation implementation. We assume multiple incoming requests to the same concurrently, so we need to lock the entity during the transaction. Once the reservation is performed, we need to send a response back to the order-service. We are leveraging Quarkus REST client for that.

@ApplicationScoped
public class OrderReserveService {

    private static final String SOURCE = "payment";

    Logger log;
    CustomerRepository repository;
    OrderSender sender;

    public OrderReserveService(Logger log,
                               CustomerRepository repository,
                               @RestClient OrderSender sender) {
        this.log = log;
        this.repository = repository;
        this.sender = sender;
    }

    @Transactional
    public Customer doReserve(Order order) {
        Customer customer = repository.findById(order.getCustomerId(), LockModeType.PESSIMISTIC_WRITE);
        if (customer == null)
            throw new NotFoundException();
        log.infof("Customer: %s", customer);
        if (order.getAmount() < customer.getAmountAvailable()) {
            order.setStatus(OrderStatus.IN_PROGRESS);
            customer.setAmountReserved(customer.getAmountReserved() + order.getAmount());
            customer.setAmountAvailable(customer.getAmountAvailable() - order.getAmount());
        } else {
            order.setStatus(OrderStatus.REJECTED);
        }
        order.setSource(SOURCE);
        repository.persist(customer);
        log.infof("Order reserved: %s", order);
        sender.send(order);
        return customer;
    }
}

Here’s the implementation of our REST client. It sends a message to the endpoint exposed by the KafkaSink object. The path of the endpoint corresponds to the name of the KafkaSink object. We also need to set HTTP headers to meet the CloudEvent format. Therefore we are registering the custom ClientHeadersFactory implementation.

@ApplicationScoped
@RegisterRestClient
@RegisterClientHeaders(CloudEventHeadersFactory.class)
public interface OrderSender {

    @POST
    @Path("/payment-sink")
    void send(Order order);

}

Our custom ClientHeadersFactory implementation sets some Ce-* (CloudEvent) headers. The most important header is Ce-Type and Ce-Source since we will do filtering based on that values then.

@ApplicationScoped
public class CloudEventHeadersFactory implements ClientHeadersFactory {

    AtomicLong id = new AtomicLong();

    @Override
    public MultivaluedMap<String, String> update(MultivaluedMap<String, String> incoming,
                                                 MultivaluedMap<String, String> outgoing) {
        MultivaluedMap<String, String> result = new MultivaluedHashMap<>();
        result.add("Ce-Id", String.valueOf(id.incrementAndGet()));
        result.add("Ce-Specversion", "1.0");
        result.add("Ce-Type", "reserve-event");
        result.add("Ce-Source", "stock");
        return result;
    }

}

Finally, let’s take a look at the payment confirmation service:

@ApplicationScoped
public class OrderConfirmService {

    private static final String SOURCE = "payment";

    Logger log;
    CustomerRepository repository;

    public OrderConfirmService(Logger log, 
                               CustomerRepository repository) {
        this.log = log;
        this.repository = repository;
    }

    @Transactional
    public Customer doConfirm(Order order) {
        Customer customer = repository.findById(order.getCustomerId());
        if (customer == null)
            throw new NotFoundException();
        log.infof("Customer: %s", customer);
        if (order.getStatus() == OrderStatus.CONFIRMED) {
            customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
            repository.persist(customer);
        } else if (order.getStatus() == OrderStatus.ROLLBACK && !order.getRejectedService().equals(SOURCE)) {
            customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
            customer.setAmountAvailable(customer.getAmountAvailable() + order.getAmount());
            repository.persist(customer);
        }
        return customer;
    }
    
}

Also, let’s take a look at the implementation of the function in the order-service.

public class OrderConfirmFunction {

    private final Logger log;
    private final OrderService orderService;

    public OrderConfirmFunction(Logger log, OrderService orderService) {
        this.log = log;
        this.orderService = orderService;
    }

    @Funq
    public void confirm(Order order) {
        log.infof("Accepted order: %s", order);
        orderService.doConfirm(order);
    }

}

Here’s the function implementation for the stock-service:

public class OrderReserveFunction {

    private static final String SOURCE = "stock";

    private final OrderReserveService orderReserveService;
    private final OrderConfirmService orderConfirmService;
    private final Logger log;

    public OrderReserveFunction(OrderReserveService orderReserveService,
                                OrderConfirmService orderConfirmService,
                                Logger log) {
        this.orderReserveService = orderReserveService;
        this.orderConfirmService = orderConfirmService;
        this.log = log;
    }

    @Funq
    public void reserve(Order order) {
        log.infof("Received order: %s", order);
        if (order.getStatus() == OrderStatus.NEW) {
            orderReserveService.doReserve(order);
        } else {
            orderConfirmService.doConfirm(order);
        }
    }

}

Configure Knative Eventing

After we finished the implementation of app logic we can proceed to the configuration of OpenShift Serverless and Knative components. If you are using my GitHub repository you don’t have to manually apply any YAML manifests. All the required configuration is applied during the Maven build. It is possible thanks to the Quarkus Kubernetes extension and its support for Knative. We just need to place all the required YAML manifests inside the src/main/kubernetes/knative.yml and the magic happens by itself.

However, in order to understand what happens let’s discuss step-by-step Knative configuration. In the first step, we need to create the KafkaSink objects. KafkaSink exposes the HTTP endpoint and gets CloudEvent on input. Then it sends that event to the particular topic in the Kafka cluster. Here’s the KafkaSink definition for the payment-service:

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: payment-sink
  namespace: demo-eventing
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topic: reserve-events
  numPartitions: 1

Both payment-service and stock-service send messages on the same reserve-events topic. Therefore, we can also create a single KafkaSink per those two services (I created two sinks, each of them dedicated to the single app). On the other hand, the order-service app sends messages to the order-events topic, so we have to create a separate KafkaSink:

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: order-sink
  namespace: demo-eventing
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topic: order-events
  numPartitions: 1

After that, let’s print the list of sinks in our cluster:

Now, this URL address should be set in the application properties for the REST client configuration. Here’s the fragment of the Quarkus application.properties:

%openshift.quarkus.rest-client."pl.piomin.samples.quarkus.serverless.order.client.OrderSender".url = http://kafka-sink-ingress.knative-eventing.svc.cluster.local/demo-eventing

With KafkaSink we are able to send messages to the Kafka cluster. In order to receive them on the target apps side we need to create other objects. In the first step, we will create Knative Broker and KafkaSource object. The broker may be easily created using kn CLI:

$ kn broker create default

The KafkaSource object connects to the Kafka cluster and receives messages from the defined list of topics. In our case, these are order-events and reserve-events. The output of the KafkaSource object is the already-created default broker. It means that all the messages exchanged between our three apps are delivered to the Knative Broker.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-to-broker
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
    - reserve-events
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default

In the final step, we need to configure the mechanism responsible for getting messages from Knative Broker and sending them to the target services. In order to do that, we have to create Trigger objects. A trigger can filter messages by CloudEvent attributes. Cloud event attributes are related to the Ce-* HTTP headers from the request. For example, the payment-service app receives only messages sent by the order-service and containing the order-event type.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: payment-trigger
spec:
  broker: default
  filter:
    attributes:
      source: order
      type: order-event
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: payment-service
    uri: /reserve

The stock-trigger object is very similar. It connects to the default Broker and gets only messages with the source=order and type=order-event. Finally, it calls the POST /reserve endpoint exposed by the stock-service.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: stock-trigger
spec:
  broker: default
  filter:
    attributes:
      source: order
      type: order-event
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: stock-service
    uri: /reserve

On the other hand, the order-service app should receive events from both stock-service and payment-service. Therefore, we are filtering messages just by the type attribute. The target endpoint of the order-service is POST /confirm.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: order-trigger
spec:
  broker: default
  filter:
    attributes:
      type: reserve-event
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: order-service
    uri: /confirm

Run Quarkus Apps on Knative

We can leverage the Quarkus Kubernetes/OpenShift extension to run the app as a Knative service. In order to do that we need to include the quarkus-openshift dependency in Maven pom.xml. We would like to use that module during the build only if we need to deploy the app on the cluster. Therefore, we will create a custom Maven profile openshift. Besides including the quarkus-openshift dependency it also enables deployment by setting the quarkus.kubernetes.deploy property to true and activates the custom Quarkus profile openshift.

<profiles>
  <profile>
    <id>openshift</id>
    <activation>
      <property>
        <name>openshift</name>
      </property>
    </activation>
    <dependencies>
      <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-openshift</artifactId>
      </dependency>
    </dependencies>
    <properties>
      <quarkus.kubernetes.deploy>true</quarkus.kubernetes.deploy>
      <quarkus.profile>openshift</quarkus.profile>
    </properties>
  </profile>
</profiles>

Once we include the quarkus-openshift module, we may use Quarkus configuration properties to customize the deployment process. Firstly, we need to set the quarkus.kubernetes.deployment-target property to knative. Thanks to that Quarkus will automatically generate the YAML manifest with Knative Service instead of a standard Kubernetes Deployment. We can also override default autoscaling settings with the quarkus.knative.revision-auto-scaling.* properties. The whole build process is running on the cluster with S2I (source-2-image), so we can use the internal OpenShift registry (the quarkus.container-image.registry property). Here’s the fragment of the application.properties file for the order-service.

quarkus.kubernetes-client.trust-certs = true
quarkus.kubernetes.deployment-target = knative
quarkus.knative.env.vars.tick-timeout = 10000
quarkus.knative.revision-auto-scaling.metric = rps
quarkus.knative.revision-auto-scaling.target = 50

%openshift.quarkus.container-image.group = demo-eventing
%openshift.quarkus.container-image.registry = image-registry.openshift-image-registry.svc:5000
%openshift.app.orders.timeout = ${TICK_TIMEOUT}

Finally, we just need to activate the openshift profile during the build and all the apps will be deployed to the target OpenShift cluster. You can deploy a single app or all the apps by running the following command in the repository root directory:

$ mvn clean package -Popenshift

Once we deploy our apps we display a list of Knative services.

openshift-serverless-knative-svc

We can also verify if all the triggers have been configured properly.

Also, let’s take a look at the “Topology” view on OpenShift which illustrates our serverless architecture.

openshift-serverless-topology

Testing Services

It is also worth creating some automated tests to verify the basic functionality before deployment. Since we have simple HTTP apps and an in-memory H2 database we can create standard tests. The only thing we need to do is to mock the HTTP client.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-junit5</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>io.rest-assured</groupId>
  <artifactId>rest-assured</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-junit5-mockito</artifactId>
  <scope>test</scope>
</dependency>

Here’s the JUnit test for the payment-service. We are verifying both the reservation and confirmation processes for the same order.

@QuarkusTest
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class OrderReserveFunctionTests {

    private static int amount;
    private CustomerRepository repository;
    @InjectMock
    @RestClient
    OrderSender sender;

    public OrderReserveFunctionTests(CustomerRepository repository) {
        this.repository = repository;
    }

    @Test
    @org.junit.jupiter.api.Order(1)
    void reserve() {
        given().contentType("application/json").body(createTestOrder(OrderStatus.NEW)).post("/reserve")
                .then()
                .statusCode(204);

        Customer c = repository.findById(1L);
        amount = c.getAmountAvailable();
        assertEquals(100, c.getAmountReserved());
    }

    @Test
    @org.junit.jupiter.api.Order(2)
    void confirm() {
        given().contentType("application/json").body(createTestOrder(OrderStatus.CONFIRMED)).post("/reserve")
                .then()
                .statusCode(204);

        Customer c = repository.findById(1L);
        assertEquals(0, c.getAmountReserved());
        assertEquals(amount, c.getAmountAvailable());
    }

    private Order createTestOrder(OrderStatus status) {
        Order o = new Order();
        o.setId(1L);
        o.setSource("test");
        o.setStatus(status);
        o.setAmount(100);
        o.setCustomerId(1L);
        return o;
    }
}

Also, let’s take a look at the logs of apps running on OpenShift. As you see, the order-service receives events from both stock-service and payment-service. After that, it confirms the order and sends a confirmation message to both services.

Here are the logs from the payment-service. As you see, it receives the CloudEvent generated by the order-service (the Ce-Source header equals to order).

Final Thoughts

With OpenShift Serverless and Knative Eventing, you can easily build event-driven architecture for simple HTTP-based apps. It is completely transparent for the app, which medium is used to store events and how they are routed. The only thing it needs to do is to prepare a request according to the CloudEvent specification. OpenShift Serverless brings several features to simplify development. We can also leverage Quarkus Kubernetes Extension to easily build and deploy our apps on OpenShift as Knative services.

The post Serverless on OpenShift with Knative, Quarkus and Kafka appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2023/04/18/serverless-on-openshift-with-knative-quarkus-and-kafka/feed/ 2 14100
Kafka Transactions with Spring Boot https://piotrminkowski.com/2022/10/29/kafka-transactions-with-spring-boot/ https://piotrminkowski.com/2022/10/29/kafka-transactions-with-spring-boot/#comments Sat, 29 Oct 2022 08:23:21 +0000 https://piotrminkowski.com/?p=13623 In this article, you will learn how to use Kafka transactions with the Spring Kafka project in your Spring Boot app. In order to run the Kafka cluster we will use Upstash. This article provides a basic introduction to Kafka transactions. If you are looking for more advanced usage and scenarios you may refer to […]

The post Kafka Transactions with Spring Boot appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to use Kafka transactions with the Spring Kafka project in your Spring Boot app. In order to run the Kafka cluster we will use Upstash. This article provides a basic introduction to Kafka transactions. If you are looking for more advanced usage and scenarios you may refer to that article, about distributed transactions in microservices. You can also read more about Kafka Streams the Spring Cloud Stream project in this article.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. After that, you should just follow my instructions. Let’s begin.

Getting Started with Kafka in Spring Boot

I have already created a Kafka cluster on Upstash using a web dashboard. All the connection credentials are generated automatically. You can find and copy them on the main page of your cluster.

Assuming we have a username as the KAFKA_USER variable and a password as the KAFKA_PASS variable we need to provide the following Spring configuration in the application.yml file:

spring:
  application.name: transactions-service
  kafka:
    bootstrap-servers: inviting-camel-5620-eu1-kafka.upstash.io:9092
    properties:
      security.protocol: SASL_SSL
      sasl.mechanism: SCRAM-SHA-256
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${KAFKA_USER}" password="${KAFKA_PASS}";

Here’s a list of required dependencies. Since we exchange JSON messages, we need the Jackson library for serialization or deserialization. Of course, we also need to include Spring Boot starter and Spring Kafka.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
</dependency>

The transactions-service is generating and sending orders. We will create the test topic transactions on the app startup.

@SpringBootApplication
public class TransactionsService {

   public static void main(String[] args) {
      SpringApplication.run(TransactionsService.class, args);
   }

   @Bean
   public NewTopic transactionsTopic() {
      return TopicBuilder.name("transactions")
          .partitions(3)
          .replicas(1)
          .build();
   }

}

Enabling Kafka Transactions in Spring Boot

In Kafka, a producer initiates a transaction by making a request to the transaction coordinator. You can find a detailed description of that process in the following article on the Confluent blog.

With Spring Boot, we just need to set the spring.kafka.producer.transaction-id-prefix property to enable transactions. Spring Boot will do the rest by automatically configuring a KafkaTransactionManager bean and wiring it into the listener container. Here’s a part of the configuration responsible for the message producer. We use JsonSerializer to serialize data from objects into JSON. Transactions prefix is tx-.

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      transaction-id-prefix: tx-

During our scenario, we will send 10 messages within a single transaction. In order to observe logs on the consumer side we set a delay between subsequent attempts to 1 second.

@Transactional
public void generateAndSendPackage() 
      throws InterruptedException, TransactionException {
   for (long i = 0; i < 10; i++) {
      Order t = new Order(id++, i+1, i+2, 1000, "NEW");
      ListenableFuture<SendResult<Long, Order>> result =
         kafkaTemplate.send("transactions", t.getId(), t);
      result.addCallback(callback);
      Thread.sleep(1000);
   }
}

Enable Transactions on the Kafka Consumer Side

In the first step, we will just print the incoming messages. We need to annotate the listener method with the @KafkaListener. The target topic is transactions, and the consumer group is a. Also, we have to add the @Transactional annotation to enable transaction support for the listen method.

@Service
public class TransactionsListener {

   private static final Logger LOG = LoggerFactory
          .getLogger(TransactionsListener.class);

   @KafkaListener(
          id = "transactions",
          topics = "transactions",
          containerGroup = "a",
          concurrency = "3")
   @Transactional
   public void listen(Order order) {
      LOG.info("{}", order);
   }
}

Let’s run the producer app first. To do go to the transactions-service directory and execute the command mvn spring-boot:run. It is a good idea to enable more detailed logs for Spring Kafka transactions. To do that add the following line to the application.yml file:

logging:
  level:
    org.springframework.transaction: trace
    org.springframework.kafka.transaction: debug

After that, let’s run the consumer app. In order to that go to the accounts-service directory and run the same command as before. You should see the following topic created in the Upstash console:

kafka-transactions-spring-boot-upstash

The transactions-service app exposes the REST endpoint for sending messages. It just starts that procedure of generating and sending 10 messages within a single transaction I mentioned in the previous section. Let’s call the endpoint:

$ curl -X POST http://localhost:8080/transactions

Let’s see at the logs on the producer side. After sending all the messages it committed the transaction.

kafka-transactions-spring-boot-logs

Now, let’s see how it looks on the consumer side. All the messages are received just after being sent by the producer app. It is not something that we expected…

In order to verify what happened, we need to take a look at the consumer app logs. Here’s a fragment with Kafka consumer settings. As you see, by default Spring Kafka sets the transactions isolation level to read_uncommitted for Spring Boot.

Deep Dive into Transactions with Spring Kafka

In order to solve the problem with transactions from the previous section, we need to change the default isolation level in the application.yml file. As the spring.kafka.consumer.properties we have to set the isolation.level property to read_commited as shown below.

spring:
  application.name: accounts-service
  kafka:
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"
        isolation.level: read_committed

After that let’s run the accounts-service app once again.

Now, all the messages have been received after the producer committed the transaction. There are three consumer threads as we set the @KafkaListener concurrency parameter to 3.

kafka-transactions-spring-boot-producer

In the next step, we will test the rollback of transactions on the producer side. In order to do that, we will modify the method for generating and sending orders. Now, the generateAndSendPackage is getting a boolean parameter, that indicates if a transaction should be rollbacked or not.

@Transactional
public void generateAndSendPackage(boolean error)
       throws InterruptedException {
   for (long i = 0; i < 10; i++) {
      Order t = new Order(id++, i+1, i+2, 1000, "NEW");
      ListenableFuture<SendResult<Long, Order>> result =
            kafkaTemplate.send("transactions", t.getId(), t);
      result.addCallback(callback);
      if (error && i > 5)
         throw new RuntimeException();
      Thread.sleep(1000);
   }
}

Here are the logs from our test. After sending six orders the method throws a RuntimeException and Spring rollbacks a transaction. As expected, the consumer app does not receive any messages.

It is important to know that Spring rollbacks are only on unchecked exceptions by default. To rollback checked exceptions, we need to specify the rollbackFor on the @Transactional annotation.

The transactional producer sends messages to the Kafka cluster even before committing the transaction. You could see it in the previous section, where the listener was continuously receiving messages if the isolation level was read_uncommited. Consequently, if we roll back a transaction on the producer side the message sent before rollback occurs come to the Kafka broker. We can see it e.g. in the Upstash live message view for the transactions topic.

kafka-transactions-spring-boot-live

Here’s the current value of offsets for all partitions in the transactions topic for the a consumer group. We made a successful commit after sending the first package of 10 messages and we rollbacked the transaction with the second package. The sum of offsets is 10 in that case. But in fact, it is different that the current latest offset on those partitions.

To verify it, we can, for example, change a consumer group name for the listener to b and start another instance of the accounts-service.

@KafkaListener(
     id = "transactions",
     topics = "transactions",
     containerGroup = "b",
     concurrency = "3")
@Transactional
public void listen(Order order) {
   LOG.info("{}", order);
}

Here’s the current value of offsets for the b consumer group.

Of course, the messages have been rollbacked. But the important thing to understand here is that these operations happen on the Kafka broker side. The transaction coordinator changes the values of Kafka offsets. We can easily verify that consumer won’t receive messages after rollback even if we the initial offset to the earliest with the spring.kafka.consumer.auto-offset-reset property.

Add Database

In this section, we will extend our scenario with new functionalities. Our app will store the status of orders in the database. Just for demo purposes, we will use an in-memory database H2. There are two dependencies required in this scenario: H2 and Spring Data JPA.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>

There the OrderGroup entity that stores the current status of the package (SENT, CONFIRMED, ROLLBACK), the total number of orders in the single package, and the total number of processed orders by the accounts-service.

@Entity
public class OrderGroup {

   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long id;
   private String status;
   private int totalNoOfOrders;
   private int processedNoOfOrders;

   // GETTERS/SETTERS ...
}

In order to manage the entity we use the Spring Data repository pattern:

public interface OrderGroupRepository extends 
   CrudRepository<OrderGroup, Long> {
}

We will also include a database in the accounts-service app. When it processes the incoming orders it performs transfers between the source and target account. It will store the account balance in the database.

@Entity
public class Account {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private int balance;

   // GETTERS/SETTERS ...
}

The same as before there is a repository bean for managing the Account entity.

public interface AccountRepository extends
   CrudRepository<Account, Long> {
}

We also need to modify the Order message exchanged between the apps. It has to contain the groupId field for processing confirmations.

public class Order {

    private Long id;
    private Long sourceAccountId;
    private Long targetAccountId;
    private int amount;
    private String status;
    private Long groupId;

   // GETTERS/SETTERS ...
}

Here’s the diagram that illustrates our architecture for the described scenario.

kafka-transactions-spring-boot-arch

Handling Transactions Across Multiple Resources

After including Spring Data JPA there are two registered TransactionManager beans with names transactionManager and kafkaTransactionManager. Therefore we need to choose the name of the transaction manager inside the @Transactional annotation. In the first step, we add a new entity to the database. The primary key id is auto-generated in the database and then returned to the object. After that, we get groupId and generate the sequence of orders within that group. Of course, both operations (save to database, sent to Kafka) are part of the same transaction.

@Transactional("kafkaTransactionManager")
public void sendOrderGroup(boolean error) throws InterruptedException {
   OrderGroup og = repository.save(new OrderGroup("SENT", 10, 0));
   generateAndSendPackage(error, og.getId());
}

private void generateAndSendPackage(boolean error, Long groupId)
      throws InterruptedException {
   for (long i = 0; i < 10; i++) {
      Order o = new Order(id++, i+1, i+2, 1000, "NEW", groupId);
      ListenableFuture<SendResult<Long, Order>> result =
         kafkaTemplate.send("transactions", o.getId(), o);
      result.addCallback(callback);
      if (error && i > 5)
         throw new RuntimeException();
      Thread.sleep(1000);
   }
}

The accounts-service app listens for incoming orders. It is processing every single order in a separate transaction. It checks if sufficient funds are in the customer account to make a transfer. If there is enough money, it performs a transfer. Finally, it sends the response to transactions-service with the transaction status. The message is sent to the orders topic.

@KafkaListener(
   id = "transactions",
   topics = "transactions",
   groupId = "a",
   concurrency = "3")
@Transactional("kafkaTransactionManager")
public void listen(Order order) {
   LOG.info("Received: {}", order);
   process(order);
}

private void process(Order order) {
   Account accountSource = repository
      .findById(order.getSourceAccountId())
      .orElseThrow();
   Account accountTarget = repository
      .findById(order.getTargetAccountId())
      .orElseThrow();
   if (accountSource.getBalance() >= order.getAmount()) {
      accountSource.setBalance(accountSource.getBalance() - order.getAmount());
      repository.save(accountSource);
      accountTarget.setBalance(accountTarget.getBalance() + order.getAmount());
      repository.save(accountTarget);
      order.setStatus("PROCESSED");
   } else {
      order.setStatus("FAILED");
   }
   LOG.info("After processing: {}", order);
   kafkaTemplate.send("orders", order.getId(), order);
}

The transactions-service listens for order confirmations on the orders topic. Once it receives the message it increases the number of processed orders within an order group and stores the current result in the database. We should use a default Spring transaction manager since we don’t send any messages to Kafka.

@KafkaListener(
      id = "orders",
      topics = "orders",
      groupId = "a",
      concurrency = "3")
@Transactional("transactionManager")
public void listen(Order order) {
   LOG.info("{}", order);
   OrderGroup og = repository
      .findById(order.getGroupId())
      .orElseThrow();
   if (order.getStatus().equals("PROCESSED")) {      
      og.setProcessedNoOfOrders(og.getProcessedNoOfOrders() + 1);
      og = repository.save(og);
      LOG.info("Current: {}", og);
   }
}

Don’t forget to lock the OrderGroup record during the transaction. Since we are processing messages concurrently (with 3 threads) we need to lock the OrderGroup record until we update the value of processedNoOfOrders column:

public interface OrderGroupRepository extends
        CrudRepository<OrderGroup, Long> {

    @Lock(LockModeType.PESSIMISTIC_WRITE)
    Optional<OrderGroup> findById(Long groupId);
}

Let’s test a positive scenario. We will generate a group of orders that should be confirmed. To do that let’s call our endpoint POST /transactions:

$ curl -X 'POST' 'http://localhost:8080/transactions' \
  -H 'Content-Type: application/json' \
  -d 'false'

Here are the logs from the accounts-service app:

We can also take at the logs generated by the transactions-service app:

Finally, we can verify the current status of our order group by calling the following endpoint:

$ curl -X GET 'http://localhost:8080/transactions'

What happens if we roll back the transaction? Try it by yourself with the following command:

$ curl -X 'POST' 'http://localhost:8080/transactions' \
  -H 'Content-Type: application/json' \
  -d 'true'

Final Thoughts

You can easily handle Kafka transactions with Spring Boot using the Spring Kafka project. You can integrate your app with a database and handle transactions across multiple resources. However, one thing needs to be clarified – Kafka does not support XA transactions. It may result in data inconsistency. Spring does not solve that case, it just performs two transactions in the background. When the @Transactional method exits, Spring Boot will commit the database transactions first and then the Kafka transactions. You can just change that order to enable Kafka transaction commit first by configuring the outer method configured to use the DataSourceTransactionManager, and the inner method to use the KafkaTransactionManager.

Can we solve that case somehow? Of course. There is, for example, project Debezium that allows you to stream database changes into Kafka topics. With that approach, you can just commit changes in the database, and then configure Debezium to send events with changes to Kafka. For more details about that tool and outbox pattern please refer to the article available here.

The post Kafka Transactions with Spring Boot appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2022/10/29/kafka-transactions-with-spring-boot/feed/ 6 13623
Running Redpanda on Kubernetes https://piotrminkowski.com/2022/09/06/running-redpanda-on-kubernetes/ https://piotrminkowski.com/2022/09/06/running-redpanda-on-kubernetes/#comments Tue, 06 Sep 2022 15:29:14 +0000 https://piotrminkowski.com/?p=13109 In this article, you will learn how to install and manage Redpanda on Kubernetes. It is not the first article related to Redpanda on my blog. You can read more about Redpanda in my earlier post here. I’m describing there how to do a local development of Java apps with Redpanda, Quarkus, and Testcontainers. You […]

The post Running Redpanda on Kubernetes appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to install and manage Redpanda on Kubernetes. It is not the first article related to Redpanda on my blog. You can read more about Redpanda in my earlier post here. I’m describing there how to do a local development of Java apps with Redpanda, Quarkus, and Testcontainers.

You can use Redpanda in local development as an alternative to the standard Apache Kafka. It is a Kafka API-compatible tool but does not use ZooKeeper or JVM.

In this article, I’m going to show that you can also easily run and use Redpanda on Kubernetes. There are some interesting features that will surely interest you. Let’s begin.

Source Code

If you would like to try this exercise yourself, you may always take a look at my source code. In order to do that, you need to clone my GitHub repository. Then switch to the redpanda branch. You will find sample applications for sending and receiving messages to Kafka in the event-driven directory. After that, just follow my instructions.

Install Cert Manager on Kubernetes

The recommended way to install Redpanda on Kubernetes is through the operator. The Redpanda operator requires the cert-manager to create certificates for TLS communication. So in the first step, we need to install cert-manager. We will use Helm for that. Let’s add the following Helm repository:

$ helm repo add jetstack https://charts.jetstack.io && \
  helm repo update

After that, we can install cert-manager in the cert-manager namespace. In order to create a namespace automatically, we should enable the create-namespace option. By default, the Cert Manager does not install CRDs on Kubernetes. Let’s enable it using the installCRDs Helm parameter:

$ helm install cert-manager \
   --namespace cert-manager --create-namespace \
   --set installCRDs=true \
   --version v1.9.1 jetstack/cert-manager

Here’s the list of the cert-manager pods. Assuming you have a similar result, you may proceed to the next section.

$ kubectl get pod -n cert-manager                                                                
NAME                                      READY   STATUS    RESTARTS   AGE
cert-manager-877fd747c-lhrgd              1/1     Running   0          1m
cert-manager-cainjector-bbdb88874-tmlg9   1/1     Running   0          1m
cert-manager-webhook-5774d5d8f7-flv7s     1/1     Running   0          1m

Install Redpanda Operator using Helm

Before we install the Redpanda operator we need to apply a single Cluster object CRD:

$ kubectl apply -k 'https://github.com/redpanda-data/redpanda/src/go/k8s/config/crd?ref=v22.2.2'

The same as before, we will use Helm in the installation process. Firstly, let’s add the official Redpanda Helm repository:

$ helm repo add redpanda https://charts.vectorized.io && \
  helm repo update

We will install the latest version of the Redpanda operator (22.2) in the redpanda-system namespace.

$ helm install redpanda-operator redpanda/redpanda-operator \
    --namespace redpanda-system \
    --create-namespace \
    --set monitoring.enabled=true \
    --version v22.2.2

After that, we may use the Cluster CRD object to create a single-node Redpanda cluster on Kubernetes. Also, let’s enable developer mode. Redpanda provides a built-in HTTP proxy and schema registry. The name of our cluster is one-node-cluster.

apiVersion: redpanda.vectorized.io/v1alpha1
kind: Cluster
metadata:
  name: one-node-cluster
spec:
  image: vectorized/redpanda
  version: latest
  replicas: 1
  resources:
    requests:
      cpu: 1
      memory: 2Gi
    limits:
      cpu: 1
      memory: 2Gi
  configuration:
    rpcServer:
      port: 33145
    kafkaApi:
    - port: 9092
    pandaproxyApi:
    - port: 8082
    schemaRegistry:
      port: 8081
    adminApi:
    - port: 9644
    developerMode: true

Redpanda will run in the redpanda namespace. Let’s create that namespace first.

$ kubectl create ns redpanda

Then, let’s create the Cluster object in the redpanda namespace.

$ kubectl apply -f redpanda-cluster.yaml -n redpanda

The Redpanda operator creates two Kubernetes Services for the cluster. The first of them one-node-cluster is a headless service and it is used in internal communication. We could have enabled external communication, but it is not required in our scenario. Applications are using the port 9092, which is compatible with the Kafka API. There is also a dedicated service for exposing the schema registry under the port 8081.

$ kubectl get svc -n redpanda
NAME                       TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                    AGE
one-node-cluster           ClusterIP   None            <none>        9644/TCP,9092/TCP,8082/TCP   3m
one-node-cluster-cluster   ClusterIP   10.98.26.202    <none>        8081/TCP                     3m

Install via Helm without operator

We can also use the Redpanda Helm chart, which does not require the installation of CRDs or an operator, but instead creates a cluster according to the configuration in a values.yaml file. This is the recommended way to install Redpanda on Kubernetes. In order to use it, clone the Redpanda repository with the Helm chart:

$ git clone https://github.com/redpanda-data/helm-charts.git
$ cd helm-charts/redpanda

Then, you need to install Redpanda using the following Helm command. Since we use a single-node Kubernetes cluster we override the default number of brokers using the statefulset.replicas parameter.

$ helm install redpanda . \
    -n redpanda \
    --create-namespace \
    --set statefulset.replicas=1

In comparison to the previous installation method, you would have to install the kube-prometheus-stack separately.

$ helm repo add prometheus-community https://prometheus-community.github.io/helm-charts && \
  helm repo update

Enable Prometheus Metrics

In the previous section, we installed the Prometheus stack using the Redpanda operator. By default, it monitors Kubernetes core components and the Redpanda operator. Our goal is to enable monitoring of the currently created Redpanda cluster in the redpanda namespace. To do that, we need to create the PodMonitor object provided by the Prometheus operator. It requires us to at least set the pod target namespace, label selector, and metrics endpoints. The Redpanda operator exposes metrics on the admin port (9644) under metrics and public_metrics paths.

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  labels:
    app.kubernetes.io/instance: redpanda-cluster
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/name: redpanda-cluster
    app.kubernetes.io/version: v22.2.2
    helm.sh/chart: redpanda-operator-v22.2.2
    release: redpanda-operator
  name: redpanda-cluster-monitor
  namespace: redpanda-system
spec:
  namespaceSelector:
    matchNames:
      - redpanda
  podMetricsEndpoints:
    - path: /metrics
      port: admin
    - path: /public_metrics
      port: admin
  selector:
    matchLabels:
      app.kubernetes.io/name: redpanda

Once you create the PodMonitor, you will be able to query Redpanda metrics. There are a lot of exported metrics. You can verify these metrics in the Prometheus dashboard. Their names are starting with the vectorized_ prefix.

redpanda-kubernetes-prometheus

The simplest way to view important metrics is through the Grafana dashboard. Therefore, we are going to create a dashboard there. Fortunately, Redpanda provides an automatic mechanism for generating a Grafana dashboard. There is a dedicated Redpanda CLI (rpk) command to do it. We just need to set the name of the data source (Prometheus) and metrics endpoint URL (public_metrics).

$ kubectl exec pod/one-node-cluster-0 -n redpanda -c redpanda \
    -- rpk generate grafana-dashboard --datasource Prometheus \
    --metrics-endpoint http://localhost:9644/public_metrics \
    > redpanda-dashboard.json

Once we export the dashboard as a JSON file, we may import it into the Grafana dashboard.

Enable Redpanda Console

Redpanda provides a UI dashboard for managing cluster instances called Redpanda Console. However, it is not installed by the operator. In order to run it on Kubernetes, we will use the Helm chart. Firstly, let’s add the required Helm repository:

$ helm repo add redpanda-console https://packages.vectorized.io/public/console/helm/charts/ && \
  helm repo update

We need to override some configuration settings. By default, the Redpanda Console tries to detect both broker and schema registry on a localhost address. Since we are running Redpanda on Kubernetes we need to set the name of the service one-node-cluster for the broker and one-node-cluster-cluster for the schema registry. Here’s our values.yaml file:

console:
  config:
    kafka:
      brokers:
        - one-node-cluster:9092
      clientId: redpanda-console
    schemaRegistry:
      enabled: true
      urls: ["http://one-node-cluster-cluster:8081"]
      username: console
      password: redacted

Finally, let’s install the console in the same namespace as the broker.

$ helm install redpanda-console redpanda-console/console \
    --values values.yaml \
    -n redpanda

The Redpanda Console is available under the 8080 port. We can enable port-forward to access it locally.

Integrate Spring Boot with Redpanda

Our instance of Redpanda is ready. Let’s run sample applications on Kubernetes. They are sending events to Redpanda and receiving them from Redpanda. Our applications are written in Kotlin and built on top of Spring Boot and use Spring Cloud Stream to integrate with the Kafka-compatible API. They are using the Avro format for serializing and deserializing messages. Thanks to the Spring Cloud Schema Registry client, they may also integrate with the schema registry provided by Redpanda.

Here’s the list of required modules for both producer and consumer apps:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-schema-registry-client</artifactId>
</dependency>

If you would like to read more about Spring Cloud support for Kafka and schema registry you can refer to this article on my blog. It describes how to build event-driven architectures with Spring Cloud Stream Kafka and use Avro format in communication between apps.

Our producer app continuously generates and sends messages to the Redpanda topic. It is integrated with the schema registry available under the address provided in the spring.cloud.schemaRegistryClient.endpoint property. To enable that integration, we need to annotate the main class with the @EnableSchemaRegistryClient.

@SpringBootApplication
@EnableSchemaRegistryClient
class ProductionApplication {

   var id: Int = 0

   @Value("\${callme.supplier.enabled}")
   val supplierEnabled: Boolean = false

   @Bean
   fun callmeEventSupplier(): Supplier<Message<CallmeEvent>?> = Supplier { createEvent() }

   @Primary
   @Bean
   fun schemaRegistryClient(@Value("\${spring.cloud.schemaRegistryClient.endpoint}") endpoint: String?): SchemaRegistryClient {
      val client = ConfluentSchemaRegistryClient()
      client.setEndpoint(endpoint)
      return client
   }

   private fun createEvent(): Message<CallmeEvent>? {
      return if (supplierEnabled)
          MessageBuilder.withPayload(CallmeEvent(++id, "I'm callme event!", "ping"))
               .setHeader("to_process", true)
               .build()
      else
         null
   }
}

Our app does not contain a lot of code, however, we need to provide some configuration settings. In order to enable serialization with Avro, we need to set a default content type to application/*+avro (1). The target topic on Redpanda is callme-events (2). It consists of 2 partitions (3). We also need to set the name of bean responsible for generating messages (4). With the property spring.cloud.schema.avro.dynamicSchemaGenerationEnabled we may enable automatic generation of the Avro schema based on the source code (5). Of course, we also need provide the Redpanda broker address (6) and schema registry address (7).

spring.application.name=producer-service
spring.cloud.stream.default.contentType=application/*+avro # (1)
spring.cloud.stream.bindings.callmeEventSupplier-out-0.contentType=application/*+avro
spring.cloud.stream.bindings.callmeEventSupplier-out-0.destination=callme-events # (2)
spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionKeyExpression=payload.id
spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionCount=2 # (3)
spring.cloud.stream.source=callmeEventSupplier # (4)

spring.cloud.schema.avro.dynamicSchemaGenerationEnabled=true # (5)
spring.cloud.schemaRegistryClient.endpoint=http://one-node-cluster-cluster:8081/ # (6)
spring.kafka.bootstrap-servers=one-node-cluster:9092 # (7)
spring.main.allow-bean-definition-overriding=true

callme.supplier.enabled=true

Finally, let’s build and deploy our producer app on Kubernetes. We may use Skaffold for that. The app source code is configured to support it. Here’s our Deployment manifest:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
      - name: producer
        image: piomin/producer-service
        ports:
        - containerPort: 8080

Let’s verify a list of running pods in the redpanda namespace:

$ kubectl get pod -n redpanda
NAME                               READY   STATUS    RESTARTS   AGE
one-node-cluster-0                 1/1     Running   0          112m
producer-5b7f5cfcc6-586z2          1/1     Running   0          65m
redpanda-console-dcf446dc8-fzc2t   1/1     Running   0          104m

Monitor Redpanda on Kubernetes with Console and Prometheus

Our producer app is running on Kubernetes. It generates and sends messages. Let’s switch to the Redpanda Console. Here’s the view with a list of topics. As you see, the topic callme-events has been created:

redpanda-kubernetes-console

If you click on the topic, you will see the details and a list of messages:

Also, let’s verify the message schema available under the Schema Registry menu. You can compare it with the CallmeEvent object in the source code.

redpanda-kubernetes-schema

Then, let’s run our consumer app. It also integrates with the schema registry and receives messages form callme-events topic.

Thanks to Prometheus and Grafana, we can monitor several parameters related to the Redpanda broker. Here’s the screen from the Grafana dashboard:

Final Thoughts

Redpanda simplifies deployment on Kubernetes in comparison to the standard Kafka. Within the single pod, we have a broker, a schema registry, and an HTTP proxy. We can also easily install a UI console to manage Redpanda graphically. We can easily customize the Redpanda cluster using the CRD object provided by the operator.

The post Running Redpanda on Kubernetes appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2022/09/06/running-redpanda-on-kubernetes/feed/ 2 13109
Manage Kubernetes Cluster with Terraform and Argo CD https://piotrminkowski.com/2022/06/28/manage-kubernetes-cluster-with-terraform-and-argo-cd/ https://piotrminkowski.com/2022/06/28/manage-kubernetes-cluster-with-terraform-and-argo-cd/#comments Tue, 28 Jun 2022 07:52:23 +0000 https://piotrminkowski.com/?p=11992 In this article, you will learn how to create a Kubernetes cluster with Terraform and then manage it with Argo CD. Terraform is very useful for automating infrastructure. On the other hand, Argo CD helps us implement GitOps and continuous delivery for our applications. It seems that we can successfully combine both these tools. Let’s […]

The post Manage Kubernetes Cluster with Terraform and Argo CD appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to create a Kubernetes cluster with Terraform and then manage it with Argo CD. Terraform is very useful for automating infrastructure. On the other hand, Argo CD helps us implement GitOps and continuous delivery for our applications. It seems that we can successfully combine both these tools. Let’s consider how they can help us to work with Kubernetes in the GitOps style.

For a basic introduction to using Argo CD on Kubernetes, you may refer to this article.

Introduction

First of all, I would like to define the whole cluster and store its configuration in Git. I can’t use only Argo CD to achieve it, because Argo CD must run on the existing Kubernetes cluster. That’s why I need a tool that is able to create a cluster and then install Argo CD there. In that case, Terraform seems to be a natural choice. On the other hand, I don’t want to use Terraform to manage apps running on Kubernetes. It is perfect for a one-time activity like creating a cluster, but not for continuous tasks like app delivery and configuration management.

Here’s the list of things we are going to do:

  1. In the first step, we will create a local Kubernetes cluster using Terraform
  2. Then we will install OLM (Operator Lifecycle Manager) on the cluster. We need it to install Kafka with Strimzi (Step 5)
  3. We will use Terraform to install Argo CD from the Helm chart and create a single Argo CD Application responsible for the whole cluster configuration based on Git
  4. After that, Argo CD Application installs Strimzi Operator, creates Argo CD Project dedicated to Kafka installation and Argo CD Application that runs Kafka on Kubernetes
  5. Finally, the Argo CD Application automatically creates all the CRD objects required for running Kafka

The most important thing here is that everything should happen after running the terraform apply command. Terraform installs Argo CD, and then Argo CD installs Kafka, which is our sample app in that scenario. Let’s see how it works.

terraform-kubernetes-arch

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. After that, you should just follow my instructions. Let’s begin.

1. Create Kubernetes Cluster with Terraform

In order to easily create a Kubernetes cluster, we will use Kind. There is a dedicated Terraform provider for Kind available here. Of course, you can run Kubernetes on any cloud, and you will also find Terraform providers for that.

Our cluster consists of three worker nodes and a single master node. We need three nodes because finally, we will install a Kafka cluster running in three instances. Each of them will be deployed on a different node. Here’s our Terraform main.tf file for that step. We need to define the latest version of the tehcyx/kind provider (which is 0.0.12) in the required_providers section. The name of our cluster is cluster1. We will also enable the wait_for_ready option, to proceed to the next steps after the cluster is ready.

terraform {
  required_providers {
    kind = {
      source = "tehcyx/kind"
      version = "0.0.12"
    }
  }
}

provider "kind" {}

resource "kind_cluster" "default" {
  name = "cluster-1"
  wait_for_ready = true
  kind_config {
    kind = "Cluster"
    api_version = "kind.x-k8s.io/v1alpha4"

    node {
      role = "control-plane"
    }

    node {
      role = "worker"
      image = "kindest/node:v1.23.4"
    }

    node {
      role = "worker"
      image = "kindest/node:v1.23.4"
    }

    node {
      role = "worker"
      image = "kindest/node:v1.23.4"
    }
  }
}

Just to verify a configuration you can run the command terraform init, and then terraform plan. After that, you could apply the configuration using terraform apply, but as you probably remember we will do it after the last all the configuration is ready to apply everything in one command.

2. Install OLM on Kubernetes

As I mentioned before, Operator Lifecycle Manager (OLM) is a prerequisite for installing the Strimzi Kafka operator. You can find the latest release of OLM here. In fact, it comes down to applying two YAML manifests on Kubernetes. The first of them crds.yaml contains CRD definitions. The second of them olm.yaml provides all required Kubernetes objects to install OLM. Let’s just copy both these files into the local directory inside our Terraform repository. In order to apply them to Kubernetes, we first need to enable the Terraform kubectl provider.

terraform {
  ...

  required_providers {
    kubectl = {
      source  = "gavinbunney/kubectl"
      version = ">= 1.7.0"
    }
  }
}

Why do we use the kubectl provider instead of the official Terraform Kubernetes provider? The crds.yaml contains pretty large CRDs that go over size limits. We can easily solve that problem by enabling the server-side apply on the kubectl provider. The next case is that there are multiple Kubernetes objects defined inside both the YAML files. The kubectl provider supports it via the for_each parameter.

data "kubectl_file_documents" "crds" {
  content = file("olm/crds.yaml")
}

resource "kubectl_manifest" "crds_apply" {
  for_each  = data.kubectl_file_documents.crds.manifests
  yaml_body = each.value
  wait = true
  server_side_apply = true
}

data "kubectl_file_documents" "olm" {
  content = file("olm/olm.yaml")
}

resource "kubectl_manifest" "olm_apply" {
  depends_on = [data.kubectl_file_documents.crds]
  for_each  = data.kubectl_file_documents.olm.manifests
  yaml_body = each.value
}

Le’s consider the last case in this section. Before applying any YAML we are creating a new Kubernetes cluster in the previous step. Therefore, we cannot use the existing context. Fortunately, we can use the output arguments from the kubectl provider with the Kubernetes address and auth credentials.

provider "kubectl" {
  host = "${kind_cluster.default.endpoint}"
  cluster_ca_certificate = "${kind_cluster.default.cluster_ca_certificate}"
  client_certificate = "${kind_cluster.default.client_certificate}"
  client_key = "${kind_cluster.default.client_key}"
}

3. Install Argo CD with Helm

This is the last step on the Terraform side. We are going to install Argo CD using its Helm chart. We also need to create a single Argo CD Application responsible for the cluster management. This Application will install the Kafka Strimzi operator and create another Argo CD Application‘s used e.g. for running the Kafka cluster. In the first step, we need to do the same thing as before: define a provider and set the Kubernetes cluster address. Here’s our definition in Terraform:

provider "helm" {
  kubernetes {
    host = "${kind_cluster.default.endpoint}"
    cluster_ca_certificate = "${kind_cluster.default.cluster_ca_certificate}"
    client_certificate = "${kind_cluster.default.client_certificate}"
    client_key = "${kind_cluster.default.client_key}"
  }
}

The tricky thing here is that we need to create the Application just after Argo CD installation. By default, Terraform verifies if there are required CRD objects on Kubernetes. In that case, it requires the Application CRD from argoproj.io/v1alpha1. Fortunately, we can use the Helm chart parameter allowing us to pass the declaration of additional Applications. In order to do that, we have to set a custom values.yaml file. Here’s the Terraform declaration for the Argo CD installation:

resource "helm_release" "argocd" {
  name  = "argocd"

  repository       = "https://argoproj.github.io/argo-helm"
  chart            = "argo-cd"
  namespace        = "argocd"
  version          = "4.9.7"
  create_namespace = true

  values = [
    file("argocd/application.yaml")
  ]
}

In order to create an initial Application, we need to use the Helm chart server.additionalApplications parameter as shown. Here’s the whole argocd/application.yaml file. To simplify, the configuration used by Argo CD is located in the repository as Terraform configuration. You can find all the required YAMLs in the argocd/manifests directory.

server:
  additionalApplications:
   - name: cluster-config
     namespace: argocd
     project: default
     source:
       repoURL: https://github.com/piomin/sample-terraform-kubernetes-argocd.git
       targetRevision: HEAD
       path: argocd/manifests/cluster
       directory:
         recurse: true
     destination:
       server: https://kubernetes.default.svc
     syncPolicy:
       automated:
         prune: false
         selfHeal: false

4. Configure Kubernetes cluster with Argo CD

The last two steps are managed by Argo CD. We have successfully completed the Kubernetes cluster installation process. Now, it’s time to install our first application there. Our example app is Kafka. So, firstly we need to install the Kafka Strimzi operator. To do that, we just need to define a Subscription object managed by the previously installed OLM. The definition is available in the repository as the strimzi.yaml file.

apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
  name: my-strimzi-kafka-operator
  namespace: operators
spec:
  channel: stable
  name: strimzi-kafka-operator
  source: operatorhubio-catalog
  sourceNamespace: olm

We could configure a lot of aspects related to the whole cluster here. However, we just need to create a dedicated Argo CD Project and Application for Kafka configuration. Here’s our Project definition:

apiVersion: argoproj.io/v1alpha1
kind: AppProject
metadata:
  name: kafka
  namespace: argocd
spec:
  clusterResourceWhitelist:
    - group: '*'
      kind: '*'
  destinations:
    - name: '*'
      namespace: '*'
      server: '*'
  sourceRepos:
    - '*'

Let’s place the kafka ArgoCD Application inside the newly created Project.

apiVersion: argoproj.io/v1alpha1
kind: Application
metadata:
  name: kafka
  namespace: argocd
spec:
  destination:
    namespace: kafka
    server: https://kubernetes.default.svc
  project: kafka
  source:
    path: argocd/manifests/kafka
    repoURL: https://github.com/piomin/sample-terraform-kubernetes-argocd.git
    targetRevision: HEAD
  syncPolicy:
    syncOptions:
      - CreateNamespace=true

5. Create Kafka Cluster using GitOps

Finally, the last part of our exercise. We will create and run a 3-node Kafka cluster on Kind. Here’s the Kafka object definition we store in Git. We are setting 3 replicas for both Kafka and Zookeeper (used by the Kafka cluster). This manifest is available in the repository under the path argocd/manifests/kafka/cluster.yaml. We are exposing the cluster on 9092 (plain) and 9093 (TLS) ports. The Kafka cluster has storage mounted as the PVC into the Deployment.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 3
    version: 3.2.0
    logging:
      type: inline
      loggers:
        kafka.root.logger.level: "INFO"
    config:
      auto.create.topics.enable: "false"
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.2"
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 30Gi
          deleteClaim: true
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: true
  entityOperator:
    topicOperator: {}
    userOperator: {}

We will also define a single Kafka Topic inside the argocd/manifests/kafka/cluster.yaml manifest.

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 10
  replicas: 3

Execution on Kubernetes

Terraform

We have already prepared all the required scripts. Let’s proceed to the execution phase. If you still haven’t cloned the Git repository it’s time to do it:

$ git clone https://github.com/piomin/sample-terraform-kubernetes-argocd.git
$ cd sample-terraform-kubernetes-argocd

Firstly, let’s initialize our working directory containing Terraform configuration:

$ terraform init

Once we do it, we may preview a list of actions to perform:

$ terraform plan

You should receive a pretty large content as a response. Here’s the last part of my result:

If everything looks fine and there are no errors we may proceed to the next (final) step. Let’s begin the process:

$ terraform apply

All 24 objects should be successfully applied. Here’s the last part of the logs:

Now, you should have your cluster ready and running. Let’s display a list of Kind clusters:

$ kind get clusters
cluster-1

The name of our cluster is cluster-1. But the name of the Kubernetes context is kind-cluster-1:

Let’s display a list of applications deployed on the Kind cluster. You should have at least Argo CD and OLM installed. After some time Argo CD applies the configuration stored in the Git repository. Then, you should see the Kafka Strimzi operator installed in the operators namespace.

terraform-kubernetes-apps

Argo CD

After that, we can go to the Argo CD web console. To access it easily on the local port let’s enable port-forward:

$ kubectl port-forward service/argocd-server 8443:443 -n argocd

Now, you can display the Argo CD web console on the https://localhost:8443. The default username is admin. The password is auto-generated by the Argo CD. You can find it inside the Kubernetes Secret argocd-initial-admin-secret.

$ kubectl get secret argocd-initial-admin-secret -n argocd --template={{.data.password}} | base64 -D

Here’s the list of our Argo CD Applications. The cluster-config has an auto-sync option enabled. It installs the Strimzi operator and creates kafka Argo CD Application. I could also enable auto-sync for kafka Application. But just for the demo purpose, I left there a manual approval. So, let’s run Kafka on our cluster. To do that click the Sync button on the kafka tile.

terraform-kubernetes-argocd

Once you do the Kafka installation is starting. Finally, you should have the whole cluster ready and running. Each Kafka and Zookeeper node are running on the different Kubernetes worker node:

That’s all. We created everything using a single Terraform command and one click on the Argo CD web console. Of course, we could enable auto-sync for the kafka application, so we even don’t need to log in to the Argo CD web console for the final effect.

The post Manage Kubernetes Cluster with Terraform and Argo CD appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2022/06/28/manage-kubernetes-cluster-with-terraform-and-argo-cd/feed/ 23 11992
Introduction to ksqlDB on Kubernetes with Spring Boot https://piotrminkowski.com/2022/06/22/introduction-to-ksqldb-on-kubernetes-with-spring-boot/ https://piotrminkowski.com/2022/06/22/introduction-to-ksqldb-on-kubernetes-with-spring-boot/#comments Wed, 22 Jun 2022 14:01:47 +0000 https://piotrminkowski.com/?p=11924 In this article, you will learn how to run ksqlDB on Kubernetes and use it with Spring Boot. You will also see how to run Kafka on Kubernetes based on the Strimzi operator. In order to integrate Spring Boot with the ksqlDB server, we are going to leverage a lightweight Java client provided by ksqlDB. […]

The post Introduction to ksqlDB on Kubernetes with Spring Boot appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to run ksqlDB on Kubernetes and use it with Spring Boot. You will also see how to run Kafka on Kubernetes based on the Strimzi operator. In order to integrate Spring Boot with the ksqlDB server, we are going to leverage a lightweight Java client provided by ksqlDB. This client supports pull and push queries. It also provides an API for inserting rows and creating tables or streams. You can read more about it in the ksqlDB documentation here.

Our sample Spring Boot application is very simple. We will use Spring Cloud Stream Supplier bean for generating and sending events to the Kafka topic. For more information about Kafka with Spring Cloud Stream please refer to the following article. On the other hand, our application gets data from the Kafka topic using kSQL queries. It also creates KTable on startup.

Let’s take a look at our architecture.

ksqldb-kubernetes-arch

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then go to the transactions-service directory. After that, you should just follow my instructions. Let’s begin.

Prerequisites

We will use several tools. You need to have:

  1. Kubernetes cluster – it may be a single-node, local cluster like Minikube or Kind. Personally, I’m using Kubernetes on the Docker Desktop
  2. kubectl CLI – to interact with the cluster
  3. Helm – we will use it to install the ksqlDB server on Kubernetes. If you don’t have Helm, you will have to install it

Run Kafka on Kubernetes with Strimzi

Of course, we need an instance of Kafka to perform our exercise. There are several ways to run Kafka on Kubernetes. I’ll show you how to do it with the operator-based approach. In the first step, you need to install OLM (Operator Lifecycle Manager) on your cluster. In order to do that, you can just execute the following command on your Kubernetes context:

$ curl -L https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.21.2/install.sh -o install.sh
$ chmod +x install.sh
$ ./install.sh v0.21.2

Then, you can proceed to the Strimzi operator installation. That’s just a single command.

$ kubectl create -f https://operatorhub.io/install/stable/strimzi-kafka-operator.yaml

Now, we can create a Kafka cluster on Kubernetes. Let’s begin with a dedicated namespace for our exercise:

$ kubectl create ns kafka

I assume you have a single-node Kubernetes cluster, so we also create a single-node Kafka. Here’s the YAML manifest with Kafka CRD. You can find it in the repository under the path k8s/cluster.yaml.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  entityOperator:
    topicOperator: {}
    userOperator: {}
  kafka:
    config:
      default.replication.factor: 1
      inter.broker.protocol.version: "3.2"
      min.insync.replicas: 1
      offsets.topic.replication.factor: 1
      transaction.state.log.min.isr: 1
      transaction.state.log.replication.factor: 1
    listeners:
      - name: plain
        port: 9092
        tls: false
        type: internal
      - name: tls
        port: 9093
        tls: true
        type: internal
    replicas: 1
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 30Gi
          deleteClaim: true
    version: 3.2.0
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: true

Let’s apply it to Kubernetes in the kafka namespace:

$ kubectl apply -f k8s/cluster.yaml -n kafka

You should see a single instance of Kafka and also a single instance of Zookeeper. If the pods are running, it means you have Kafka on Kubernetes.

$ kubectl get pod -n kafka
NAME                                          READY   STATUS    RESTARTS  AGE
my-cluster-entity-operator-68cc6bc4d9-qs88p   3/3     Running   0         46m
my-cluster-kafka-0                            1/1     Running   0         48m
my-cluster-zookeeper-0                        1/1     Running   0         48m

Kafka is available inside the cluster under the name my-cluster-kafka-bootstrap and port 9092.

kubectl get svc -n kafka
NAME                          TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                               AGE
my-cluster-kafka-bootstrap    ClusterIP   10.108.109.255   <none>        9091/TCP,9092/TCP,9093/TCP            47m
my-cluster-kafka-brokers      ClusterIP   None             <none>        9090/TCP,9091/TCP,9092/TCP,9093/TCP   47m
my-cluster-zookeeper-client   ClusterIP   10.102.10.251    <none>        2181/TCP                              47m
my-cluster-zookeeper-nodes    ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP            47m

Run KsqlDB Server on Kubernetes

The KsqlDB Server is a part of the Confluent Platform. Since we are not installing the whole Confluent Platform on Kubernetes, but just an open-source Kafka cluster, we need to install KsqlDB Server separately. Let’s do it with Helm. There is no “official” Helm chart for the KSQL server. Therefore, we should go directly to the Confluent Helm repository on GitHub:

$ git clone https://github.com/confluentinc/cp-helm-charts.git
$ cd cp-helm-charts

In this repository, you can find separate Helm charts for every single Confluent component including e.g. control center or KSQL Server. The location of our chart inside the repository is charts/cp-ksql-server. We need to override some default settings during installation. First of all, we have to disable the headless mode. In the headless mode, KSQL Server does not expose the HTTP endpoint and loads queries from the input script. Our Spring Boot app will connect to the server through HTTP. In the next step, we should override the default address of the Kafka cluster and the default version of the KSQL Server which is still 6.1.0 there. We will use the latest version 7.1.1. Here’s the helm command you should run on your Kubernetes cluster:

$ helm install cp-ksql-server \
    --set ksql.headless=false \
    --set kafka.bootstrapServers=my-cluster-kafka-bootstrap:9092 \
    --set imageTag=7.1.1 \
  charts/cp-ksql-server -n kafka

Here’s the result:

Let’s verify if KSQL is running on the cluster:

$ kubectl get pod -n kafka | grep ksql
cp-ksql-server-679fc98889-hldfv               2/2     Running   0               2m11s

The HTTP endpoint is available for other applications under the name cp-ksql-server and port 8088:

$ kubectl get svc -n kafka | grep ksql
cp-ksql-server                ClusterIP   10.109.189.36    <none>        8088/TCP,5556/TCP                     3m25s

Now, we have the whole required staff running on our Kubernetes cluster. Therefore, we can proceed to the Spring Boot app implementation.

Integrate Spring Boot with ksqlDB

I didn’t find any out-of-the-box integration between Spring Boot and ksqlDB. Therefore, we will use the ksqldb-api-client directly. In the first, we need to include the ksqlDB Maven repository and some dependencies:

<dependencies>
        ...

  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-api-client</artifactId>
    <version>0.26.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-udf</artifactId>
    <version>0.26.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-common</artifactId>
    <version>0.26.0</version>
  </dependency>
</dependencies>

<repositories>
  <repository>
    <id>ksqlDB</id>
    <name>ksqlDB</name>
    <url>https://ksqldb-maven.s3.amazonaws.com/maven/</url>
  </repository>
</repositories>

After that, we can define a Spring @Bean returning the ksqlDB Client implementation. Since we will run our application in the same namespace as the KSQL Server, we need to provide the Kubernetes Service name as the host name.

@Configuration
public class KSQLClientProducer {

    @Bean
    Client ksqlClient() {
        ClientOptions options = ClientOptions.create()
                .setHost("cp-ksql-server")
                .setPort(8088);
        return Client.create(options);
    }
}

Our application is interacting with KSQL Server through an HTTP endpoint. It creates a single KTable on startup. To do that, we need to invoke the executeStatement method on the instance of the KSQL Client bean. We are creating the SOURCE table to enable running pull queries on it. The table gets data from the transactions topic. It expects JSON format in the incoming events.

public class KTableCreateListener implements ApplicationListener<ContextRefreshedEvent> {

   private static final Logger LOG = LoggerFactory.getLogger(KTableCreateListener.class);
   private Client ksqlClient;

   public KTableCreateListener(Client ksqlClient) {
      this.ksqlClient = ksqlClient;
   }

   @Override
   public void onApplicationEvent(ContextRefreshedEvent event) {
      try {
         String sql = """
                 CREATE SOURCE TABLE IF NOT EXISTS transactions_view (
                   id BIGINT PRIMARY KEY,
                   sourceAccountId BIGINT,
                   targetAccountId BIGINT,
                   amount INT
                 ) WITH (
                   kafka_topic='transactions',
                   value_format='JSON'
                 );
                 """;
         ExecuteStatementResult result = ksqlClient.executeStatement(sql).get();
         LOG.info("Result: {}", result.queryId().orElse(null));
      } catch (ExecutionException | InterruptedException e) {
         LOG.error("Error: ", e);
      }
   }
}

After creating the table we can run some queries on it. There are pretty simple queries. We are trying to find all transactions and all transactions related to the particular account.

@RestController
@RequestMapping("/transactions")
public class TransactionResource {

   private static final Logger LOG = LoggerFactory.getLogger(TransactionResource.class);
   Client ksqlClient;

   public TransactionResource(Client ksqlClient) {
      this.ksqlClient = ksqlClient;
   }

   @GetMapping
   public List<Transaction> getTransactions() throws ExecutionException, InterruptedException {
      StreamedQueryResult sqr = ksqlClient
            .streamQuery("SELECT * FROM transactions_view;")
            .get();
      Row row;
      List<Transaction> l = new ArrayList<>();
      while ((row = sqr.poll()) != null) {
         l.add(mapRowToTransaction(row));
      }
      return l;
   }

   @GetMapping("/target/{accountId}")
   public List<Transaction> getTransactionsByTargetAccountId(@PathVariable("accountId") Long accountId)
            throws ExecutionException, InterruptedException {
      StreamedQueryResult sqr = ksqlClient
            .streamQuery("SELECT * FROM transactions_view WHERE sourceAccountId=" + accountId + ";")
            .get();
      Row row;
      List<Transaction> l = new ArrayList<>();
      while ((row = sqr.poll()) != null) {
         l.add(mapRowToTransaction(row));
      }
      return l;
   }

   private Transaction mapRowToTransaction(Row row) {
      Transaction t = new Transaction();
      t.setId(row.getLong("ID"));
      t.setSourceAccountId(row.getLong("SOURCEACCOUNTID"));
      t.setTargetAccountId(row.getLong("TARGETACCOUNTID"));
      t.setAmount(row.getInteger("AMOUNT"));
      return t;
   }

}

Sending events to the topic with Spring Cloud Stream

Finally, we can proceed to the last part of our exercise. We need to generate test data and send it to the Kafka transactions topic. The simplest way to achieve it is with the Spring Cloud Stream Kafka module. Firstly, let’s add the following Maven dependency:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

Then, we may create a producer based on the Spring Supplier bean. The Supplier bean continuously generates and sends new events to the target channel. By default, it repeats the action once per second.

@Configuration
public class KafkaEventProducer {

   private static long transactionId = 0;
   private static final Random r = new Random();

   @Bean
   public Supplier<Message<Transaction>> transactionsSupplier() {
      return () -> {
          Transaction t = new Transaction();
          t.setId(++transactionId);
          t.setSourceAccountId(r.nextLong(1, 100));
          t.setTargetAccountId(r.nextLong(1, 100));
          t.setAmount(r.nextInt(1, 10000));
          Message<Transaction> o = MessageBuilder
                .withPayload(t)
                .setHeader(KafkaHeaders.MESSAGE_KEY, new TransactionKey(t.getId()))
                .build();
          return o;
      };
   }
}

Of course, we also need to provide the address of our Kafka cluster and the name of a target topic for the channel. The address of Kafka is injected at the deployment phase.

spring.kafka.bootstrap-servers = ${KAFKA_URL}
spring.cloud.stream.bindings.transactionsSupplier-out-0.destination = transactions

Finally, let’s deploy our Spring Boot on Kubernetes. Here’s the YAML manifest containing Kubernetes Deployment and Service definitions:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: transactions
spec:
  selector:
    matchLabels:
      app: transactions
  template:
    metadata:
      labels:
        app: transactions
    spec:
      containers:
      - name: transactions
        image: piomin/transactions-service
        env:
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap:9092
        ports:
        - containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
  name: transactions
spec:
  type: ClusterIP
  selector:
    app: transactions
  ports:
    - port: 8080

Let’s deploy the app in the kafka namespace:

$ kubectl apply -f k8s/deployment.yaml -n kafka

Testing ksqlDB on Kubernetes

Once the app is deployed on Kubernetes, let’s enable port-forward to test it on the local port:

$ kubectl port-forward service/transactions 8080:8080

Now, we can test our two HTTP endpoints. Let’s start with the endpoint for searching all transactions:

$ curl http://localhost:8080/transactions

Then, you can call the endpoint for searching all transactions related to the targetAccountId, e.g.:

$ curl http://localhost:8080/transactions/target/10

Final Thoughts

In this article, I wanted to show how you can start with ksqlDB on Kubernetes. We used such frameworks as Spring Boot and Spring Cloud Stream to interact with Kafka and ksqlDB. You could see how to run the Kafka cluster on Kubernetes using the Strimzi operator or how to deploy KSQL Server directly from the Helm repository.

The post Introduction to ksqlDB on Kubernetes with Spring Boot appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2022/06/22/introduction-to-ksqldb-on-kubernetes-with-spring-boot/feed/ 12 11924
Deep Dive into Saga Transactions with Kafka Streams and Spring Boot https://piotrminkowski.com/2022/02/07/deep-dive-into-saga-transactions-with-kafka-streams-and-spring-boot/ https://piotrminkowski.com/2022/02/07/deep-dive-into-saga-transactions-with-kafka-streams-and-spring-boot/#comments Mon, 07 Feb 2022 14:38:20 +0000 https://piotrminkowski.com/?p=10587 In this article, you will learn how to use Kafka Streams and Spring Boot to perform transactions according to the Saga pattern. To be honest, I was quite surprised by a great deal of attention to my last article about Kafka. I got some questions about streams, transactions, and support for Kafka in Spring Boot. […]

The post Deep Dive into Saga Transactions with Kafka Streams and Spring Boot appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to use Kafka Streams and Spring Boot to perform transactions according to the Saga pattern. To be honest, I was quite surprised by a great deal of attention to my last article about Kafka. I got some questions about streams, transactions, and support for Kafka in Spring Boot. In this article, I’ll try to answer a few of them. I will also show how you can easily set up a cloud-managed Kafka on the Upstash.

Introduction

First of all, let’s recap the approach described in the previous article. We used Kafka Streams to process order transactions on the order-service side. To handle orders coming to the stock-service and payment-service we used a standard Spring @KafkaListener. There are also two databases – a single database per every service. The stock-service stores data related to the number of available products and updates them after receiving an order. The same with the payment-service. It updates the customer’s account on every single order. Both applications receive orders from Kafka topic. They send responses to other topics. But just to simplify, we will skip it as shown in the figure below. We treat the Kafka orders topic as a stream of events and also as a table with the latest order’s status.

kafka-streams-transactions-old-arch

What may go wrong with that approach? In fact, we have two data sources here. We use Kafka as the order store. On the other hand, there are SQL databases (in my case H2, but you can use any other) that store stock and payment data. Once we send an order with a reservation to the Kafka topic, we need to update a database. Since Kafka does not support XA transactions, it may result in data inconsistency. Of course, Kafka doesn’t support XA transactions the same as many other systems including e.g. RabbitMQ.

The question is what can we do with that? One of the possible options you may use is an approach called Change Data Capture (CDC) with the outbox pattern. CDC identifies and tracks changes to data in a database. Then it may emit those changes as events and send them, for example to the Kafka topic. I won’t go into the details of that process. If you are interested in you may read this article written by Gunnar Morling.

Architecture with Kafka Streams

The approach I will describe today is fully based on the Kafka Streams. We won’t use any SQL databases. When the order-service sends a new order its id is the message key. With Kafka Streams, we may change a message key in the stream. It results in creating new topics and repartitioning. With new message keys, we may perform calculations just for the specific customerId or productId. The result of such calculation may be saved in the persistent store. For example, Kafka automatically creates and manages such state stores when you are calling stateful operations like count() or aggregate(). We will aggregate the orders related to the particular customer or product. Here’s the illustration of our architecture. Here’s the visualization of our process.

kafka-streams-transactions-arch

Now, let’s consider a scenario for the payment-service in details. In the incoming stream of orders the payment-service calls the selectKey() operation. It changes the key from the order’s id into the order’s customerId. Then it groups all the orders by the new key and invokes the aggregate() operation. In the aggregate() method it calculates the available amount and reserved amount based on the order’s price and status (whether it is a new order or a confirmation order). If there are sufficient funds on the customer account it sends the ACCEPT order to the payment-orders topic. Otherwise, it sends the REJECT order. Then the order-service process responses by joining streams from payment-orders and stock-orders by the order’s id. As the result, it sends a confirmation or a rollback order.

kafka-streams-transactions-details

Finally, let’s proceed to the implementation!

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then switch to the streams-full branch. After that, you should just follow my instructions.

Aggregation with Kafka Streams

Let’s begin with the payment-service. The implementation of KStream in not complicated here. In the first step (1), we invoke the selectKey() method and get the customerId value of the Order object as a new key. Then we call groupByKey() method (2) to receive KGroupedStream as a result. While we have KGroupedStream we may invoke one of the calculation methods. In that case, we need to use aggregate(), since we have a little bit more advanced calculation than just a simple count (3). The last two steps are just for printing the value after calculation.

@Bean
public KStream<Long, Order> stream(StreamsBuilder builder) {
   JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
   JsonSerde<Reservation> rsvSerde = new JsonSerde<>(Reservation.class);
   KStream<Long, Order> stream = builder
      .stream("orders", Consumed.with(Serdes.Long(), orderSerde))
      .peek((k, order) -> LOG.info("New: {}", order));

   KeyValueBytesStoreSupplier customerOrderStoreSupplier =
      Stores.persistentKeyValueStore("customer-orders");

   stream.selectKey((k, v) -> v.getCustomerId()) // (1)
      .groupByKey(Grouped.with(Serdes.Long(), orderSerde)) // (2)
      .aggregate(
         () -> new Reservation(random.nextInt(1000)),
         aggregatorService,
         Materialized.<Long, Reservation>as(customerOrderStoreSupplier)
            .withKeySerde(Serdes.Long())
            .withValueSerde(rsvSerde)) // (3)
      .toStream()
      .peek((k, trx) -> LOG.info("Commit: {}", trx));

   return stream;
}

However, the most important step in the fragment of code visible above is the class called inside the aggregate() method. The aggregate() method takes three input arguments. The first of them indicates the starting value of our compute object. That object represents the current state of the customer’s account. It has two fields: amountAvailable and amountReserved. To clarify, we use that object instead of the entity that stores available and reserved amounts on the customer account. Each customer is represented by the customerId (key) and the Reservation object (value) in Kafka KTable. Just for the test purpose, we are generating the starting value of amountAvailable as a random number between 0 and 1000.

public class Reservation {
   private int amountAvailable;
   private int amountReserved;

   public Reservation() {
   
   }

   public Reservation(int amountAvailable) {
      this.amountAvailable = amountAvailable;
   }

   // GETTERS AND SETTERS ...

}

Ok, let’s take a look at our aggregation method. It needs to implement the Kafka Aggregate interface and its method apply(). It may handle three types of orders. One of them is a confirmation of the order (1). It confirms the distributed transaction, so we just need to cancel a reservation by subtracting the order’s price from the amountReserved field. On the other, in the case of rollback, we need to increase the value of amountAvailable by the order’s price and decrease the value amountRerserved accordingly (2). Finally, if we receive a new order we need to perform a reservation if there are sufficient funds on the customer account, or otherwise, reject an order.

Aggregator<Long, Order, Reservation> aggregatorService = (id, order, rsv) -> {
   switch (order.getStatus()) {
      case "CONFIRMED" -> // (1)
         rsv.setAmountReserved(rsv.getAmountReserved() 
               - order.getPrice());
      case "ROLLBACK" -> { // (2)
         if (!order.getSource().equals("PAYMENT")) {
            rsv.setAmountAvailable(rsv.getAmountAvailable() 
                  + order.getPrice());
            rsv.setAmountReserved(rsv.getAmountReserved() 
                  - order.getPrice());
         }
      }
      case "NEW" -> { // (3)
         if (order.getPrice() <= rsv.getAmountAvailable()) {
            rsv.setAmountAvailable(rsv.getAmountAvailable() 
                  - order.getPrice());
            rsv.setAmountReserved(rsv.getAmountReserved() 
                  + order.getPrice());
            order.setStatus("ACCEPT");
         } else {
            order.setStatus("REJECT");
         }
         template.send("payment-orders", order.getId(), order);
      }
   }
   LOG.info("{}", rsv);
   return rsv;
};

State Store with the Kafka Streams Table

The implementation of the stock-service is pretty similar to the payment-service. With the difference that we count a number of available products on stock instead of available funds on the customer account. Here’s our Reservation object:

public class Reservation {
   private int itemsAvailable;
   private int itemsReserved;

   public Reservation() {
    
   }

   public Reservation(int itemsAvailable) {
      this.itemsAvailable = itemsAvailable;
   }

   // GETTERS AND SETTERS ...

}

The implementation of the aggregation method is also very similar to the payment-service. However, this time, let’s focus on another thing. Once we process a new order we need to send a response to the stock-orders topic. We use KafkaTemplate for that. In the case of payment-service we also send a response, but to the payment-orders topic. The send method from the KafkaTemplate does not block the thread. It returns the ListenableFuture objects. We may add a callback to the send method using it and the result after sending the message (1). Finally, let’s log the current state of the Reservation object (2).

Aggregator<Long, Order, Reservation> aggrSrv = (id, order, rsv) -> {
   switch (order.getStatus()) {
      case "CONFIRMED" -> rsv.setItemsReserved(rsv.getItemsReserved() 
            - order.getProductCount());
      case "ROLLBACK" -> {
         if (!order.getSource().equals("STOCK")) {
            rsv.setItemsAvailable(rsv.getItemsAvailable() 
                  + order.getProductCount());
            rsv.setItemsReserved(rsv.getItemsReserved() 
                  - order.getProductCount());
         }
      }
      case "NEW" -> {
         if (order.getProductCount() <= rsv.getItemsAvailable()) {
            rsv.setItemsAvailable(rsv.getItemsAvailable() 
                  - order.getProductCount());
            rsv.setItemsReserved(rsv.getItemsReserved() 
                  + order.getProductCount());
            order.setStatus("ACCEPT");
         } else {
            order.setStatus("REJECT");
         }
         // (1)
         template.send("stock-orders", order.getId(), order)
            .addCallback(r -> LOG.info("Sent: {}", 
               result != null ? result.getProducerRecord().value() : null),
               ex -> {});
      }
   }
   LOG.info("{}", rsv); // (2)
   return rsv;
};

After that, we are also logging the value of the Reservation object (1). In order to do that we need to convert KTable into KStream and then call the peek method. This log is printed just after Kafka Streams commits the offset in the source topic.

@Bean
public KStream<Long, Order> stream(StreamsBuilder builder) {
   JsonSerde<Order> orderSerde = new JsonSerde<>(Order.class);
   JsonSerde<Reservation> rsvSerde = new JsonSerde<>(Reservation.class);
   KStream<Long, Order> stream = builder
      .stream("orders", Consumed.with(Serdes.Long(), orderSerde))
      .peek((k, order) -> LOG.info("New: {}", order));

   KeyValueBytesStoreSupplier stockOrderStoreSupplier =
      Stores.persistentKeyValueStore("stock-orders");

   stream.selectKey((k, v) -> v.getProductId())
      .groupByKey(Grouped.with(Serdes.Long(), orderSerde))
      .aggregate(() -> new Reservation(random.nextInt(100)), aggrSrv,
         Materialized.<Long, Reservation>as(stockOrderStoreSupplier)
            .withKeySerde(Serdes.Long())
            .withValueSerde(rsvSerde))
      .toStream()
      .peek((k, trx) -> LOG.info("Commit: {}", trx)); // (1)

   return stream;
}

What will happen if you send the test order? Let’s see the logs. You can see the difference in time between processing the message and offset commit. You won’t have any problems with that until your application is running or it has been stopped gracefully. But if you, for example, kill the process using the kill -9 command? After restart, our application will receive the same messages once again. Since we use KafkaTemplate to send the response to the stock-orders topic, we need to commit the offset as soon as possible.

What can we do to avoid such problems? We may override the default value (30000) of the commit.interval.ms Kafka Streams property. If you set it to 0, it commits immediately after processing finishes.

spring.kafka:  
  streams:
    properties:
      commit.interval.ms: 0

On the other hand, we can also set the property processing.guarantee to exactly_once. It also changes the default value of commit.interval.ms to 100ms and enables idempotence for a producer. You can read more about it here in Kafka documentation.

spring.kafka:  
  streams:
    properties:
      processing.guarantee: exactly_once

Running Kafka on Upstash

For the purpose of today’s exercise, we will use a serverless Kafka cluster on Upstash. You can create it with a single click. If you would like to test JAAS authentication for your application I’ve got good news 🙂 The authentication on that cluster is enabled by default. You can find and copy username and password from the cluster’s main panel.

kafka-streams-transactions-upstash

Now, let’s configure Kafka connection settings and credentials for the Spring Boot application. There is a developer free tier on Upstash up to 10k messages per day. It will be enough for our tests.

spring.kafka:
  bootstrap-servers: topical-gar-11460-us1-kafka.upstash.io:9092
  properties:
    security.protocol: SASL_SSL
    sasl.mechanism: SCRAM-SHA-256
    sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${USERNAME}" password="${PASSWORD}";

With Upstash you can easily display a list of topics. In total, there are 10 topics used in our sample system. Three of them are used directly by the Spring Boot applications, while the rest of them by the Kafka Streams in order to process stateful operations.

After starting the order-service application we can call its REST endpoint to create and send an order to the Kafka topic.

private static final Logger LOG = 
   LoggerFactory.getLogger(OrderController.class);
private AtomicLong id = new AtomicLong();
private KafkaTemplate<Long, Order> template;

@PostMapping
public Order create(@RequestBody Order order) {
   order.setId(id.incrementAndGet());
   template.send("orders", order.getId(), order);
   LOG.info("Sent: {}", order);
   return order;
}

Let’s call the endpoint using the following curl command. You can use any customerId or productId you want.

$ curl -X 'POST' \
  'http://localhost:8080/orders' \
  -H 'Content-Type: application/json' \
  -d '{
    "customerId": 20,
    "productId": 20,
    "productCount": 2,
    "price": 10,
    "status": "NEW"
  }'

All three sample applications use Kafka Streams to process distributed transactions. Once the order is accepted by both stock-service and payment-service you should see the following entry in the order-service logs.

You can easily simulate rejection of transactions with Kafka Streams just by setting e.g. productCount higher than the value generated by the product-service as available items.

With Upstash UI you can also easily verify the number of messages incoming to the topics. Let’s see the current statistics for the orders topic.

Final Thoughts

In order to fully understand what happens in this example, you should be also familiar with the Kafka Streams threading model. It is worth reading the following article, which explains it in a clean manner. First of all, each stream partition is a totally ordered sequence of data records and maps to a Kafka topic partition. It means, that even if we have multiple orders at the same time related to e.g. same product, they are all processed sequentially since they have the same message key (productId in that case).

Moreover, by default, there is only a single stream thread that handles all the partitions. You can see this in the logs below. However, there are stream tasks that act as the lowest-level units of parallelism. As a result, stream tasks can be processed independently and in parallel without manual intervention.

I hope this article helps you to better understand Kafka Streams. I just wanted to give you a simple example of how you can use Kafka Streams with Saga transactions in order to simplify your current architecture.

The post Deep Dive into Saga Transactions with Kafka Streams and Spring Boot appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2022/02/07/deep-dive-into-saga-transactions-with-kafka-streams-and-spring-boot/feed/ 8 10587