Serverless on Azure with Spring Cloud Function

This article will teach you how to create and run serverless apps on Azure using the Spring Cloud Function and Spring Cloud Azure projects. We will integrate with the Azure Functions and Azure Event Hubs services.

This is not my first article about Azure and Spring Cloud. As preparation for this exercise, it is worth reading my earlier article to familiarize yourself with some interesting features of Spring Cloud Azure. It describes an integration with the Azure Spring Apps, Cosmos DB, and App Configuration services. On the other hand, if you are interested in CI/CD for Spring Boot apps, you can refer to my article about Azure DevOps and Terraform.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. The Spring Boot apps used in the article are located in the serverless directory. After you go to that directory you should just follow my further instructions.

Architecture

In this exercise, we will prepare two sample Spring Boot apps (aka functions) account-function and customer-function. Then, we will deploy them on the Azure Function service. Our apps do not communicate with each other directly but through the Azure Event Hubs service. Event Hubs is a cloud-native data streaming service compatible with the Apache Kafka API. After adding a new customer the customer-function sends an event to the Azure Event Hubs using the Spring Cloud Stream binder. The account-function app receives the event through the Azure Event Hubs trigger. Also, the customer-function is exposed to the external client through the Azure HTTP trigger. Here’s the diagram of our architecture:

azure-serverless-spring-cloud-arch

Prerequisites

There are some prerequisites before you start the exercise. You need to install JDK17+ and Maven on your local machine. You also need to have an account on Azure and az CLI to interact with that account. Once you install the az CLI and log in to Azure you can execute the following command for verification:

$ az account show

If you would like to test Azure Functions locally, you need to install Azure Functions Core Tools. You can find detailed installation instructions in Microsoft Docs here. For macOS, there are three required commands to run:

$ brew tap azure/functions
$ brew install azure-functions-core-tools@4
$ brew link --overwrite azure-functions-core-tools@4
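
After the installation, you can verify that the tools are available with a quick check (the exact version you see will differ depending on when you install):

$ func --version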

Create Resources on Azure

Before we proceed with the source code, we need to create several required resources on the Azure cloud. In the first step, we will prepare a resource group for all required objects. The name of the group is spring-cloud-serverless. The location depends on your preferences. For me it is eastus.

$ az group create -l eastus -n spring-cloud-serverless

In the next step, we need to create a storage account. The Azure Function service requires it, but we will also use that account during the local development with Azure Functions Core Tools.

$ az storage account create -n pminkowsserverless \
     -g spring-cloud-serverless \
     -l eastus \
     --sku Standard_LRS

In order to run serverless apps on Azure with e.g. Spring Cloud Function, we need to create Azure Function App instances. Of course, we use the previously created resource group and storage account. The names of my Function App instances are pminkows-account-function and pminkows-customer-function. We can also set a default OS type (Linux), functions version (4), and runtime stack (Java) for each Function App.

$ az functionapp create -n pminkows-customer-function \
     -c eastus \
     --os-type Linux \
     --functions-version 4 \
     -g spring-cloud-serverless \
     --runtime java \
     --runtime-version 17.0 \
     -s pminkowsserverless

$ az functionapp create -n pminkows-account-function \
     -c eastus \
     --os-type Linux \
     --functions-version 4 \
     -g spring-cloud-serverless \
     --runtime java \
     --runtime-version 17.0 \
     -s pminkowsserverless

Then, we have to create the Azure Event Hubs namespace. The name of my namespace is spring-cloud-serverless. The same as before, I choose the East US location and the spring-cloud-serverless resource group. We can also set the pricing tier (Standard) and the upper limit of throughput units when the AutoInflate option is enabled.

$ az eventhubs namespace create -n spring-cloud-serverless \
     -g spring-cloud-serverless \
     --location eastus \
     --sku Standard \
     --maximum-throughput-units 1 \
     --enable-auto-inflate true

Finally, we have to create topics on Event Hubs. Of course, they have to be assigned to the previously created spring-cloud-serverless Event Hubs namespace. The names of our topics are accounts and customers. The number of partitions is irrelevant in this exercise.

$ az eventhubs eventhub create -n accounts \
     -g spring-cloud-serverless \
     --namespace-name spring-cloud-serverless \
     --cleanup-policy Delete \
     --partition-count 3

$ az eventhubs eventhub create -n customers \
     -g spring-cloud-serverless \
     --namespace-name spring-cloud-serverless \
     --cleanup-policy Delete \
     --partition-count 3

Now, let’s switch to the Azure Portal. Find the spring-cloud-serverless resource group. You should have the same list of resources inside this group as shown below. It means that our environment is ready and we can proceed to the source code.

App Dependencies

Firstly, we need to declare the dependencyManagement section inside the Maven pom.xml for three projects used in the app implementation: Spring Boot, Spring Cloud, and Spring Cloud Azure.

<properties>
  <java.version>17</java.version>
  <spring-boot.version>3.1.4</spring-boot.version>
  <spring-cloud-azure.version>5.7.0</spring-cloud-azure.version>
  <spring-cloud.version>2022.0.4</spring-cloud.version>
  <maven.compiler.release>${java.version}</maven.compiler.release>
  <maven.compiler.source>${java.version}</maven.compiler.source>
  <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>${spring-cloud.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-dependencies</artifactId>
      <version>${spring-cloud-azure.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-dependencies</artifactId>
      <version>${spring-boot.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<build>
  <plugins>
    <plugin>
      <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-maven-plugin</artifactId>
      <executions>
       <execution>
          <goals>
           <goal>repackage</goal>
          </goals>
         </execution>
       </executions>
    </plugin>
  </plugins>
</build>

Here’s the list of required Maven dependencies. We need to include the spring-cloud-function-context library to enable Spring Cloud Functions. In order to integrate with the Azure Functions service, we need to include the spring-cloud-function-adapter-azure extension. Our apps also send messages to the Azure Event Hubs service through the dedicated Spring Cloud Stream binder.

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-function-adapter-azure</artifactId>
  </dependency>
  <dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-function-context</artifactId>
  </dependency>
</dependencies>

Create Azure Serverless Apps with Spring Cloud

Expose Azure Function as HTTP endpoint

Let’s begin with the customer-function. To simplify the app we will use an in-memory H2 for storing data. Each time a new customer is added, it is persisted in the in-memory database using Spring Data JPA extension. Here’s our entity class:

@Entity
public class Customer {
   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long id;
   private String name;
   private int age;
   private String status;

   // GETTERS AND SETTERS
}

Here’s the Spring Data repository interface for the Customer entity:

public interface CustomerRepository extends ListCrudRepository<Customer, Long> {
}

Let’s proceed to the Spring Cloud Functions implementation. The CustomerInternalFunctions bean defines two functions. The addConsumer function (1) persists a new customer in the database and sends the event about it to the Azure Event Hubs topic. It uses the Spring Cloud Stream StreamBridge bean for interacting with Event Hubs. It also returns the persisted Customer entity as a response. The second function changeStatus (2) doesn’t return any data, it just needs to react to the incoming event and change the status of the particular customer. That event is sent by the account-function app after generating an account for a newly created customer. The important information here is that those are just Spring Cloud functions. For now, they have nothing to do with Azure Functions.

@Service
public class CustomerInternalFunctions {

   private static final Logger LOG = LoggerFactory
      .getLogger(CustomerInternalFunctions.class);

   private StreamBridge streamBridge;
   private CustomerRepository repository;

   public CustomerInternalFunctions(StreamBridge streamBridge, 
                                    CustomerRepository repository) {
      this.streamBridge = streamBridge;
      this.repository = repository;
   }

   // (1)
   @Bean
   public Function<Customer, Customer> addCustomer() {
      return c -> {
         Customer newCustomer = repository.save(c);
         streamBridge.send("customers-out-0", newCustomer);
         LOG.info("New customer added: {}", c);
         return newCustomer;
      };
   }

   // (2)
   @Bean
   public Consumer<Account> changeStatus() {
      return account -> {
         Customer customer = repository.findById(account.getCustomerId())
                .orElseThrow();
         customer.setStatus(Customer.CUSTOMER_STATUS_ACC_ACTIVE);
         repository.save(customer);
         LOG.info("Customer activated: id={}", customer.getId());
      };
   }

}

We also need to provide several configuration properties in the Spring Boot application.properties file. We should set the Azure Event Hubs connection string and the name of the target topic. Since our app uses Spring Cloud Stream only for sending events, we should also turn off the autodetection of functional beans as messaging bindings.

spring.cloud.azure.eventhubs.connection-string = ${EVENT_HUBS_CONNECTION_STRING}
spring.cloud.stream.bindings.customers-out-0.destination = customers
spring.cloud.stream.function.autodetect = false

In order to expose the functions on Azure, we will take advantage of the Spring Cloud Function Azure Adapter. It adds several Azure libraries to our Maven dependencies. It can invoke the Spring Cloud functions directly or through the lookup approach with the FunctionCatalog bean (1). The method has to be annotated with @FunctionName, which defines the name of the function in Azure (2). In order to expose that function over HTTP, we need to define the Azure @HttpTrigger (3). The trigger exposes the function as a POST endpoint and doesn't require any authorization. Our method receives the request through the HttpRequestMessage object. Then, it invokes the Spring Cloud function addCustomer by name using the FunctionCatalog bean (4).

// (1)
@Autowired
private FunctionCatalog functionCatalog;

@FunctionName("add-customer") // (2)
public Customer addCustomerFunc(
        // (3)
        @HttpTrigger(name = "req",
                     methods = { HttpMethod.POST },
                     authLevel = AuthorizationLevel.ANONYMOUS)
        HttpRequestMessage<Optional<Customer>> request,
        ExecutionContext context) {
   Customer c = request.getBody().orElseThrow();
   context.getLogger().info("Request: " + c);
   // (4)
   Function<Customer, Customer> function = functionCatalog
      .lookup("addCustomer");
   return function.apply(c);
}

Integrate Function with Azure Event Hubs Trigger

Let’s switch to the account-function app directory inside our Git repository. The same as customer-function it uses an in-memory H2 database for storing customer accounts. Here’s our Account entity class:

@Entity
public class Account {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String number;
    private Long customerId;
    private int balance;

   // GETTERS AND SETTERS ...
}

Inside the account-function app, there is a single Spring Cloud Function addAccount. It generates a new 16-digit account number for each new customer. Then, it saves the account in the database and sends the event to the Azure Event Hubs with Spring Cloud Stream StreamBridge bean.

@Service
public class AccountInternalFunctions {

   private static final Logger LOG = LoggerFactory
      .getLogger(AccountInternalFunctions.class);

   private StreamBridge streamBridge;
   private AccountRepository repository;

   public AccountInternalFunctions(StreamBridge streamBridge, 
                                   AccountRepository repository) {
      this.streamBridge = streamBridge;
      this.repository = repository;
   }

   @Bean
   public Function<Customer, Account> addAccount() {
      return customer -> {
         String n = RandomStringUtils.random(16, false, true);
         Account a = new Account(n, customer.getId(), 0);
         a = repository.save(a);
         boolean b = streamBridge.send("accounts-out-0", a);
         LOG.info("New account added: {}", a);
         return a;
      };
   }

}

The same as for customer-function we need to provide some configuration settings for Spring Cloud Stream. However, instead of the customers topic, this time we are sending events to the accounts topic.

spring.cloud.azure.eventhubs.connection-string = ${EVENT_HUBS_CONNECTION_STRING}
spring.cloud.stream.bindings.accounts-out-0.destination = accounts
spring.cloud.stream.function.autodetect = false

The account-function app is not exposed as an HTTP endpoint. We want to trigger the function in reaction to an event delivered to the Azure Event Hubs topic. The name of our Azure function is new-customer (1). It receives the Customer event from the customers topic thanks to the @EventHubTrigger annotation (2). This annotation also defines the name of the property that contains the connection string to the Azure Event Hubs namespace (EVENT_HUBS_CONNECTION_STRING). I'll show you later how to set such a property for our function in Azure. Once the new-customer function is triggered, it invokes the Spring Cloud function addAccount (3).

@Autowired
private FunctionCatalog functionCatalog;

// (1)
@FunctionName("new-customer")
public void newAccountEventFunc(
        // (2)
        @EventHubTrigger(eventHubName = "customers",
                         name = "newAccountTrigger",
                         connection = "EVENT_HUBS_CONNECTION_STRING",
                         cardinality = Cardinality.ONE)
        Customer event,
        ExecutionContext context) {
   context.getLogger().info("Event: " + event);
   // (3)
   Function<Customer, Account> function = functionCatalog
      .lookup("addAccount");
   function.apply(event);
}

At the end of this section, let's switch back once again to the customer-function app. Its activate-customer function fires on the event sent by the new-customer function to the accounts topic. Therefore, we are using the @EventHubTrigger annotation once again.

@FunctionName("activate-customer")
public void activateCustomerEventFunc(
          @EventHubTrigger(eventHubName = "accounts",
                 name = "changeStatusTrigger",
                 connection = "EVENT_HUBS_CONNECTION_STRING",
                 cardinality = Cardinality.ONE)
          Account event,
          ExecutionContext context) {
   context.getLogger().info("Event: " + event);
   Consumer<Account> consumer = functionCatalog.lookup("changeStatus");
   consumer.accept(event);
}

All our functions are ready. Now, we can proceed to the deployment phase.

Running Azure Functions Locally with Maven

Before we deploy our functions on Azure, we can run and test them locally. I assume you have already installed the Azure Functions Core Tools as described in the “Prerequisites” section. Our functions still need to connect with some services in the cloud, like Azure Event Hubs or the storage account. The address of the Azure Event Hubs should be set as the EVENT_HUBS_CONNECTION_STRING app property or environment variable. In order to find the Event Hubs connection string, we should switch to the Azure Portal and find the spring-cloud-serverless namespace. Then, we need to go to the “Shared access policies” menu item and click the “RootManageSharedAccessKey” policy. The connection string value is available in the “Connection string-primary key” field.

We also need to obtain the connection credentials to the Azure storage account. In your storage account, you should find the “Access keys” section and copy the value from the “Connection string” field.

After that, we can create the local.settings.json file in each app’s root directory. Alternatively, we can just set the environment variables AzureWebJobsStorage and EVENT_HUBS_CONNECTION_STRING.

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "<YOUR_ACCOUNT_STORAGE_CONNECTION_STRING>",
    "FUNCTIONS_WORKER_RUNTIME": "java",
    "EVENT_HUBS_CONNECTION_STRING": "<YOUR_EVENT_HUBS_CONNECTION_STRING>"
  }
}
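
If you prefer the environment variable approach mentioned above instead of local.settings.json, a minimal setup could look as follows (the placeholder values are, of course, your own connection strings):

$ export AzureWebJobsStorage="<YOUR_ACCOUNT_STORAGE_CONNECTION_STRING>"
$ export EVENT_HUBS_CONNECTION_STRING="<YOUR_EVENT_HUBS_CONNECTION_STRING>"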

Once you place your credentials in the local.settings.json file, you can build the app with Maven.

$ mvn clean package

After that, you can use the Maven plugin included in the spring-cloud-function-adapter-azure module. In order to run the function, you need to execute the following command:

$ mvn azure-functions:run 

Here’s the command output for the customer-function app. As you see, it contains two Azure functions: add-customer (HTTP) and activate-customer (Event Hub Trigger). You can test the function by invoking the http://localhost:7071/api/add-customer URL.

Here’s the command output for the account-function app. As you see, it contains a single function new-customer activated through the Event Hub trigger.

Deploy Spring Cloud Serverless on Azure Functions

Let’s take a look at the pminkows-customer-function Azure Function App before we deploy our first app there. The http://pminkows-customer-function.azurewebsites.net base URL will precede all the HTTP endpoint URLs for our functions. We should remember the name of the automatically generated service plan (EastUSLinuxDynamicPlan).

Before deploying our Spring Boot app we should create the host.json file with the following content:

{
  "version": "2.0",
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  }
}

Then, we should add the azure-functions-maven-plugin Maven plugin to our pom.xml. In the configuration section, we have to set the name of the Azure Function App instance (pminkows-customer-function), the target resource group (spring-cloud-serverless), the region (eastus), the service plan (EastUSLinuxDynamicPlan), and the location of the host.json file. We also need to set the connection string to the Azure Event Hubs inside the EVENT_HUBS_CONNECTION_STRING app property. So before running the build, you should export the value of that environment variable.

<plugin>
  <groupId>com.microsoft.azure</groupId>
  <artifactId>azure-functions-maven-plugin</artifactId>
  <version>1.30.0</version>
  <configuration>
    <appName>pminkows-customer-function</appName>
    <resourceGroup>spring-cloud-serverless</resourceGroup>
    <region>eastus</region>
    <appServicePlanName>EastUSLinuxDynamicPlan</appServicePlanName>
    <hostJson>${project.basedir}/src/main/resources/host.json</hostJson>
    <runtime>
      <os>linux</os>
      <javaVersion>17</javaVersion>
    </runtime>
    <appSettings>
      <property>
        <name>FUNCTIONS_EXTENSION_VERSION</name>
        <value>~4</value>
      </property>
      <property>
        <name>EVENT_HUBS_CONNECTION_STRING</name>
        <value>${EVENT_HUBS_CONNECTION_STRING}</value>
      </property>
    </appSettings>
  </configuration>
  <executions>
    <execution>
      <id>package-functions</id>
      <goals>
        <goal>package</goal>
      </goals>
    </execution>
  </executions>
</plugin>

Let’s begin with the customer-function app. Before deploying the app we need to build it first with the mvn clean package command. Once you do it you can run your first function Azure with the following command:

$ mvn azure-functions:deploy

Here’s my output after running that command. As you see, the function add-customer is exposed under the http://pminkows-customer-function.azurewebsites.net/api/add-customer URL.

azure-serverless-spring-cloud-deploy-mvn

We can come back to the pminkows-customer-function Azure Function App in the portal. According to the expectations, two functions are running there. The activate-customer function is triggered by the Azure Event Hub.

azure-serverless-spring-cloud-functions

Let’s switch to the account-function directory. We will deploy it to the Azure pminkows-account-function function. Once again we need to build the app with mvn clean package command, and then deploy it using the mvn azure-functions:deploy command. Here’s the output. There are no HTTP triggers defined, but just a single function triggered by the Azure Event Hub.

Here are the details of the pminkows-account-function Function App in the Azure Portal.

Invoke Azure Functions

Finally, we can test our functions by calling the following endpoint using e.g. curl. Let's repeat a similar command several times with different data:

$ curl https://pminkows-customer-function.azurewebsites.net/api/add-customer \
    -d "{\"name\":\"Test\",\"age\":33}" \
    -H "Content-Type: application/json"

After that, we should switch to the Azure Portal. Go to the pminkows-customer-function details and click the link “Invocation and more” on the add-customer function.

You will be redirected to the Azure Monitor statistics for that function. Azure Monitor displays a list of invocations with statuses.

azure-serverless-spring-cloud-functions-invocations

We can click one of the records from the invocation history to see the details.

As you probably remember, the add-customer function sends messages to Azure Event Hubs. On the other hand, we can also verify how the new-customer function in the pminkows-account-function consumes and handles those events.
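
If you prefer the terminal over the portal, you can also stream the function logs with Azure Functions Core Tools. This is just an optional alternative, assuming you are logged in with the az CLI:

$ func azure functionapp logstream pminkows-account-function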

azure-serverless-spring-cloud-functions-logs

Final Thoughts

This article gives you a comprehensive guide on how to build and run Spring Cloud serverless apps on Azure Functions. It explains the concept of the triggers in Azure Functions and shows the integration with Azure Event Hubs. Finally, it shows how to run such functions locally and then monitor them on Azure after deployment.

Autoscaling on Kubernetes with KEDA and Kafka

In this article, you will learn how to autoscale an application that consumes messages from a Kafka topic with KEDA. The full name that stands behind that acronym is Kubernetes Event-driven Autoscaling. In order to explain the idea behind it, I will create two simple services. The first of them sends events to the Kafka topic, while the second receives them. We will run both these applications on Kubernetes. To simplify the exercise, we may use Spring Cloud Stream, which offers smart integration with Kafka.

Architecture

Before we start, let’s take a moment to understand our scenario for today. We have a single Kafka topic used by both our applications to exchange events. This topic consists of 10 partitions. There is also a single instance of the producer that sends events at regular intervals. We are going to scale down and scale up the number of pods for the consumer service. All the instances of the consumer service are assigned to the same Kafka consumer group. It means that only a single instance with the group may receive the particular event.

Each consumer instance has only a single receiving thread. Therefore, we can easily simulate event processing time by sleeping the receiving thread for 1 second. On the other hand, the producer will send events at a variable rate and split the messages across all available partitions. Such behavior may result in consumer lag on partitions, because Spring Cloud Stream commits the offset only after handling a message. In our case, the value of the lag depends on the producer rate and the number of running consumer instances. For example, if the producer sends 5 events per second while a single consumer instance handles only 1 event per second, the lag grows by roughly 4 messages per second until more instances are added. To clarify, let's take a look at the diagram below.

keda-kafka-arch1

Our goal is very simple. We need to adjust the number of consumer instances to the traffic rate generated by the producer service. The value of the offset lag can't exceed the desired threshold. If we increase the traffic rate on the producer side, KEDA should scale up the number of consumer instances. Consequently, if we decrease the producer traffic rate, it should scale down the number of consumer instances. Here's the diagram with our scenario.

keda-kafka-arch2

Use Kafka with Spring Cloud Stream

In order to use Spring Cloud Stream for Kafka, we just need to include a single dependency in Maven pom.xml:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

After that, we can use the standard Spring Cloud Stream model. However, in the background, it integrates with Kafka through a particular binder implementation. I will not explain the details, but if you are interested, please read my earlier article. It explains the basics using the example of RabbitMQ.

Both our applications are very simple. The producer just generates and sends events (by default in JSON format). The only thing we need to do in the code is to declare the Supplier bean. In the background, there is a single thread that generates and sends CallmeEvent every second. Each time it only increases the id field inside the message:

@SpringBootApplication
public class ProducerApp {

   private static int id = 0;

   public static void main(String[] args) {
      SpringApplication.run(ProducerApp.class, args);
   }

   @Bean
   public Supplier<CallmeEvent> eventSupplier() {
      return () -> new CallmeEvent(++id, "Hello" + id, "PING");
   }

}
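
The CallmeEvent class itself is not listed in this article. Based on how it is constructed above, a minimal sketch could look as follows (the message and eventType field names are my assumption):

public class CallmeEvent {
   private int id;
   private String message;
   private String eventType;

   public CallmeEvent() {
   }

   public CallmeEvent(int id, String message, String eventType) {
      this.id = id;
      this.message = message;
      this.eventType = eventType;
   }

   // GETTERS AND SETTERS
}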

We can change the default fixed delay between the Supplier ticks with the following property. Let's say we want to send an event every 100 ms:

spring.cloud.stream.poller.fixedDelay = 100

We should also provide basic configuration like the Kafka address, topic name (if different than the name of the Supplier function), number of partitions, and a partition key. Spring Cloud Stream automatically creates topics on application startup.

spring.cloud.stream.bindings.eventSupplier-out-0.destination = test-topic
spring.cloud.stream.bindings.eventSupplier-out-0.producer.partitionKeyExpression = payload.id
spring.cloud.stream.bindings.eventSupplier-out-0.producer.partitionCount = 10
spring.kafka.bootstrap-servers = one-node-cluster.redpanda:9092

Now, the consumer application. It is also not very complicated. As I mentioned before, we will sleep the main thread inside the receiving method in order to simulate processing time.

@SpringBootApplication
public class ConsumerApp {

   private static final Logger LOG = LoggerFactory.getLogger(ConsumerApp.class);

   public static void main(String[] args) {
      SpringApplication.run(ConsumerApp.class, args);
   }

   @Bean
   public Consumer<Message<CallmeEvent>> eventConsumer() {
      return event -> {
         LOG.info("Received: {}", event.getPayload());
         try {
            Thread.sleep(1000);
         } catch (InterruptedException e) { }
      };
   }

}

Finally, the configuration on the consumer side. It is important to set the consumer group and enable partitioning.

spring.cloud.stream.bindings.eventConsumer-in-0.destination = test-topic
spring.cloud.stream.bindings.eventConsumer-in-0.group = a
spring.cloud.stream.bindings.eventConsumer-in-0.consumer.partitioned = true
spring.kafka.bootstrap-servers = one-node-cluster.redpanda:9092

Now, we should deploy both applications on Kubernetes. But before we do that, let’s install Kafka and KEDA on Kubernetes.

Install Kafka on Kubernetes

To perform this part, you need to install Helm. Instead of Kafka directly, we can install Redpanda. It is a Kafka API-compatible streaming platform. However, the Redpanda operator requires cert-manager to create certificates for TLS communication. So, let's install it first. We use the latest version of cert-manager. It requires adding CRDs:

$ kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.6.1/cert-manager.crds.yaml

Then you need to add a new Helm repository:

$ helm repo add jetstack https://charts.jetstack.io

And finally, install cert-manager:

$ helm install cert-manager \
   --namespace cert-manager \
   --version v1.6.1 \
   jetstack/cert-manager

Now, we can proceed to the Redpanda installation. The same as before, let’s add the Helm repository:

$ helm repo add redpanda https://charts.vectorized.io/
$ helm repo update

We can obtain the latest version of Redpanda:

$ export VERSION=$(curl -s https://api.github.com/repos/vectorizedio/redpanda/releases/latest | jq -r .tag_name)

Then, let’s add the CRDs:

$ kubectl apply -f https://github.com/vectorizedio/redpanda/src/go/k8s/config/crd

After that, we can finally install the Redpanda operator:

$ helm install \
   redpanda-operator \
   redpanda/redpanda-operator \
   --namespace redpanda-system \
   --create-namespace \
   --version $VERSION

We will install a single-node cluster in the redpanda namespace. To do that we need to apply the following manifest:

apiVersion: redpanda.vectorized.io/v1alpha1
kind: Cluster
metadata:
  name: one-node-cluster
spec:
  image: "vectorized/redpanda"
  version: "latest"
  replicas: 1
  resources:
    requests:
      cpu: 1
      memory: 1.2Gi
    limits:
      cpu: 1
      memory: 1.2Gi
  configuration:
    rpcServer:
      port: 33145
    kafkaApi:
    - port: 9092
    pandaproxyApi:
    - port: 8082
    adminApi:
    - port: 9644
    developerMode: true
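
Assuming you saved the manifest above to a file, e.g. one-node-cluster.yaml (the file name is up to you), you can create the target namespace and apply it as follows:

$ kubectl create namespace redpanda
$ kubectl apply -f one-node-cluster.yaml -n redpanda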

Once you have done that, you can verify the list of pods in the redpanda namespace:

$ kubectl get pod -n redpanda                      
NAME                 READY   STATUS    RESTARTS   AGE
one-node-cluster-0   1/1     Running   0          4s

As you may have noticed, I have already set the Kafka bootstrap server address in the application.properties. For me, it is one-node-cluster.redpanda:9092. You can verify the service name using the following command:

$ kubectl get svc -n redpanda
NAME               TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                      AGE
one-node-cluster   ClusterIP   None         <none>        9644/TCP,9092/TCP,8082/TCP   23h

Install KEDA and integrate it with Kafka

The same as before we will install KEDA on Kubernetes with Helm. Let’s add the following Helm repo:

$ helm repo add kedacore https://kedacore.github.io/charts

Don’t forget to update the repository. We will install the operator in the keda namespace. Let’s create the namespace first:

$ kubectl create namespace keda

Finally, we can install the operator:

$ helm install keda kedacore/keda --namespace keda

I will run both example applications in the default namespace, so I will create a KEDA object also in this namespace. The main object responsible for configuring autoscaling with KEDA is ScaledObject. Here’s the definition:

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: consumer-scaled
spec:
  scaleTargetRef:
    name: consumer-deployment # (1)
  cooldownPeriod: 30 # (2)
  maxReplicaCount:  10 # (3)
  advanced:
    horizontalPodAutoscalerConfig: # (4)
      behavior:
        scaleDown:
          stabilizationWindowSeconds: 30
          policies:
            - type: Percent
              value: 50
              periodSeconds: 30
  triggers: # (5)
    - type: kafka
      metadata:
        bootstrapServers: one-node-cluster.redpanda:9092
        consumerGroup: a
        topic: test-topic
        lagThreshold: '5'

Let’s analyze the configuration in the details:

(1) We are setting autoscaler for the consumer application, which is deployed under the consumer-deployment name (see the next section for the Deployment manifest)

(2) We decrease the default value of the cooldownPeriod parameter from 300 seconds to 30 in order to test the scale-to-zero mechanism

(3) The maximum number of running pods is 10 (the same as the number of partitions in the topic) instead of the default 100

(4) We can customize the behavior of the Kubernetes HPA. Let's do that for the scale-down operation. We could configure the scale-up operation as well. We allow scaling down 50% of the currently running replicas every 30 seconds.

(5) The last and most important part – the trigger configuration. We should set the address of the Kafka cluster, the name of the topic, and the consumer group used by our application. The lag threshold is 5. It sets the average target value of the offset lag used to trigger scaling operations.

Before applying the manifest containing ScaledObject we need to deploy the consumer application. Let’s proceed to the next section.

Test Autoscaling with KEDA and Kafka

Let’s deploy the consumer application first. It is prepared to be deployed with Skaffold, so you can just run the command skaffold dev from the consumer directory. Anyway, here’s the Deployment manifest:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-deployment
spec:
  selector:
    matchLabels:
      app: consumer
  template:
    metadata:
      labels:
        app: consumer
    spec:
      containers:
      - name: consumer
        image: piomin/consumer
        ports:
        - containerPort: 8080

Once we have created it, we can also apply the KEDA ScaledObject. After creating it, let's display its status with the kubectl get so command.
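
Assuming the ScaledObject manifest shown above is saved in a file, e.g. scaledobject.yaml (the file name is my assumption), those two steps are simply:

$ kubectl apply -f scaledobject.yaml
$ kubectl get so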

keda-kafka-scaledobject

Ok, but… it is inactive. If you think about it, that's logical, since there are no incoming events on the Kafka topic. Right? So, KEDA has performed a scale-to-zero operation as shown below:

Now, let’s deploy the producer application. For now, DO NOT override the default value of the spring.cloud.stream.poller.maxMessagesPerPoll parameter. The producer will send one event per second.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
        - name: producer
          image: piomin/producer
          ports:
            - containerPort: 8080

After some time you can run the kubectl get so once again. Now the status in the ACTIVE column should be true. And there is a single instance of the consumer application (1 event per second sent by a producer and received by a consumer).

We can also verify offsets and lags for consumer groups on the topic partitions. Just run the command rpk group describe a inside the Redpanda container.
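
For example, you can execute it directly inside the single-node Redpanda pod created earlier (the pod and namespace names come from the installation section above):

$ kubectl exec -it one-node-cluster-0 -n redpanda -- rpk group describe a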

Now, we will change the traffic rate on the producer side. It will send 5 events per second instead of 1 event as before. To do that, we have to define the following property in the application.properties file.

spring.cloud.stream.poller.fixedDelay = 200

Before KEDA performs autoscaling we still have a single instance of the consumer application. Therefore, after some time the lag on partitions will exceed the desired threshold as shown below.

Once autoscaling occurs we can display a list of deployments. As you see, now there are 5 running pods of the consumer service.

keda-kafka-scaling

Once again let’s verify the status of our Kafka consumer group. There 5 consumers on the topic partitions.

Finally, just to check it out – let’s undeploy the producer application. What happened? The consumer-deployment has been scaled down to zero.

Final Thoughts

You can use KEDA not only with Kafka. There are a lot of other options available, including databases, different message brokers, or even cron. Here's a full list of the supported tools. In this article, I showed how to use the Kafka consumer offset lag as a criterion for autoscaling with KEDA. I tried to explain this process in detail. Hope it helps you to understand how exactly KEDA works 🙂

Kafka Streams with Spring Cloud Stream

In this article, you will learn how to use Kafka Streams with Spring Cloud Stream. We will build a simple Spring Boot application that simulates the stock market. Based on that example, I’ll try to explain what a streaming platform is and how it differs from a traditional message broker. If you are looking for an intro to the Spring Cloud Stream project you should read my article about it. It describes how to use Spring Cloud Stream with RabbitMQ in order to build event-driven microservices.

In Spring Cloud Stream there are two binders supporting the Kafka platform. We will focus on the second of them – Apache Kafka Streams Binder. You can read more about it in Spring Cloud documentation available here.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. After that, you should just follow my instructions. Let’s begin.

Introduction

There are three major types in Kafka Streams – KStream, KTable and GlobalKTable. Spring Cloud Stream supports all of them. We can easily convert a stream to a table and vice versa. To clarify, all Kafka topics are stored as a stream. The difference is: when we want to consume that topic, we can either consume it as a table or a stream. KTable takes a stream of records from a topic and reduces it down to unique entries using the key of each message.
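
To illustrate that last point, with the Kafka Streams binder a functional bean may declare KTable instead of KStream as its input type. The snippet below is only a sketch of that idea using the Order event from this article; it is not part of the example apps, and the log field assumes Lombok's @Slf4j as used elsewhere here:

@Slf4j
@Configuration
public class TableExample {

   // consume the topic as a changelog table: each key keeps only its latest Order value
   @Bean
   public Consumer<KTable<Long, Order>> latestOrders() {
      return table -> table
            .toStream()
            .peek((k, v) -> log.info("Latest({}): {}", k, v));
   }
}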

Architecture

KStream represents an immutable stream of data where each new record is treated as INSERT. In our case, there are two incoming streams of events. Both of them represent incoming orders. These orders are generated by the order-service application. It sends buy orders to the orders.buy topic and sell orders to the orders.sell topic. The stock-service application receives and handles events from those topics. In the first step, it needs to change the key of each message from the orderId to the productId. That’s because it has to join orders from different topics related to the same product in order to execute transactions. The final transaction price is an average of sell and buy order price.

kafka-streams-spring-cloud-concept

We are building a very simplified version of the stock market platform. Each buy order contains a maximum price at which a customer is expecting to buy a product. On the other hand, each sell order contains a minimum price a customer is ready to sell his product. If the sell order price is not greater than a buy order price for a particular product we may perform a transaction.

Each order is valid for 10 seconds. After that time, the stock-service application will not handle such an order, since it is considered expired. Each order contains an amount of product for a transaction. For example, we may sell 100 units for 10 or buy 200 units for 11. Therefore, an order may be fully or partially realized. The stock-service application tries to join partially realized orders with other new or partially realized orders. You can see the visualization of that process in the picture below.

kafka-streams-spring-cloud-arch

Run Apache Kafka locally

Before we jump to the implementation, we need to run a local instance of Apache Kafka. If you don't want to install it on your laptop, the best way to run it is through Redpanda. Redpanda is a Kafka API-compatible streaming platform. In comparison to Kafka, it is relatively easy to run locally. You just need to have Docker installed. Once you have installed Redpanda on your machine, you need to create a cluster. Since you don't need a large cluster during development, you can create a single-node instance using the following command:

$ rpk container start

After running, it will print the address of your node. For me, it is 127.0.0.1:50842. So, now I can display a list of created topics using the following command:

$ rpk topic list --brokers 127.0.0.1:50842

Currently, there are no topics created. We don't need to do anything manually. Spring Cloud Stream automatically creates missing topics on application startup. In case you would like to remove the Redpanda instance after our exercise, you just need to run the following command:

$ rpk container purge

Perfect! Our local instance of Kafka is running. Now, we may proceed to the development.

Send events to Kafka with Spring Cloud Stream

In order to generate and send events continuously with Spring Cloud Stream Kafka, we need to define a Supplier bean. In our case, the order-service application generates test data. Each message contains a key and a payload that is serialized to JSON. The message key is the order’s id. We have two Supplier beans since we are sending messages to the two topics. Here’s the Order event class:

@Getter
@Setter
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Order {
   private Long id;
   private Integer customerId;
   private Integer productId;
   private int productCount;
   @JsonDeserialize(using = LocalDateTimeDeserializer.class)
   @JsonSerialize(using = LocalDateTimeSerializer.class)
   private LocalDateTime creationDate;
   private OrderType type;
   private int amount;
}
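
The OrderType enum referenced above is not listed in the article. Based on its usage in the test data below, a minimal sketch is simply:

public enum OrderType {
   BUY, SELL
}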

Our application uses Lombok and Jackson for message serialization. Of course, we also need to include the Spring Cloud Stream Kafka binder. In contrast to the consumer side, the producer does not use Kafka Streams, because it just generates and sends events.

<dependencies>
  <dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-stream-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
  </dependency>
</dependencies>

We have a predefined list of orders just to test our solution. We use MessageBuilder to build a message that contains the header kafka_messageKey and the Order payload.

@SpringBootApplication
@Slf4j
public class OrderService {

   private static long orderId = 0;
   private static final Random r = new Random();

   LinkedList<Order> buyOrders = new LinkedList<>(List.of(
      new Order(++orderId, 1, 1, 100, LocalDateTime.now(), OrderType.BUY, 1000),
      new Order(++orderId, 2, 1, 200, LocalDateTime.now(), OrderType.BUY, 1050),
      new Order(++orderId, 3, 1, 100, LocalDateTime.now(), OrderType.BUY, 1030),
      new Order(++orderId, 4, 1, 200, LocalDateTime.now(), OrderType.BUY, 1050),
      new Order(++orderId, 5, 1, 200, LocalDateTime.now(), OrderType.BUY, 1000),
      new Order(++orderId, 11, 1, 100, LocalDateTime.now(), OrderType.BUY, 1050)
   ));

   LinkedList<Order> sellOrders = new LinkedList<>(List.of(
      new Order(++orderId, 6, 1, 200, LocalDateTime.now(), OrderType.SELL, 950),
      new Order(++orderId, 7, 1, 100, LocalDateTime.now(), OrderType.SELL, 1000),
      new Order(++orderId, 8, 1, 100, LocalDateTime.now(), OrderType.SELL, 1050),
      new Order(++orderId, 9, 1, 300, LocalDateTime.now(), OrderType.SELL, 1000),
      new Order(++orderId, 10, 1, 200, LocalDateTime.now(), OrderType.SELL, 1020)
   ));

   public static void main(String[] args) {
      SpringApplication.run(OrderService.class, args);
   }

   @Bean
   public Supplier<Message<Order>> orderBuySupplier() {
      return () -> {
         if (buyOrders.peek() != null) {
            Message<Order> o = MessageBuilder
                  .withPayload(buyOrders.peek())
                  .setHeader(KafkaHeaders.MESSAGE_KEY, Objects.requireNonNull(buyOrders.poll()).getId())
                  .build();
            log.info("Order: {}", o.getPayload());
            return o;
         } else {
            return null;
         }
      };
   }

   @Bean
   public Supplier<Message<Order>> orderSellSupplier() {
      return () -> {
         if (sellOrders.peek() != null) {
            Message<Order> o = MessageBuilder
                  .withPayload(sellOrders.peek())
                  .setHeader(KafkaHeaders.MESSAGE_KEY, Objects.requireNonNull(sellOrders.poll()).getId())
                  .build();
            log.info("Order: {}", o.getPayload());
            return o;
         } else {
            return null;
         }
      };
   }

}

After that, we need to provide some configuration settings inside the application.yml file. Since we use multiple binding beans (in our case Supplier beans), we have to define the property spring.cloud.stream.function.definition that contains a list of bindable functions. We need to pass the Supplier method names separated by a semicolon. In the next few lines, we are setting the names of the target topics on Kafka and the message key serializer. Of course, we also need to set the address of the Kafka broker.

spring.kafka.bootstrap-servers: ${KAFKA_URL}

spring.cloud.stream.function.definition: orderBuySupplier;orderSellSupplier

spring.cloud.stream.bindings.orderBuySupplier-out-0.destination: orders.buy
spring.cloud.stream.kafka.bindings.orderBuySupplier-out-0.producer.configuration.key.serializer: org.apache.kafka.common.serialization.LongSerializer

spring.cloud.stream.bindings.orderSellSupplier-out-0.destination: orders.sell
spring.cloud.stream.kafka.bindings.orderSellSupplier-out-0.producer.configuration.key.serializer: org.apache.kafka.common.serialization.LongSerializer

Before running the application I need to create an environment variable containing the address of the Kafka broker.

$ export KAFKA_URL=127.0.0.1:50842

Then, let’s run our Spring Cloud application using the following Maven command:

$ mvn clean spring-boot:run

Once you do that, the app sends some test orders for the same product (productId=1) as shown below.

We can also verify a list of topics on our local Kafka instance. Both of them have been automatically created by the Spring Cloud Stream Kafka binder before sending messages.

Consume Kafka Streams with Spring Cloud Stream

Now, we are going to switch to the stock-service implementation. In order to process streams of events, we need to include the Spring Cloud Stream Kafka Streams binder. Also, our application would have an ORM layer for storing data, so we have to include the Spring Data JPA starter and the H2 database.

<dependencies>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-data-jpa</artifactId>
  </dependency>
  <dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
  </dependency>
</dependencies>

In the first step, we are going to merge both streams of orders (buy and sell), insert the Order into the database, and print the event message. You could ask – why do I use a database and ORM layer here since I have Kafka KTable? Well, I need transactions with lock support in order to coordinate the status of order realization (refer to the description in the introduction – fully and partially realized orders). I will give you more details about it in the next sections.

In order to process streams, we need to declare a functional bean that takes KStream as an input parameter. If there are two sources, we have to use BiConsumer (just for consumption) or BiFunction (to consume and send events to the new target stream) beans. In that case, we are not creating a new stream of events, so we can use BiConsumer.

@Autowired
OrderLogic logic;

@Bean
public BiConsumer<KStream<Long, Order>, KStream<Long, Order>> orders() {
   return (orderBuy, orderSell) -> orderBuy
         .merge(orderSell)
         .peek((k, v) -> {
            log.info("New({}): {}", k, v);
            logic.add(v);
         });
}

After that, we need to add some configuration settings. There are two input topics, so we need to map their names. Also, if we have more than one functional bean we need to set applicationId related to the particular function. For now, it is not required, since we have only a single function. But later, we are going to add other functions for some advanced operations.

spring.cloud.stream.bindings.orders-in-0.destination: orders.buy
spring.cloud.stream.bindings.orders-in-1.destination: orders.sell
spring.cloud.stream.kafka.streams.binder.functions.orders.applicationId: orders

For now, that’s all. You can now run the instance of stock-service using the Maven command mvn spring-boot:run.

Operations on Kafka Streams

Now, we may use some more advanced operations on Kafka Streams than just merging two different streams. In fact, that's the key logic in our application. We need to join two different order streams into a single one using the productId as a joining key. Since the producer sets orderId as the message key, we first need to invoke the selectKey method for both the orders.sell and orders.buy streams. In our case, joining buy and sell orders related to the same product is just the first step. Then we need to verify that the minimum price in the sell order is not greater than the maximum price in the buy order.

The next step is to verify that both of these orders have not been fully realized previously, as they may also have been paired with other orders in the stream. If all the conditions are met, we may create a new transaction. Finally, we change the stream key from productId to transactionId and send it to the dedicated transactions topic.

In order to implement the scenario described above, we need to define a BiFunction bean. It takes two input KStreams from orders.buy and orders.sell and creates a new KStream of transaction events sent to the output transactions topic. While joining the streams, it uses a 10-second sliding window and invokes the execute method for creating a new transaction.

@Bean
public BiFunction<KStream<Long, Order>, KStream<Long, Order>, KStream<Long, Transaction>> transactions() {
   return (orderBuy, orderSell) -> orderBuy
         .selectKey((k, v) -> v.getProductId())
         .join(orderSell.selectKey((k, v) -> v.getProductId()),
               this::execute,
               JoinWindows.of(Duration.ofSeconds(10)),
               StreamJoined.with(Serdes.Integer(), 
                                 new JsonSerde<>(Order.class), 
                                 new JsonSerde<>(Order.class)))
         .filterNot((k, v) -> v == null)
         .map((k, v) -> new KeyValue<>(v.getId(), v))
         .peek((k, v) -> log.info("Done -> {}", v));
}

private Transaction execute(Order orderBuy, Order orderSell) {
   if (orderBuy.getAmount() >= orderSell.getAmount()) {
      int count = Math.min(orderBuy.getProductCount(), orderSell.getProductCount());
      boolean allowed = logic.performUpdate(orderBuy.getId(), orderSell.getId(), count);
      if (!allowed)
         return null;
      else
         return new Transaction(
            ++transactionId,
            orderBuy.getId(),
            orderSell.getId(),
            Math.min(orderBuy.getProductCount(), orderSell.getProductCount()),
            (orderBuy.getAmount() + orderSell.getAmount()) / 2,
            LocalDateTime.now(),
            "NEW");
   } else {
      return null;
   }
}
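
The Transaction class itself is not listed here. Judging from the constructor call above and the getters used in the aggregations later on, a minimal sketch could look as follows (the field names are my assumption, and the Lombok annotations mirror the Order class):

@Getter
@Setter
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Transaction {
   private Long id;
   private Long buyOrderId;
   private Long sellOrderId;
   private int amount;
   private int price;
   private LocalDateTime creationTime;
   private String status;
}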

Let’s take a closer look at the performUpdate() method called inside the execute() method. It initiates a transaction and locks both Order entities. Then it verifies each order realization status and updates it with the current values if possible. Only if the performUpdate() method finishes successfully the stock-service application creates a new transaction.

@Service
public class OrderLogic {

   private OrderRepository repository;

   public OrderLogic(OrderRepository repository) {
      this.repository = repository;
   }

   public Order add(Order order) {
      return repository.save(order);
   }

   @Transactional
   public boolean performUpdate(Long buyOrderId, Long sellOrderId, int amount) {
      Order buyOrder = repository.findById(buyOrderId).orElseThrow();
      Order sellOrder = repository.findById(sellOrderId).orElseThrow();
      int buyAvailableCount = buyOrder.getProductCount() - buyOrder.getRealizedCount();
      int sellAvailableCount = sellOrder.getProductCount() - sellOrder.getRealizedCount();
      if (buyAvailableCount >= amount && sellAvailableCount >= amount) {
         buyOrder.setRealizedCount(buyOrder.getRealizedCount() + amount);
         sellOrder.setRealizedCount(sellOrder.getRealizedCount() + amount);
         repository.save(buyOrder);
         repository.save(sellOrder);
         return true;
      } else {
         return false;
      }
   }
}

Here’s our repository class with the findById method. It sets a pessimistic lock on the Order entity during the transaction.

public interface OrderRepository extends CrudRepository<Order, Long> {

  @Lock(LockModeType.PESSIMISTIC_WRITE)
  Optional<Order> findById(Long id);

}

We also need to provide configuration settings for the transaction BiFunction.

spring.cloud.stream.bindings.transactions-in-0.destination: orders.buy
spring.cloud.stream.bindings.transactions-in-1.destination: orders.sell
spring.cloud.stream.bindings.transactions-out-0.destination: transactions
spring.cloud.stream.kafka.streams.binder.functions.transactions.applicationId: transactions

spring.cloud.stream.function.definition: orders;transactions

Use Kafka KTable with Spring Cloud Stream

We have already finished the implementation of the logic responsible for creating transactions from incoming orders. Now, we would like to examine the data generated by our stock-service application: how many transactions were generated and what was the volume of transactions, both globally and per product. The three key statistics related to our transactions are: the number of transactions, the number of products sold/bought in those transactions, and the total value of transactions (price * productsCount). Here's the definition of the object used for counting the aggregations.

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class TransactionTotal {
   private int count;
   private int productCount;
   private int amount;
}

In order to call an aggregation method, we first need to group the transactions stream by the selected key. In the method visible below we use the status field as a grouping key. After that, we may invoke the aggregate method, which allows us to perform more complex calculations. In this particular case, we are calculating the number of all executed transactions, the volume of products they cover, and their total value. The resulting KTable can be materialized as a state store. Thanks to that, we will be able to query it by the name all-transactions-store.

@Bean
public Consumer<KStream<Long, Transaction>> total() {
   KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(
                "all-transactions-store");
   return transactions -> transactions
         .groupBy((k, v) -> v.getStatus(), 
                  Grouped.with(Serdes.String(), new JsonSerde<>(Transaction.class)))
         .aggregate(
                 TransactionTotal::new,
                 (k, v, a) -> {
                    a.setCount(a.getCount() + 1);
                    a.setProductCount(a.getProductCount() + v.getAmount());
                    a.setAmount(a.getAmount() + (v.getPrice() * v.getAmount()));
                    return a;
                 },
                 Materialized.<String, TransactionTotal> as(storeSupplier)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new JsonSerde<>(TransactionTotal.class)))
         .toStream()
         .peek((k, v) -> log.info("Total: {}", v));
}

The next function performs a similar aggregate operation, but this time per product. Because the Transaction object does not contain information about the product, we first need to join it with the order to access it. Then we produce a KTable by grouping and aggregating per productId. As before, we materialize the aggregation as a state store.
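The TransactionTotalWithProduct wrapper used in the join below is not shown in this part of the article. A minimal sketch, inferred from how it is constructed and read in the following beans (the field names are my assumption), could look like this:

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class TransactionTotalWithProduct {
   // the joined transaction event
   private Transaction transaction;
   // the product id taken from the matching order
   private Integer productId;
}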

@Bean
public BiConsumer<KStream<Long, Transaction>, KStream<Long, Order>> totalPerProduct() {
   KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(
                "transactions-per-product-store");
   return (transactions, orders) -> transactions
         .selectKey((k, v) -> v.getSellOrderId())
         .join(orders.selectKey((k, v) -> v.getId()),
               (t, o) -> new TransactionTotalWithProduct(t, o.getProductId()),
               JoinWindows.of(Duration.ofSeconds(10)),
               StreamJoined.with(Serdes.Long(), 
                  new JsonSerde<>(Transaction.class), 
                  new JsonSerde<>(Order.class)))
         .groupBy((k, v) -> v.getProductId(), 
            Grouped.with(Serdes.Integer(), new JsonSerde<>(TransactionTotalWithProduct.class)))
         .aggregate(
               TransactionTotal::new,
               (k, v, a) -> {
                  a.setCount(a.getCount() + 1);
                  a.setProductCount(a.getProductCount() + v.getTransaction().getAmount());
                  a.setAmount(a.getAmount() + (v.getTransaction().getPrice() * v.getTransaction().getAmount()));
                  return a;
               },
               Materialized.<Integer, TransactionTotal> as(storeSupplier)
                  .withKeySerde(Serdes.Integer())
                  .withValueSerde(new JsonSerde<>(TransactionTotal.class)))
         .toStream()
         .peek((k, v) -> log.info("Total per product({}): {}", k, v));
}

What if we would like to perform aggregations similar to those described above, but only for a particular period of time? We need to invoke the windowedBy method and produce a dedicated state store for such operations.

@Bean
public BiConsumer<KStream<Long, Transaction>, KStream<Long, Order>> latestPerProduct() {
   WindowBytesStoreSupplier storeSupplier = Stores.persistentWindowStore(
      "latest-transactions-per-product-store", Duration.ofSeconds(30), Duration.ofSeconds(30), false);
   return (transactions, orders) -> transactions
      .selectKey((k, v) -> v.getSellOrderId())
      .join(orders.selectKey((k, v) -> v.getId()),
            (t, o) -> new TransactionTotalWithProduct(t, o.getProductId()),
            JoinWindows.of(Duration.ofSeconds(10)),
            StreamJoined.with(Serdes.Long(), new JsonSerde<>(Transaction.class), new JsonSerde<>(Order.class)))
      .groupBy((k, v) -> v.getProductId(), Grouped.with(Serdes.Integer(), new JsonSerde<>(TransactionTotalWithProduct.class)))
      .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
      .aggregate(
            TransactionTotal::new,
             (k, v, a) -> {
                a.setCount(a.getCount() + 1);
                a.setProductCount(a.getProductCount() + v.getTransaction().getAmount());
                a.setAmount(a.getAmount() + (v.getTransaction().getPrice() * v.getTransaction().getAmount()));
                return a;
             },
            Materialized.<Integer, TransactionTotal> as(storeSupplier)
               .withKeySerde(Serdes.Integer())
               .withValueSerde(new JsonSerde<>(TransactionTotal.class)))
      .toStream()
      .peek((k, v) -> log.info("Total per product last 30s({}): {}", k, v));
}

Interactive queries

We have already created and configured all the required Kafka Streams with Spring Cloud. Finally, we can execute queries against the state stores. This operation is called an interactive query. Let's create a REST controller exposing endpoints with the results. In order to query Kafka Streams state stores with Spring Cloud, we need to inject the InteractiveQueryService bean into the controller.

@RestController
@RequestMapping("/transactions")
public class TransactionController {

   private InteractiveQueryService queryService;

   public TransactionController(InteractiveQueryService queryService) {
      this.queryService = queryService;
   }

   @GetMapping("/all")
   public TransactionTotal getAllTransactionsSummary() {
      ReadOnlyKeyValueStore<String, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("all-transactions-store",
                        QueryableStoreTypes.keyValueStore());
      return keyValueStore.get("NEW");
   }

   @GetMapping("/product/{productId}")
   public TransactionTotal getSummaryByProductId(@PathVariable("productId") Integer productId) {
      ReadOnlyKeyValueStore<Integer, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("transactions-per-product-store",
                        QueryableStoreTypes.keyValueStore());
      return keyValueStore.get(productId);
   }

   @GetMapping("/product/latest/{productId}")
   public TransactionTotal getLatestSummaryByProductId(@PathVariable("productId") Integer productId) {
      ReadOnlyKeyValueStore<Integer, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("latest-transactions-per-product-store",
                        QueryableStoreTypes.keyValueStore());
      return keyValueStore.get(productId);
   }

   @GetMapping("/product")
   public Map<Integer, TransactionTotal> getSummaryByAllProducts() {
      Map<Integer, TransactionTotal> m = new HashMap<>();
      ReadOnlyKeyValueStore<Integer, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("transactions-per-product-store",
                        QueryableStoreTypes.keyValueStore());
      KeyValueIterator<Integer, TransactionTotal> it = keyValueStore.all();
      while (it.hasNext()) {
         KeyValue<Integer, TransactionTotal> kv = it.next();
         m.put(kv.key, kv.value);
      }
      return m;
   }

}

Before you run the latest version of the stock-service application you should generate more differentiated random data. Let's say we would like to generate orders for 5 different products with floating prices, as shown below. Just uncomment the following fragment of code in the order-service and run the application once again to generate an infinite stream of events.

private static long orderId = 0;
private static final Random r = new Random();

private Map<Integer, Integer> prices = Map.of(
      1, 1000, 
      2, 2000, 
      3, 5000, 
      4, 1500, 
      5, 2500);

@Bean
public Supplier<Message<Order>> orderBuySupplier() {
   return () -> {
      Integer productId = r.nextInt(1, 6);
      int price = prices.get(productId) + r.nextInt(-100, 100);
      Order o = new Order(
         ++orderId,
         r.nextInt(1, 6),
         productId,
         100,
         LocalDateTime.now(),
         OrderType.BUY,
         price);
      log.info("Order: {}", o);
      return MessageBuilder
         .withPayload(o)
         .setHeader(KafkaHeaders.MESSAGE_KEY, orderId)
         .build();
   };
}

You may also want to generate messages more frequently. To do that, you need to decrease the fixed delay of the Spring Cloud Stream poller used by the Kafka Supplier.

spring.cloud.stream.poller.fixedDelay: 100

After running both our sample applications you may verify the logs on the stock-service side.

Then you may call our REST endpoints performing interactive queries on the materialized Kafka KTable.

$ curl http://localhost:8080/transactions/all
$ curl http://localhost:8080/transactions/product/3
$ curl http://localhost:8080/transactions/product/latest/5

Does it look simple? Well, under the hood it is quite a bit more complicated 🙂 Here's the final list of topics created automatically for the needs of our application.

kafka-streams-spring-cloud-topics

Final Thoughts

Spring Cloud Stream simplifies working with Kafka Streams and interactive queries. Kafka Streams by itself is a very powerful mechanism. In this article, I showed you how we can use it to implement non-trivial logic and then analyze the data in various ways.

The post Kafka Streams with Spring Cloud Stream appeared first on Piotr's TechBlog.

Introduction to event-driven microservices with Spring Cloud Stream https://piotrminkowski.com/2020/06/05/introduction-to-event-driven-microservices-with-spring-cloud-stream/ https://piotrminkowski.com/2020/06/05/introduction-to-event-driven-microservices-with-spring-cloud-stream/#respond Fri, 05 Jun 2020 08:42:06 +0000 http://piotrminkowski.com/?p=7936 Spring Cloud Stream framework allows us to easily include well-known Spring patterns and best practices to applications while implementing event-driven microservices architecture. It uses the Spring Integration project to provide connectivity to a message broker. It provides built-in support for such features as a persistent publish-subscribe model, consumer grouping, and partitioning. The integration with a […]

Spring Cloud Stream framework allows us to easily include well-known Spring patterns and best practices to applications while implementing event-driven microservices architecture. It uses the Spring Integration project to provide connectivity to a message broker. It provides built-in support for such features as a persistent publish-subscribe model, consumer grouping, and partitioning. The integration with a specific message broker solution is realized by binder implementations that are hidden behind the middleware-neutral core.
The currently described version of Spring Cloud Stream is 3.0.3.RELEASE within Spring Cloud Release Train Hoxton.SR3.

For a more detailed introduction to the process of building a microservices architecture with Spring Cloud Stream you can refer to my video course: Microservices With Spring Boot And Spring Cloud: Part 5 – Event-driven microservices.

Example of Spring Cloud Stream microservices

Our sample system consists of three microservices: producer-service, consumer-a-service, and consumer-b-service. Each of them connects to the RabbitMQ broker. The producer-service application is responsible for sending events to the message broker, while the two other microservices listen for incoming events. Communication between the applications follows a publish-subscribe model, where data is broadcast through shared topics. Our consumer applications enable mechanisms such as consumer grouping, to guarantee that only a single instance of an application handles a given event, and partitioning, to assign events to a selected instance of an application based on a routing key set on the producer side. The following picture visualizes the described architecture.

spring-cloud-stream-arch

The source code of the sample applications is, as usual, available on GitHub. You may find it in the repository https://github.com/piomin/course-spring-microservices.git. That repository also contains code snippets of applications used in previous parts of my video course, so you should go to the event-driven directory to access the right samples.

Dependencies to Spring Cloud Stream

To enable Spring Cloud Stream for our application we first need to include the right binder implementation library. Because we are integrating with RabbitMQ, we have to use the spring-cloud-stream-binder-rabbit artifact. It references all other required libraries, so we don't have to include any other dependencies.

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

Messaging between Spring Cloud Stream microservices

Each message that is sent to the message broker is automatically serialized to JSON. Here’s a class that is the object representation of JSON payload exchanged by the applications.

data class CallmeEvent(val id: Int = 0, val message: String = "")

Producer application

Here's the implementation of the Supplier bean responsible for generating a continuous stream of events. By default, it sends a CallmeEvent once per second. It increments the id field on every new event and sets the to_process header to true. Based on the value of this header we route messages on the consumer side. The mechanism may easily be disabled by setting the property callme.supplier.enabled to false.

@SpringBootApplication
class ProductionApplication {

    var id: Int = 0

    @Value("\${callme.supplier.enabled}")
    val supplierEnabled: Boolean = false

    @Bean
    fun callmeEventSupplier(): Supplier<Message<CallmeEvent>?> = Supplier { createEvent() }

    private fun createEvent(): Message<CallmeEvent>? {
        return if (supplierEnabled)
            MessageBuilder.withPayload(CallmeEvent(++id, "I'm callme event!"))
                    .setHeader("to_process", true)
                    .build()
        else
            null
    }
}

Alternatively, we may send an event on demand, for example by calling a REST endpoint. To do that we need to use the StreamBridge bean. It provides a send method that takes a binding name and a message object as parameters. We may pass just a payload object or use MessageBuilder to create a whole GenericMessage with headers. Our controller exposes two POST methods. The first of them, POST /{message}, is used just for setting the message body. The second method, POST /{message}/process/{process}, additionally allows setting the to_process header.

@RestController
@RequestMapping("/producer")
class ProducerController(val streamBridge: StreamBridge) {

    var id: Int = 0

    @PostMapping("/{message}")
    fun sendEvent(@PathVariable message: String): Boolean {
        return streamBridge.send("callmeEventSupplier-out-0", CallmeEvent(++id, message))
    }

    @PostMapping("/{message}/process/{process}")
    fun sendEventWithHeader(@PathVariable message: String, @PathVariable process: Boolean): Boolean {
        return streamBridge.send("callmeEventSupplier-out-0",
                MessageBuilder.createMessage(CallmeEvent(++id, message),
                        MessageHeaders(mutableMapOf(Pair<String, Any>("to_process", process)))))
    }
}

Here is the configuration file of our application. We override the default name of the destination for our binding (1). By default, it is the same as the binding name, but we are going to change it to callme-events. The same destination will be set on the consumer side. Since we are using StreamBridge for sending messages, we also need to set the property spring.cloud.stream.source with the name that is used as a prefix of the generated binding name (2). If you would like to use the same output as for the Supplier, you should set the same name as the method that registers the Supplier bean, callmeEventSupplier. We should also increase the logging level for the Spring AMQP library to see the structure of messages sent to the message broker (3).

spring.application.name=producer-service
spring.cloud.stream.bindings.callmeEventSupplier-out-0.destination=callme-events #(1)
spring.cloud.stream.source=callmeEventSupplier #(2)
logging.level.org.springframework.amqp=DEBUG #(3)
callme.supplier.enabled=true

Consumer application

There are two different applications that listen for incoming events. Let's start with the implementation of consumer-a-service. It is pretty simple, because it just logs the message consumed from the callme-events topic. To consume messages from the destination we need to define a Consumer bean that takes CallmeEvent as an argument.

@SpringBootApplication
class ConsumerAApplication {

    val logger: Logger = LoggerFactory.getLogger(ConsumerAApplication::class.java)

    @Bean
    fun callmeEventConsumer(): Consumer<CallmeEvent> = Consumer { logger.info("Received: {}", it) }
}

In the application properties we also need to override the default destination name for the callmeEventConsumer-in-0 binding. It is the same as the output destination configured on the producer side, callme-events. We are also setting a consumer group for all instances of the consumer-a-service application. The name of the group is a. A consumer group guarantees that only a single instance of the application within the group handles a given incoming event.

spring.application.name=consumer-a-service
spring.cloud.stream.bindings.callmeEventConsumer-in-0.destination=callme-events
spring.cloud.stream.bindings.callmeEventConsumer-in-0.group=a

Message broker

We are running RabbitMQ in a Docker container. We need to expose two ports outside the container: 5672 for TCP connections from the applications and 15672 for the web management console. The following command starts the container.

$ docker run -d --name rabbit -h rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Now, we may run both our applications, producer-service and consumer-a-service. We are going to run a single instance of producer-service and two instances of consumer-a-service. After that, producer-service starts sending events continuously to the destination on the message broker. We may verify it with the RabbitMQ management console. To log in, use the default guest / guest credentials.

rabbit-list

Routing function

We can provide some more advanced routing on the consumer side. In Spring Cloud Stream nomenclature, event routing is the ability to either route events to a particular event subscriber or route events produced by an event subscriber to a particular destination. Event routing may be enabled for an application by setting the property spring.cloud.stream.function.routing.enabled to true. After that, the generated name of the bindings is automatically set to functionRouter-*.
I enabled the event routing feature for consumer-b-service. Here's the list of configuration properties required to enable routing for the consumer listening on the callme-events destination. When enabling event routing we also need to set a routing expression. We may use SpEL notation for it. In this case, I'm routing based on the value of the to_process header.

spring.application.name=consumer-b-service
spring.cloud.stream.bindings.functionRouter-in-0.destination=callme-events
spring.cloud.stream.bindings.functionRouter-in-0.group=b
spring.cloud.stream.function.routing.enabled=true
spring.cloud.function.routing-expression=(headers['to_process']!=null && headers['to_process']==true) ? 'process':'fireForget'

The names of the declared Consumer or Function beans should be the same as the values returned by the spring.cloud.function.routing-expression. In this case these are fireForget() and process().

@SpringBootApplication
class ConsumerBApplication {

    val logger: Logger = LoggerFactory.getLogger(ConsumerBApplication::class.java)

    @Bean
    fun fireForget(): Consumer<CallmeEvent> = Consumer { logger.info("Received(fireForget): {}", it) }

    @Bean
    fun process(): Function<CallmeEvent, CallmeResponse> = Function { logAndResponse(it) }

    private fun logAndResponse(callmeEvent: CallmeEvent): CallmeResponse {
        logger.info("Received(process): {}", callmeEvent)
        return CallmeResponse(callmeEvent.id, "I'm callme response")
    }
}

Here's the list of exchanges used by our system after starting consumer-b-service. Since the destination name for functionRouter-in-0 is overridden in the configuration properties, the destination name for functionRouter-out-0 is left at its default value.

spring-cloud-stream-routing

Partitioning

Partitioning is a critical concept in stateful processing, where it is required to ensure that all related data is handled together. Thanks to partitioning, we may implement an algorithm that distributes messages across multiple instances of an application in a deterministic way.
To enable partitioning on the producer side we need to set two properties on a given binding: the producer's partitionKeyExpression, which in this case is the id field of CallmeEvent, and the producer's partitionCount with the number of partitions. This number should be the same as the number of running instances of the application, since each partition is assigned to a selected instance of that application. Because we are planning to run two instances, we are setting such a value.

spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionKeyExpression=payload.id
spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionCount=2

To enable partitioning on the consumer side we need to set the consumer's partitioned property to true on a given binding. We also need to set the property spring.cloud.stream.instanceCount to a static number that represents the number of deployed instances of the application. While Kafka supports a more flexible way of configuring partitioning, RabbitMQ requires a static value, so we can't scale the number of instances dynamically without changing these properties. We also need to set the property spring.cloud.stream.instanceIndex for each instance. It needs to be 0 or 1 if we have two running instances, as shown below.
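These settings appear only in the IDE run configuration on the screenshot below. For reference, a minimal sketch of the consumer-side properties, assuming we enable partitioning for consumer-a-service and its callmeEventConsumer-in-0 binding, could look like this (the second instance would then be started with spring.cloud.stream.instanceIndex=1):

spring.cloud.stream.bindings.callmeEventConsumer-in-0.consumer.partitioned=true
spring.cloud.stream.instanceCount=2
spring.cloud.stream.instanceIndex=0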

spring-cloud-stream-intellij-start

Now let's take a look at the structure of routing inside the callme-events exchange created on RabbitMQ. Two consumers are listening for incoming events per consumer group. So, since we have two different consumer groups defined, there are 4 consumers in total. Within each group, we have two different routing keys, one set for each target queue.

partitioning-rabbitmq

We can also verify that four queues receive incoming events.

spring-cloud-stream-partitioning-queues

Testing

Spring Cloud Stream provides an easy way of testing your microservice applications without the need to connect to an external messaging system. To enable that support we need to include the following dependency in the Maven pom.xml.

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-stream</artifactId>
   <version>3.0.3.RELEASE</version>
   <type>test-jar</type>
   <scope>test</scope>
   <classifier>test-binder</classifier>
</dependency>

In the JUnit test class we should import the TestChannelBinderConfiguration class. After that we may use InputDestination for sending test messages, and OutputDestination for receiving them during a test. The following testRouter test is created for consumer-b-service, and it verifies that the routing function works properly by receiving and validating the message sent to the output destination.

@SpringBootTest(classes = [ConsumerBApplication::class])
@Import(TestChannelBinderConfiguration::class)
class RouterFunctionTest {

    @Autowired
    lateinit var inputDestination: InputDestination
    @Autowired
    lateinit var outputDestination: OutputDestination

    @Test
    fun testRouter() {
        inputDestination.send(MessageBuilder.withPayload(CallmeEvent(1, "I'm callme event"))
                .setHeader("to_process", true)
                .build())
        val response = outputDestination.receive()
        Assertions.assertNotNull(response)
        Assertions.assertTrue(response.payload.isNotEmpty())
        val payload = String(response.payload)
        val payloadObject = ObjectMapper().readValue(payload, CallmeResponse::class.java)
        Assertions.assertEquals(1, payloadObject.id)
        Assertions.assertEquals("I'm callme response", payloadObject.message)
    }

}

The post Introduction to event-driven microservices with Spring Cloud Stream appeared first on Piotr's TechBlog.

Building and testing message-driven microservices using Spring Cloud Stream https://piotrminkowski.com/2018/06/15/building-and-testing-message-driven-microservices-using-spring-cloud-stream/ https://piotrminkowski.com/2018/06/15/building-and-testing-message-driven-microservices-using-spring-cloud-stream/#comments Fri, 15 Jun 2018 08:46:04 +0000 https://piotrminkowski.wordpress.com/?p=6678 In this article, you will learn how to improve automated testing of message-driven microservices with Spring Cloud Stream. Spring Boot and Spring Cloud give you a great opportunity to build microservices fast using different styles of communication. You can create synchronous REST microservices based on Spring Cloud Netflix libraries as shown in one of my […]

In this article, you will learn how to improve automated testing of message-driven microservices with Spring Cloud Stream. Spring Boot and Spring Cloud give you a great opportunity to build microservices fast using different styles of communication. You can create synchronous REST microservices based on Spring Cloud Netflix libraries as shown in one of my previous articles Quick Guide to Microservices with Spring Boot 2.0, Eureka and Spring Cloud. You can create asynchronous, reactive microservices deployed on Netty with Spring WebFlux project and combine it successfully with some Spring Cloud libraries as shown in my article Reactive Microservices with Spring WebFlux and Spring Cloud. And finally, you may implement message-driven microservices based on the publish/subscribe model using Spring Cloud Stream and a message broker like Apache Kafka or RabbitMQ. The last of the listed approaches to building microservices is the main subject of this article. I’m going to show you how to effectively build, scale, run, and test messaging microservices based on RabbitMQ broker.

Architecture

For the purpose of demonstrating the Spring Cloud Stream testing features we will design a sample system that uses the publish/subscribe model for inter-service communication. We have three microservices: order-service, product-service, and account-service. The order-service application exposes an HTTP endpoint that is responsible for processing orders sent to our system. All incoming orders are processed asynchronously: order-service prepares and sends a message to a RabbitMQ exchange and then responds to the calling client that the request has been accepted for processing. The account-service and product-service applications listen for the order messages incoming to the exchange. Microservice account-service is responsible for checking if there are sufficient funds on the customer's account for order realization and then withdrawing cash from this account. Microservice product-service checks if there is a sufficient amount of products in the store, and changes the number of available products after processing the order. Both account-service and product-service send an asynchronous response through a RabbitMQ exchange (this time it is one-to-one communication using a direct exchange) with the status of the operation. After receiving the response messages, order-service sets the appropriate status of the order and exposes it through the REST endpoint GET /order/{id} to the external client.

If you feel that the description of our sample system is a little hard to follow, here's a diagram of the architecture for clarification.

stream-1

Enabling Spring Cloud Stream

The recommended way to include Spring Cloud Stream in the project is with a dependency management system. Spring Cloud Stream has its own release train, managed independently of the whole Spring Cloud framework. However, if we have declared spring-cloud-dependencies in the Elmhurst.RELEASE version inside the dependencyManagement section, we don't have to declare anything else in pom.xml. If you prefer to use only the Spring Cloud Stream project, you should define the following section.

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-dependencies</artifactId>
      <version>Elmhurst.RELEASE</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

The next step is to add the spring-cloud-stream artifact to the project dependencies. I also recommend including at least the spring-cloud-starter-sleuth library, so that messages are sent with the same traceId as the source request incoming to order-service.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>

Spring Cloud Stream programming model

To enable connectivity to a message broker for your application, annotate the main class with @EnableBinding. The @EnableBinding annotation takes one or more interfaces as parameters. You may choose between three interfaces provided by Spring Cloud Stream:

  • Sink: This is used for marking a service that receives messages from the inbound channel.
  • Source: This is used for sending messages to the outbound channel.
  • Processor: This can be used in case you need both an inbound channel and an outbound channel, as it extends the Source and Sink interfaces.

Because order-service both sends and receives messages, its main class has been annotated with @EnableBinding(Processor.class).

Here’s the main class of order-service that enables Spring Cloud Stream binding.

@SpringBootApplication
@EnableBinding(Processor.class)
public class OrderApplication {
  public static void main(String[] args) {
    new SpringApplicationBuilder(OrderApplication.class).web(true).run(args);
  }
}

Adding message broker

In Spring Cloud Stream nomenclature, the implementation responsible for integration with a specific message broker is called a binder. By default, Spring Cloud Stream provides binder implementations for Kafka and RabbitMQ. It is able to automatically detect and use a binder found on the classpath. Any middleware-specific settings can be overridden through external configuration properties in the form supported by Spring Boot, such as application arguments, environment variables, or just the application.yml file. To include support for RabbitMQ you should add the following dependency to the project.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

Now, our applications need to connect to one shared instance of the RabbitMQ broker. That's why I run a Docker image with RabbitMQ exposed outside on the default 5672 port. It also launches a web dashboard available at http://192.168.99.100:15672.

$ docker run -d --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management

We need to override the default address of RabbitMQ for every Spring Boot application by setting the property spring.rabbitmq.host to the Docker machine IP 192.168.99.100.

spring:  
  rabbitmq:
    host: 192.168.99.100
    port: 5672

Implementing message-driven microservices

Spring Cloud Stream is built on top of the Spring Integration project. Spring Integration extends the Spring programming model to support the well-known Enterprise Integration Patterns (EIP). EIP defines a number of components that are typically used for orchestration in distributed systems. You have probably heard about patterns such as message channels, routers, aggregators, or endpoints. Let's proceed to the implementation.
We begin with order-service, which is responsible for accepting orders, publishing them on the shared topic, and then collecting asynchronous responses from the downstream services. Here's the @Service bean, which builds a message and publishes it to the remote topic using the Source bean.

@Service
public class OrderSender {
  @Autowired
  private Source source;
  
  public boolean send(Order order) {
    return this.source.output().send(MessageBuilder.withPayload(order).build());
  }
}

That @Service is called by the controller, which exposes HTTP endpoints for submitting new orders and getting an order with its status by id.

@RestController
public class OrderController {

   private static final Logger LOGGER = LoggerFactory.getLogger(OrderController.class);
   
   private ObjectMapper mapper = new ObjectMapper();
   
   @Autowired
   OrderRepository repository;
   @Autowired
   OrderSender sender;   
   
   @PostMapping
   public Order process(@RequestBody Order order) throws JsonProcessingException {
      Order o = repository.add(order);
      LOGGER.info("Order saved: {}", mapper.writeValueAsString(order));
      boolean isSent = sender.send(o);
      LOGGER.info("Order sent: {}", mapper.writeValueAsString(Collections.singletonMap("isSent", isSent)));
      return o;
   }
   
   @GetMapping("/{id}")
   public Order findById(@PathVariable("id") Long id) {
      return repository.findById(id);
   }
   
}

Now, let's take a closer look at the consumer side. The message sent by the OrderSender bean from order-service is received by account-service and product-service. To receive the message from the topic exchange, we just have to annotate the method that takes the Order object as a parameter with @StreamListener. We also have to define a target channel for the listener, which in this case is Processor.INPUT.

@SpringBootApplication
@EnableBinding(Processor.class)
public class OrderApplication {
   
   private static final Logger LOGGER = LoggerFactory.getLogger(OrderApplication.class);
   
   // required for logging the incoming order as JSON
   private ObjectMapper mapper = new ObjectMapper();
   
   @Autowired
   AccountService service;
   
   public static void main(String[] args) {
      new SpringApplicationBuilder(OrderApplication.class).web(true).run(args);
   }
   
   @StreamListener(Processor.INPUT)
   public void receiveOrder(Order order) throws JsonProcessingException {
      LOGGER.info("Order received: {}", mapper.writeValueAsString(order));
      service.process(order);
   }
   
}

The received order is then processed by the AccountService bean. An order may be accepted or rejected by account-service depending on whether there are sufficient funds on the customer's account for the order's realization. The response with the acceptance status is sent back to order-service via the output channel invoked by the OrderSender bean.

@Service
public class AccountService {

   private static final Logger LOGGER = LoggerFactory.getLogger(AccountService.class);
   
   private ObjectMapper mapper = new ObjectMapper();
   
   @Autowired
   AccountRepository accountRepository;
   @Autowired
   OrderSender orderSender;
   
   public void process(final Order order) throws JsonProcessingException {
      LOGGER.info("Order processed: {}", mapper.writeValueAsString(order));
      List<Account> accounts = accountRepository.findByCustomer(order.getCustomerId());
      Account account = accounts.get(0);
      LOGGER.info("Account found: {}", mapper.writeValueAsString(account));
      if (order.getPrice() <= account.getBalance()) {
         order.setStatus(OrderStatus.ACCEPTED);
         account.setBalance(account.getBalance() - order.getPrice());
      } else {
         order.setStatus(OrderStatus.REJECTED);
      }
      orderSender.send(order);
      LOGGER.info("Order response sent: {}", mapper.writeValueAsString(order));
   }
   
}

The last step is the configuration, provided inside the application.yml file. We have to properly define the destinations for the channels. While order-service assigns the orders-out destination to its output channel and the orders-in destination to its input channel, account-service and product-service do the opposite. This is logical, because messages sent by order-service via its output destination are received by the consuming services via their input destinations. But it is still the same destination on the shared broker's exchange. Here are the configuration settings of order-service.

spring: 
  cloud:  
    stream:
      bindings:
        output:
          destination: orders-out
        input:
          destination: orders-in
      rabbit:
        bindings:
          input:
            consumer:
              exchangeType: direct

Here's the configuration provided for account-service and product-service.

spring:  
  cloud:  
    stream:
      bindings:
        output:
          destination: orders-in
        input:
          destination: orders-out
      rabbit:
        bindings:
          output:
            producer:
              exchangeType: direct
              routingKeyExpression: '"#"'

Finally, you can run our sample microservices. For now, we just need to run a single instance of each microservice. You can easily generate some test requests by running the JUnit test class OrderControllerTest provided in my source code repository inside the order-service module. This case is simple. Next, we will study more advanced samples with multiple running instances of the consuming services.
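The actual OrderControllerTest class in the repository may look different; the snippet below is only a rough sketch of how such a test could generate requests against the running order-service (the ids and prices are illustrative values, not data taken from the article):

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class OrderControllerTest {

   @Autowired
   private TestRestTemplate restTemplate;

   @Test
   public void testSendOrders() {
      for (int i = 0; i < 10; i++) {
         // illustrative test data only
         Order order = new Order();
         order.setCustomerId(1L);
         order.setAccountId(1L);
         order.setProductIds(Collections.singletonList(2L));
         order.setPrice(50 * (i + 1));
         Order accepted = restTemplate.postForObject("/", order, Order.class);
         Assert.assertNotNull(accepted);
      }
   }

}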

Scaling up

To scale up our Spring Cloud Stream applications we just need to launch additional instances of each microservice. They will still listen for incoming messages on the same topic exchange as the currently running instances. After adding one instance each of account-service and product-service, we may send a test order. The result of that test won't be satisfactory for us... Why? A single order is received by all the running instances of every microservice. This is exactly how topic exchanges work: the message sent to the topic is received by all consumers that are listening on that topic. Fortunately, Spring Cloud Stream is able to solve that problem by providing a solution called a consumer group. It guarantees that only one of the instances handles a given message if they are placed in a competing consumer relationship. The transformation to the consumer group mechanism when running multiple instances of the service is visualized in the following figure.

stream-2

Configuration of the consumer group mechanism is not very difficult. We just have to set a group parameter with the name of the group for the given destination. Here's the current binding configuration for account-service. The orders-in destination is a queue created for direct communication with order-service, so only orders-out is grouped using the spring.cloud.stream.bindings.<binding-name>.group property.

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: orders-in
        input:
          destination: orders-out
          group: account

The consumer group mechanism is a concept taken from Apache Kafka, and it is implemented in Spring Cloud Stream also for the RabbitMQ broker, which does not natively support it. So, I think it is pretty interesting how it is configured on RabbitMQ. If you run two instances of the service without setting a group name on the destination, there are two bindings created for a single exchange (one binding per instance), as shown in the picture below. Because two applications are listening on that exchange, there are four bindings assigned to that exchange in total.

spring-cloud-stream-testing-3

If you set a group name for the selected destination Spring Cloud Stream will create a single binding for all running instances of a given service. The name of the binding will be suffixed with the group name.

spring-cloud-stream-testing_11_06

Because we have included spring-cloud-starter-sleuth in the project dependencies, the same traceId header is sent between all the asynchronous requests exchanged during the realization of a single request incoming to the order-service POST endpoint. Thanks to that, we can easily correlate all logs with this header using the Elastic Stack (Kibana).

spring-cloud-stream-testing_11_05

Automated Testing with Spring Cloud Stream

You can easily test your microservice without connecting to a message broker. To achieve it you need to include spring-cloud-stream-test-support in your project dependencies. It contains the TestSupportBinder bean that lets you interact with the bound channels and inspect any messages sent and received by the application.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-test-support</artifactId>
  <scope>test</scope>
</dependency>

In the test class we need to declare the MessageCollector bean, which is responsible for receiving messages retained by the TestSupportBinder. Here's my test class from account-service. Using the Processor bean I send a test order to the input channel. Then MessageCollector receives the message that is sent back to order-service via the output channel. The test method testAccepted creates an order that should be accepted by account-service, while the testRejected method sets an order price that is too high, which results in rejecting the order.

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class OrderReceiverTest {

   private static final Logger LOGGER = LoggerFactory.getLogger(OrderReceiverTest.class);
   
   @Autowired
   private Processor processor;
   @Autowired
   private MessageCollector messageCollector;

   @Test
   @SuppressWarnings("unchecked")
   public void testAccepted() {
      Order o = new Order();
      o.setId(1L);
      o.setAccountId(1L);
      o.setCustomerId(1L);
      o.setPrice(500);
      o.setProductIds(Collections.singletonList(2L));
      processor.input().send(MessageBuilder.withPayload(o).build());
      Message<Order> received = (Message<Order>) messageCollector.forChannel(processor.output()).poll();
      LOGGER.info("Order response received: {}", received.getPayload());
      assertNotNull(received.getPayload());
      assertEquals(OrderStatus.ACCEPTED, received.getPayload().getStatus());
   }
   
   @Test
   @SuppressWarnings("unchecked")
   public void testRejected() {
      Order o = new Order();
      o.setId(1L);
      o.setAccountId(1L);
      o.setCustomerId(1L);
      o.setPrice(100000);
      o.setProductIds(Collections.singletonList(2L));
      processor.input().send(MessageBuilder.withPayload(o).build());
      Message<Order> received = (Message<Order>) messageCollector.forChannel(processor.output()).poll();
      LOGGER.info("Order response received: {}", received.getPayload());
      assertNotNull(received.getPayload());
      assertEquals(OrderStatus.REJECTED, received.getPayload().getStatus());
   }

}

Conclusion

Message-driven microservices are a good choice whenever you don't need a synchronous response from your API. In this article, I have shown a sample use case of the publish/subscribe model in inter-service communication between microservices. The source code is, as usual, available on GitHub (https://github.com/piomin/sample-message-driven-microservices.git). For other interesting examples of testing with the Spring Cloud Stream library, also with Apache Kafka, you can refer to Chapter 11 of my book Mastering Spring Cloud (https://www.packtpub.com/application-development/mastering-spring-cloud).

The post Building and testing message-driven microservices using Spring Cloud Stream appeared first on Piotr's TechBlog.
