spring cloud function Archives - Piotr's TechBlog
https://piotrminkowski.com/tag/spring-cloud-function/

Serverless on Azure with Spring Cloud Function
https://piotrminkowski.com/2024/01/19/serverless-on-azure-with-spring-cloud-function/
Fri, 19 Jan 2024 09:25:49 +0000

The post Serverless on Azure with Spring Cloud Function appeared first on Piotr's TechBlog.

This article will teach you how to create and run serverless apps on Azure using the Spring Cloud Function and Spring Cloud Azure projects. We will integrate with the Azure Functions and Azure Event Hubs services.

It is not my first article about Azure and Spring Cloud. Before starting this exercise, it is worth reading my earlier article to familiarize yourself with some interesting features of Spring Cloud Azure. It describes an integration with the Azure Spring Apps, Cosmos DB, and App Configuration services. If you are interested in CI/CD for Spring Boot apps, you can also refer to my article about Azure DevOps and Terraform.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. The Spring Boot apps used in the article are located in the serverless directory. After you go to that directory you should just follow my further instructions.

Architecture

In this exercise, we will prepare two sample Spring Boot apps (aka functions): account-function and customer-function. Then, we will deploy them on the Azure Functions service. Our apps do not communicate with each other directly but through the Azure Event Hubs service. Event Hubs is a cloud-native data streaming service compatible with the Apache Kafka API. After adding a new customer, the customer-function sends an event to Azure Event Hubs using the Spring Cloud Stream binder. The account-function app receives the event through the Azure Event Hubs trigger. Also, the customer-function is exposed to the external client through the Azure HTTP trigger. Here’s the diagram of our architecture:

azure-serverless-spring-cloud-arch

Prerequisites

There are some prerequisites before you start the exercise. You need to install JDK17+ and Maven on your local machine. You also need to have an account on Azure and az CLI to interact with that account. Once you install the az CLI and log in to Azure you can execute the following command for verification:

$ az account show

If you would like to test Azure Functions locally, you need to install Azure Functions Core Tools. You can find detailed installation instructions in Microsoft Docs here. For macOS, there are three required commands to run:

$ brew tap azure/functions
$ brew install azure-functions-core-tools@4
$ brew link --overwrite azure-functions-core-tools@4

Create Resources on Azure

Before we proceed with the source code, we need to create several required resources on the Azure cloud. In the first step, we will prepare a resource group for all required objects. The name of the group is spring-cloud-serverless. The location depends on your preferences. For me it is eastus.

$ az group create -l eastus -n spring-cloud-serverless

In the next step, we need to create a storage account. The Azure Function service requires it, but we will also use that account during the local development with Azure Functions Core Tools.

$ az storage account create -n pminkowsserverless \
     -g spring-cloud-serverless \
     -l eastus \
     --sku Standard_LRS

In order to run serverless apps on Azure with e.g. Spring Cloud Function, we need to create the Azure Function App instances. Of course, we use the previously created resource group and storage account. The names of my Function App instances are pminkows-account-function and pminkows-customer-function. We also set the OS type (Linux), the functions version (4), and the runtime stack (Java) for each Function App.

$ az functionapp create -n pminkows-customer-function \
     -c eastus \
     --os-type Linux \
     --functions-version 4 \
     -g spring-cloud-serverless \
     --runtime java \
     --runtime-version 17.0 \
     -s pminkowsserverless

$ az functionapp create -n pminkows-account-function \
     -c eastus \
     --os-type Linux \
     --functions-version 4 \
     -g spring-cloud-serverless \
     --runtime java \
     --runtime-version 17.0 \
     -s pminkowsserverless

Then, we have to create the Azure Event Hubs namespace. The name of my namespace is spring-cloud-serverless. As before, I choose the East US location and the spring-cloud-serverless resource group. We can also set the pricing tier (Standard) and the upper limit of throughput units that applies when the AutoInflate option is enabled.

$ az eventhubs namespace create -n spring-cloud-serverless \
     -g spring-cloud-serverless \
     --location eastus \
     --sku Standard \
     --maximum-throughput-units 1 \
     --enable-auto-inflate true

Finally, we have to create topics on Event Hubs. Of course, they have to be assigned to the previously created spring-cloud-serverless Event Hubs namespace. The names of our topics are accounts and customers. The number of partitions is irrelevant in this exercise.

$ az eventhubs eventhub create -n accounts \
     -g spring-cloud-serverless \
     --namespace-name spring-cloud-serverless \
     --cleanup-policy Delete \
     --partition-count 3

$ az eventhubs eventhub create -n customers \
     -g spring-cloud-serverless \
     --namespace-name spring-cloud-serverless \
     --cleanup-policy Delete \
     --partition-count 3

Now, let’s switch to the Azure Portal. Find the spring-cloud-serverless resource group. You should have the same list of resources inside this group as shown below. It means that our environment is ready and we can proceed to the source code.

App Dependencies

Firstly, we need to declare the dependencyManagement section inside the Maven pom.xml for three projects used in the app implementation: Spring Boot, Spring Cloud, and Spring Cloud Azure.

<properties>
  <java.version>17</java.version>
  <spring-boot.version>3.1.4</spring-boot.version>
  <spring-cloud-azure.version>5.7.0</spring-cloud-azure.version>
  <spring-cloud.version>2022.0.4</spring-cloud.version>
  <maven.compiler.release>${java.version}</maven.compiler.release>
  <maven.compiler.source>${java.version}</maven.compiler.source>
  <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>${spring-cloud.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
    <dependency>
      <groupId>com.azure.spring</groupId>
      <artifactId>spring-cloud-azure-dependencies</artifactId>
      <version>${spring-cloud-azure.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-dependencies</artifactId>
      <version>${spring-boot.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<build>
  <plugins>
    <plugin>
      <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-maven-plugin</artifactId>
      <executions>
       <execution>
          <goals>
           <goal>repackage</goal>
          </goals>
         </execution>
       </executions>
    </plugin>
  </plugins>
</build>

Here’s the list of required Maven dependencies. We need to include the spring-cloud-function-context library to enable Spring Cloud Functions. In order to integrate with the Azure Functions service, we need to include the spring-cloud-function-adapter-azure extension. Our apps also send messages to the Azure Event Hubs service through the dedicated Spring Cloud Stream binder.

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-function-adapter-azure</artifactId>
  </dependency>
  <dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-stream-binder-eventhubs</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-function-context</artifactId>
  </dependency>
</dependencies>

Create Azure Serverless Apps with Spring Cloud

Expose Azure Function as HTTP endpoint

Let’s begin with the customer-function. To simplify the app we will use an in-memory H2 for storing data. Each time a new customer is added, it is persisted in the in-memory database using Spring Data JPA extension. Here’s our entity class:

@Entity
public class Customer {
   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long id;
   private String name;
   private int age;
   private String status;

   // GETTERS AND SETTERS
}

Here’s the Spring Data repository interface for the Customer entity:

public interface CustomerRepository extends ListCrudRepository<Customer, Long> {
}

Let’s proceed to the Spring Cloud Functions implementation. The CustomerInternalFunctions bean defines two functions. The addCustomer function (1) persists a new customer in the database and sends an event about it to the Azure Event Hubs topic. It uses the Spring Cloud Stream StreamBridge bean for interacting with Event Hubs. It also returns the persisted Customer entity as a response. The second function, changeStatus (2), doesn’t return any data. It just reacts to the incoming event and changes the status of the particular customer. That event is sent by the account-function app after generating an account for a newly created customer. The important information here is that those are just Spring Cloud functions. For now, they have nothing to do with Azure Functions.

@Service
public class CustomerInternalFunctions {

   private static final Logger LOG = LoggerFactory
      .getLogger(CustomerInternalFunctions.class);

   private StreamBridge streamBridge;
   private CustomerRepository repository;

   public CustomerInternalFunctions(StreamBridge streamBridge, 
                                    CustomerRepository repository) {
      this.streamBridge = streamBridge;
      this.repository = repository;
   }

   // (1)
   @Bean
   public Function<Customer, Customer> addCustomer() {
      return c -> {
         Customer newCustomer = repository.save(c);
         streamBridge.send("customers-out-0", newCustomer);
         LOG.info("New customer added: {}", c);
         return newCustomer;
      };
   }

   // (2)
   @Bean
   public Consumer<Account> changeStatus() {
      return account -> {
         Customer customer = repository.findById(account.getCustomerId())
                .orElseThrow();
         customer.setStatus(Customer.CUSTOMER_STATUS_ACC_ACTIVE);
         repository.save(customer);
         LOG.info("Customer activated: id={}", customer.getId());
      };
   }

}

We also need to provide several configuration properties in the Spring Boot application.properties file. We should set the Azure Event Hubs connection URL and the name of the target topic. Since our app uses Spring Cloud Stream only for sending events we should also turn off the autodiscovery of functional beans as messaging bindings.

spring.cloud.azure.eventhubs.connection-string = ${EVENT_HUBS_CONNECTION_STRING}
spring.cloud.stream.bindings.customers-out-0.destination = customers
spring.cloud.stream.function.autodetect = false

In order to expose the functions on Azure, we will take advantage of the Spring Cloud Function Azure Adapter. It adds several Azure libraries to our Maven dependencies. It can invoke the Spring Cloud functions directly or through the lookup approach with the FunctionCatalog bean (1). The method has to be annotated with @FunctionName, which defines the name of the function in Azure (2). In order to expose that function over HTTP, we need to define the Azure @HttpTrigger (3). The trigger exposes the function as a POST endpoint and doesn’t require any authorization. Our method receives the request through the HttpRequestMessage object. Then, it invokes the Spring Cloud function addCustomer by name using the FunctionCatalog bean (4).

// (1)
@Autowired
private FunctionCatalog functionCatalog;

@FunctionName("add-customer") // (2)
public Customer addCustomerFunc(
        // (3)
        @HttpTrigger(name = "req",
                     methods = { HttpMethod.POST },
                     authLevel = AuthorizationLevel.ANONYMOUS)
        HttpRequestMessage<Optional<Customer>> request,
        ExecutionContext context) {
   Customer c = request.getBody().orElseThrow();
   // java.util.logging does not expand "{}" placeholders, so concatenate instead
   context.getLogger().info("Request: " + c);
   // (4)
   Function<Customer, Customer> function = functionCatalog
      .lookup("addCustomer");
   return function.apply(c);
}
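To make the lookup approach more concrete, here is a minimal plain-Java sketch of a name-to-function registry. SimpleFunctionCatalog is a hypothetical, simplified stand-in written for illustration only. It is not Spring's FunctionCatalog and it skips everything the real one does around type conversion and composition. It only shows the idea of registering a function under its bean name and fetching it by that name later.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified stand-in for a function catalog (not the Spring class).
final class SimpleFunctionCatalog {

    private final Map<String, Object> functions = new HashMap<>();

    // Register a function under a name, similar to how a @Bean method name is used.
    void register(String name, Object function) {
        functions.put(name, function);
    }

    // Look a function up by name; the caller decides which type it expects.
    // Throwing on a missing name is a choice made for this sketch.
    @SuppressWarnings("unchecked")
    <T> T lookup(String name) {
        Object function = functions.get(name);
        if (function == null) {
            throw new IllegalArgumentException("No function named " + name);
        }
        return (T) function;
    }

    public static void main(String[] args) {
        SimpleFunctionCatalog catalog = new SimpleFunctionCatalog();
        Function<String, String> addCustomer = name -> "saved:" + name;
        catalog.register("addCustomer", addCustomer);

        Function<String, String> function = catalog.lookup("addCustomer");
        System.out.println(function.apply("Test")); // prints "saved:Test"
    }
}
```

The Azure-facing method stays thin with this approach: it only unpacks the request and delegates to whatever function is registered under the given name.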

Integrate Function with Azure Event Hubs Trigger

Let’s switch to the account-function app directory inside our Git repository. Like customer-function, it uses an in-memory H2 database, this time for storing customer accounts. Here’s our Account entity class:

@Entity
public class Account {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String number;
    private Long customerId;
    private int balance;

   // GETTERS AND SETTERS ...
}

Inside the account-function app, there is a single Spring Cloud Function addAccount. It generates a new 16-digit account number for each new customer. Then, it saves the account in the database and sends the event to the Azure Event Hubs with Spring Cloud Stream StreamBridge bean.

@Service
public class AccountInternalFunctions {

   private static final Logger LOG = LoggerFactory
      .getLogger(AccountInternalFunctions.class);

   private StreamBridge streamBridge;
   private AccountRepository repository;

   public AccountInternalFunctions(StreamBridge streamBridge, 
                                   AccountRepository repository) {
      this.streamBridge = streamBridge;
      this.repository = repository;
   }

   @Bean
   public Function<Customer, Account> addAccount() {
      return customer -> {
         String n = RandomStringUtils.random(16, false, true);
         Account a = new Account(n, customer.getId(), 0);
         a = repository.save(a);
         boolean b = streamBridge.send("accounts-out-0", a);
         LOG.info("New account added: {}", a);
         return a;
      };
   }

}
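The RandomStringUtils class used above comes from Apache Commons Lang. If you would rather not depend on it, the same 16-digit number can be produced with the JDK alone. The sketch below is only an illustration under that assumption. The AccountNumbers class and its next method are hypothetical names that do not exist in the sample repository. Like the original call, it does not guarantee uniqueness.

```java
import java.security.SecureRandom;

// Hypothetical helper: generates a numeric string of the requested length.
final class AccountNumbers {

    private static final SecureRandom RANDOM = new SecureRandom();

    static String next(int digits) {
        StringBuilder sb = new StringBuilder(digits);
        for (int i = 0; i < digits; i++) {
            sb.append(RANDOM.nextInt(10)); // one decimal digit, 0-9
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(next(16)); // a random 16-digit string
    }
}
```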

The same as for customer-function we need to provide some configuration settings for Spring Cloud Stream. However, instead of the customers topic, this time we are sending events to the accounts topic.

spring.cloud.azure.eventhubs.connection-string = ${EVENT_HUBS_CONNECTION_STRING}
spring.cloud.stream.bindings.accounts-out-0.destination = accounts
spring.cloud.stream.function.autodetect = false

The account-function app is not exposed as an HTTP endpoint. We want to trigger the function in reaction to the event delivered to the Azure Event Hubs topic. The name of our Azure function is new-customer (1). It receives the Customer event from the customers topic thanks to the @EventHubTrigger annotation (2). This annotation also defines the name of the property that contains the connection string to the Azure Event Hubs namespace (EVENT_HUBS_CONNECTION_STRING). I’ll show you later how to set such a property for our function in Azure. Once the new-customer function is triggered, it invokes the Spring Cloud Function addAccount (3).

@Autowired
private FunctionCatalog functionCatalog;

// (1)
@FunctionName("new-customer")
public void newAccountEventFunc(
        // (2)
        @EventHubTrigger(eventHubName = "customers",
                         name = "newAccountTrigger",
                         connection = "EVENT_HUBS_CONNECTION_STRING",
                         cardinality = Cardinality.ONE)
        Customer event,
        ExecutionContext context) {
   context.getLogger().info("Event: " + event);
   // (3)
   Function<Customer, Account> function = functionCatalog
      .lookup("addAccount");
   function.apply(event);
}

At the end of this section, let’s switch back once again to the customer-function app. Its activate-customer function fires on the event sent by the new-customer function to the accounts topic. Therefore, we are using the @EventHubTrigger annotation once again.

@FunctionName("activate-customer")
public void activateCustomerEventFunc(
          @EventHubTrigger(eventHubName = "accounts",
                 name = "changeStatusTrigger",
                 connection = "EVENT_HUBS_CONNECTION_STRING",
                 cardinality = Cardinality.ONE)
          Account event,
          ExecutionContext context) {
   context.getLogger().info("Event: " + event);
   Consumer<Account> consumer = functionCatalog.lookup("changeStatus");
   consumer.accept(event);
}
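Before moving on, it may help to see the whole event flow in one place. The following plain-Java sketch simulates it with in-memory queues in place of the customers and accounts event hubs and a map in place of the database. Everything in it is a simplified assumption made for illustration: the record types, the "NEW" and "ACTIVE" status values, the account number format, and the deliverPending method are hypothetical and are not taken from the sample repository.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

final class EventFlowSketch {

    record Customer(long id, String name, String status) {}
    record Account(long customerId, String number) {}

    // In-memory queues standing in for the "customers" and "accounts" event hubs.
    final Queue<Customer> customersTopic = new ArrayDeque<>();
    final Queue<Account> accountsTopic = new ArrayDeque<>();
    // In-memory map standing in for the customer database.
    final Map<Long, Customer> customers = new HashMap<>();
    private long nextId = 1;

    // customer-function (HTTP trigger): save the customer, then publish an event.
    Customer addCustomer(String name) {
        Customer c = new Customer(nextId++, name, "NEW");
        customers.put(c.id(), c);
        customersTopic.add(c);
        return c;
    }

    // account-function (new-customer trigger): create an account, publish an event.
    Account addAccount(Customer customer) {
        Account a = new Account(customer.id(), "ACC-" + customer.id());
        accountsTopic.add(a);
        return a;
    }

    // customer-function (activate-customer trigger): mark the customer as active.
    void changeStatus(Account account) {
        Customer c = customers.get(account.customerId());
        customers.put(c.id(), new Customer(c.id(), c.name(), "ACTIVE"));
    }

    // Deliver pending events; on Azure the Event Hubs triggers do this job.
    void deliverPending() {
        Customer c;
        while ((c = customersTopic.poll()) != null) {
            addAccount(c);
        }
        Account a;
        while ((a = accountsTopic.poll()) != null) {
            changeStatus(a);
        }
    }

    public static void main(String[] args) {
        EventFlowSketch flow = new EventFlowSketch();
        Customer created = flow.addCustomer("Test");
        flow.deliverPending();
        System.out.println(flow.customers.get(created.id()).status()); // prints "ACTIVE"
    }
}
```

The point of the sketch is that neither app calls the other. Each one only reads from one topic and writes to another.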

All our functions are ready. Now, we can proceed to the deployment phase.

Running Azure Functions Locally with Maven

Before we deploy our functions on Azure, we can run and test them locally. I assume you have already installed the Azure Functions Core Tools according to the “Prerequisites” section. Our function still needs to connect to some services in the cloud, like Azure Event Hubs or the storage account. The connection string to Azure Event Hubs should be set as the EVENT_HUBS_CONNECTION_STRING app property or environment variable. In order to find the Event Hubs connection string, we should switch to the Azure Portal and find the spring-cloud-serverless namespace. Then, we need to go to the “Shared access policies” menu item and click the “RootManageSharedAccessKey” policy. The connection string value is available in the “Connection string-primary key” field.
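An Event Hubs connection string is a list of Key=Value pairs separated by semicolons, with Endpoint, SharedAccessKeyName, and SharedAccessKey among the keys. A copy-paste mistake here usually shows up only at runtime, so a quick sanity check can save some time. The sketch below is one possible way to do it. The ConnectionStrings class is a hypothetical helper, and the sample value with the example-policy name is made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for sanity-checking a connection string before use.
final class ConnectionStrings {

    // Values may contain '=' (Base64 padding), so only the first '=' in each
    // pair separates the key from the value.
    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String pair : connectionString.split(";")) {
            if (pair.isBlank()) {
                continue; // tolerate a trailing ';'
            }
            int idx = pair.indexOf('=');
            if (idx <= 0) {
                // Do not echo the segment itself, it may contain a secret.
                throw new IllegalArgumentException("Malformed connection string segment");
            }
            parts.put(pair.substring(0, idx).trim(), pair.substring(idx + 1).trim());
        }
        return parts;
    }

    static boolean looksLikeEventHubs(Map<String, String> parts) {
        return parts.containsKey("Endpoint")
                && parts.containsKey("SharedAccessKeyName")
                && parts.containsKey("SharedAccessKey");
    }

    public static void main(String[] args) {
        // Made-up sample value, not a real credential.
        String sample = "Endpoint=sb://example.servicebus.windows.net/;"
                + "SharedAccessKeyName=example-policy;SharedAccessKey=abc123==";
        Map<String, String> parts = parse(sample);
        System.out.println(looksLikeEventHubs(parts)); // prints "true"
        System.out.println(parts.get("Endpoint"));
    }
}
```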

We also need to obtain the connection credentials to the Azure storage account. In your storage account, you should find the “Access keys” section and copy the value from the “Connection string” field.

After that, we can create the local.settings.json file in each app’s root directory. Alternatively, we can just set the environment variables AzureWebJobsStorage and EVENT_HUBS_CONNECTION_STRING.

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "<YOUR_ACCOUNT_STORAGE_CONNECTION_STRING>",
    "FUNCTIONS_WORKER_RUNTIME": "java",
    "EVENT_HUBS_CONNECTION_STRING": "<YOUR_EVENT_HUBS_CONNECTION_STRING>"
  }
}

Once you place your credentials in the local.settings.json file, you can build the app with Maven.

$ mvn clean package

After that, you can use the Maven plugin included in the spring-cloud-function-adapter-azure module. In order to run the function, you need to execute the following command:

$ mvn azure-functions:run 

Here’s the command output for the customer-function app. As you see, it contains two Azure functions: add-customer (HTTP) and activate-customer (Event Hub Trigger). You can test the function by invoking the http://localhost:7071/api/add-customer URL.

Here’s the command output for the account-function app. As you see, it contains a single function new-customer activated through the Event Hub trigger.

Deploy Spring Cloud Serverless on Azure Functions

Let’s take a look at the pminkows-customer-function Azure Function App before we deploy our first app there. The http://pminkows-customer-function.azurewebsites.net base URL will precede all the HTTP endpoint URLs for our functions. We should remember the name of the automatically generated service plan (EastUSLinuxDynamicPlan).

Before deploying our Spring Boot app we should create the host.json file with the following content:

{
  "version": "2.0",
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  }
}

Then, we should add the azure-functions-maven-plugin Maven plugin to our pom.xml. In the configuration section, we have to set the name of the Azure Function App instance (pminkows-customer-function), the target resource group (spring-cloud-serverless), the region (eastus), the service plan (EastUSLinuxDynamicPlan), and the location of the host.json file. We also need to set the connection string to the Azure Event Hubs inside the EVENT_HUBS_CONNECTION_STRING app property. So before running the build, you should export the value of that environment variable.

<plugin>
  <groupId>com.microsoft.azure</groupId>
  <artifactId>azure-functions-maven-plugin</artifactId>
  <version>1.30.0</version>
  <configuration>
    <appName>pminkows-customer-function</appName>
    <resourceGroup>spring-cloud-serverless</resourceGroup>
    <region>eastus</region>
    <appServicePlanName>EastUSLinuxDynamicPlan</appServicePlanName>
    <hostJson>${project.basedir}/src/main/resources/host.json</hostJson>
    <runtime>
      <os>linux</os>
      <javaVersion>17</javaVersion>
    </runtime>
    <appSettings>
      <property>
        <name>FUNCTIONS_EXTENSION_VERSION</name>
        <value>~4</value>
      </property>
      <property>
        <name>EVENT_HUBS_CONNECTION_STRING</name>
        <value>${EVENT_HUBS_CONNECTION_STRING}</value>
      </property>
    </appSettings>
  </configuration>
  <executions>
    <execution>
      <id>package-functions</id>
      <goals>
        <goal>package</goal>
      </goals>
    </execution>
  </executions>
</plugin>

Let’s begin with the customer-function app. Before deploying the app, we need to build it with the mvn clean package command. Once you do that, you can run your first function on Azure with the following command:

$ mvn azure-functions:deploy

Here’s my output after running that command. As you see, the function add-customer is exposed under the http://pminkows-customer-function.azurewebsites.net/api/add-customer URL.

azure-serverless-spring-cloud-deploy-mvn

We can come back to the pminkows-customer-function Azure Function App in the portal. According to the expectations, two functions are running there. The activate-customer function is triggered by the Azure Event Hub.

azure-serverless-spring-cloud-functions

Let’s switch to the account-function directory. We will deploy it to the pminkows-account-function Azure Function App. Once again, we need to build the app with the mvn clean package command and then deploy it using the mvn azure-functions:deploy command. Here’s the output. There are no HTTP triggers defined, just a single function triggered by the Azure Event Hub.

Here are the details of the pminkows-account-function Function App in the Azure Portal.

Invoke Azure Functions

Finally, we can test our functions by calling the following endpoint using e.g. curl. Let’s repeat a similar command several times with different data:

$ curl https://pminkows-customer-function.azurewebsites.net/api/add-customer \
    -d "{\"name\":\"Test\",\"age\":33}" \
    -H "Content-Type: application/json"
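If you prefer to call the function from Java instead of curl, the JDK HTTP client is enough. The sketch below builds the same POST request. The AddCustomerClient class and its method names are hypothetical, and the JSON body is assembled by simple concatenation, so it assumes a name without characters that need escaping.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client for the add-customer function.
final class AddCustomerClient {

    // Function URLs follow the pattern https://<app>.azurewebsites.net/api/<function>.
    static URI functionUri(String appName, String functionName) {
        return URI.create("https://" + appName + ".azurewebsites.net/api/" + functionName);
    }

    // Build the POST request; no escaping is applied to the name (sketch only).
    static HttpRequest addCustomerRequest(String appName, String name, int age) {
        String json = "{\"name\":\"" + name + "\",\"age\":" + age + "}";
        return HttpRequest.newBuilder(functionUri(appName, "add-customer"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    // Send the request and return the response body as a string.
    static String send(HttpRequest request) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + " " + response.body();
    }

    public static void main(String[] args) {
        HttpRequest request = addCustomerRequest("pminkows-customer-function", "Test", 33);
        // Only prints the request line; call send(request) to invoke the function.
        System.out.println(request.method() + " " + request.uri());
    }
}
```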

After that, we should switch to the Azure Portal. Go to the pminkows-customer-function details and click the “Invocations and more” link on the add-customer function.

You will be redirected to the Azure Monitor statistics for that function. Azure Monitor displays a list of invocations with statuses.

azure-serverless-spring-cloud-functions-invocations

We can click one of the records from the invocation history to see the details.

As you probably remember, the add-customer function sends messages to Azure Event Hubs. On the other hand, we can also verify how the new-customer function in the pminkows-account-function consumes and handles those events.

azure-serverless-spring-cloud-functions-logs

Final Thoughts

This article gives you a comprehensive guide on how to build and run Spring Cloud serverless apps on Azure Functions. It explains the concept of the triggers in Azure Functions and shows the integration with Azure Event Hubs. Finally, it shows how to run such functions locally and then monitor them on Azure after deployment.


Serverless Java Functions on OpenShift
https://piotrminkowski.com/2021/11/30/serverless-java-functions-on-openshift/
Tue, 30 Nov 2021 13:52:46 +0000

The post Serverless Java Functions on OpenShift appeared first on Piotr's TechBlog.

In this article, you will learn how to create and deploy serverless, Knative-based functions on OpenShift. We will use a single kn CLI command to build and run our applications on the cluster. How can we do that? With OpenShift Serverless Functions we may use the kn func plugin that allows us to work directly with the source code. It uses the Cloud Native Buildpacks API to create container images. It supports several runtimes like Node.js, Python, or Golang. However, we will try Java runtimes based on the Quarkus and Spring Boot frameworks.

Prerequisites

You need to have two things to be able to run this exercise by yourself. Firstly, you need to run Docker or Podman on your local machine, because Cloud Native Buildpacks use it for running the build. If you are not familiar with Cloud Native Buildpacks you can read my article about it. I tried to configure Podman according to this part of the documentation, but I did not succeed (on macOS). With Docker, it just works, so I did not make further attempts with Podman.

Secondly, you need to have a target OpenShift cluster with the serverless module installed. You can run it locally using CodeReady Containers (crc). But in my opinion, a better idea is to try the developer sandbox available online here. It contains all you need to start development, including OpenShift Serverless, which is available by default on the sandbox version of OpenShift.

Finally, you need to install the oc client and kn CLI locally. Since we use the kn func plugin, we need to install the Knative CLI version provided by Red Hat. The detailed installation instructions are available here.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then go to the serverless/functions directory. After that, you should just follow my instructions. Let’s begin.

Create OpenShift Serverless function with Quarkus

We can generate the source code of a sample application using a single kn command. We may choose between multiple runtimes and two templates. Currently, there are six runtimes available. The default is node (for Node.js applications), but you can also set quarkus, springboot, typescript, go, or python. Also, there are two templates available: http for simple REST-based applications and events for applications that communicate using the Knative Eventing approach. Let’s create our first application using the quarkus runtime and the events template.

$ kn func create -l quarkus -t events caller-function

Now, go to the caller-function directory and edit the generated pom.xml file. Firstly, we will change the Java version to 11 and the Quarkus version to the latest one.

<properties>
  <compiler-plugin.version>3.8.1</compiler-plugin.version>
  <maven.compiler.parameters>true</maven.compiler.parameters>
  <maven.compiler.source>11</maven.compiler.source>
  <maven.compiler.target>11</maven.compiler.target>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  <quarkus-plugin.version>2.4.2.Final</quarkus-plugin.version>
  <quarkus.platform.artifact-id>quarkus-universe-bom</quarkus.platform.artifact-id>
  <quarkus.platform.group-id>io.quarkus</quarkus.platform.group-id>
  <quarkus.platform.version>2.4.2.Final</quarkus.platform.version>
  <surefire-plugin.version>3.0.0-M5</surefire-plugin.version>
</properties>

By default the kn func plugin includes some dependencies in the test scope and a single dependency with the Quarkus Funqy Knative extension. There is also Quarkus Smallrye Health to automatically generate liveness and readiness health checks.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-funqy-knative-events</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-smallrye-health</artifactId>
</dependency>

By default, kn func generates a simple function that takes a CloudEvent as an input and sends the same event as an output. I will not change much there. I will just replace System.out with the Logger implementation in order to print logs.

public class Function {

   @Inject
   Logger logger;

   @Funq
   public CloudEvent<Output> function(CloudEvent<Input> input) {
      logger.infof("New event: %s", input);
      Output output = new Output(input.data().getMessage());
      return CloudEventBuilder.create().build(output);
   }

}

Assuming you have already logged in to your OpenShift cluster using the oc client you can proceed to the function deployment. In fact, you just need to go to your application directory and then run a single kn func command as shown below.

$ kn func deploy -i quay.io/pminkows/caller-function -v

Once you run the command shown above, the local build starts on Docker. If it finishes successfully, we proceed to the deployment phase.

openshift-serverless-functions-build-and-deploy

In the application root directory, there is also an automatically generated configuration file func.yaml.

name: caller-function
namespace: ""
runtime: quarkus
image: quay.io/pminkows/caller-function
imageDigest: sha256:5d3ef16e1282bc5f6367dff96ab7bb15487199ac3939e262f116657a83706245
builder: quay.io/boson/faas-jvm-builder:v0.8.4
builders: {}
buildpacks: []
healthEndpoints: {}
volumes: []
envs: []
annotations: {}
options: {}
labels: []

Create OpenShift Serverless functions with Spring Boot

Now, we will do exactly the same thing as before, but this time for the Spring Boot application. In order to create a Spring Boot function we just need to set springboot as the runtime name. The name of our application is callme-function.

$ kn func create -l springboot -t events callme-function

Then we go to the callme-function directory. Firstly, let’s edit the Maven pom.xml. As with Quarkus, I’ll set the latest version of Spring Boot.

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.6.1</version>
  <relativePath />
</parent>

The generated application is built on top of the Spring Cloud Function project. We don’t need to add anything there.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-function-web</artifactId>
</dependency>

The Spring Boot function code is a little more complicated than the code generated for Quarkus. It uses the Spring functional programming style, where a Function bean represents an HTTP POST endpoint with an input and an output.

@SpringBootApplication
public class SpringCloudEventsApplication {

  private static final Logger LOGGER = Logger.getLogger(
      SpringCloudEventsApplication.class.getName());

  public static void main(String[] args) {
    SpringApplication.run(SpringCloudEventsApplication.class, args);
  }

  @Bean
  public Function<Message<Input>, Output> uppercase(CloudEventHeaderEnricher enricher) {
    return m -> {
      HttpHeaders httpHeaders = HeaderUtils.fromMessage(m.getHeaders());
      
      LOGGER.log(Level.INFO, "Input CE Id:{0}", httpHeaders.getFirst(
          ID));
      LOGGER.log(Level.INFO, "Input CE Spec Version:{0}",
          httpHeaders.getFirst(SPECVERSION));
      LOGGER.log(Level.INFO, "Input CE Source:{0}",
          httpHeaders.getFirst(SOURCE));
      LOGGER.log(Level.INFO, "Input CE Subject:{0}",
          httpHeaders.getFirst(SUBJECT));

      Input input = m.getPayload();
      LOGGER.log(Level.INFO, "Input {0} ", input);
      Output output = new Output();
      output.input = input.input;
      output.operation = httpHeaders.getFirst(SUBJECT);
      output.output = input.input != null ? input.input.toUpperCase() : "NO DATA";
      return output;
    };
  }

  @Bean
  public CloudEventHeaderEnricher attributesProvider() {
    return attributes -> attributes
        .setSpecVersion("1.0")
        .setId(UUID.randomUUID()
            .toString())
        .setSource("http://example.com/uppercase")
        .setType("com.redhat.faas.springboot.events");
  }

  @Bean
  public Function<String, String> health() {
    return probe -> {
      if ("readiness".equals(probe)) {
        return "ready";
      } else if ("liveness".equals(probe)) {
        return "live";
      } else {
        return "OK";
      }
    };
  }
}

Because there are two functions (@Bean Function) defined in the generated code, you need to add the following property in the application.properties file.

spring.cloud.function.definition = uppercase;health
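
To make the role of this property more concrete, here is a minimal plain-Java sketch of the idea, with no Spring dependency: named functions sit in a registry, and a semicolon-separated definition string selects the ones to expose. The FunctionCatalogSketch class and its methods are hypothetical names used only for illustration; Spring Cloud Function’s real function catalog works differently in detail.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration only: not Spring Cloud Function's actual implementation.
class FunctionCatalogSketch {

    private final Map<String, Function<String, String>> registry = new LinkedHashMap<>();

    void register(String name, Function<String, String> fn) {
        registry.put(name, fn);
    }

    // Returns only the functions named in a definition such as "uppercase;health".
    Map<String, Function<String, String>> resolve(String definition) {
        Map<String, Function<String, String>> exposed = new LinkedHashMap<>();
        for (String name : definition.split(";")) {
            String key = name.trim();
            Function<String, String> fn = registry.get(key);
            if (fn == null) {
                throw new IllegalArgumentException("Unknown function: " + key);
            }
            exposed.put(key, fn);
        }
        return exposed;
    }

    // Registers simplified String-based stand-ins for the two generated beans.
    static FunctionCatalogSketch sample() {
        FunctionCatalogSketch catalog = new FunctionCatalogSketch();
        catalog.register("uppercase", s -> s != null ? s.toUpperCase() : "NO DATA");
        catalog.register("health", probe -> {
            if ("readiness".equals(probe)) {
                return "ready";
            } else if ("liveness".equals(probe)) {
                return "live";
            }
            return "OK";
        });
        return catalog;
    }

    public static void main(String[] args) {
        Map<String, Function<String, String>> exposed = sample().resolve("uppercase;health");
        System.out.println(exposed.get("uppercase").apply("hello"));
        System.out.println(exposed.get("health").apply("readiness"));
    }
}
```

In this model, leaving a name out of the definition string means the function is simply not exposed, which is the intuition behind listing both uppercase and health.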

Deploying serverless functions on OpenShift

We have two sample applications deployed on the OpenShift cluster. The first of them is written in Quarkus, while the second of them in Spring Boot. Those applications will communicate with each other through events. So in the first step, we need to create a Knative Eventing broker.

$ kn broker create default

Let’s check if the broker has been successfully created.

$ kn broker list
NAME      URL                                                                                  AGE   CONDITIONS   READY   REASON
default   http://broker-ingress.knative-eventing.svc.cluster.local/piomin-serverless/default   12m   5 OK / 5     True

Then, let’s display a list of running Knative services:

$ kn service list
NAME              URL                                                                                       LATEST                  AGE   CONDITIONS   READY   REASON
caller-function   https://caller-function-piomin-serverless.apps.cluster-8e1d.8e1d.sandbox114.opentlc.com   caller-function-00002   18m   3 OK / 3     True    
callme-function   https://callme-function-piomin-serverless.apps.cluster-8e1d.8e1d.sandbox114.opentlc.com   callme-function-00006   11h   3 OK / 3     True 

Send and receive CloudEvent with Quarkus

The architecture of our solution is visible in the picture below. The caller-function receives events sent directly by us using the kn func plugin. It then processes the input event, creates a new CloudEvent, and sends it to the Knative Broker. The broker only receives events. To route them, we need to define a Knative Trigger. The trigger filters events and sends the matching ones to the target sink. In our case, those events are received by the callme-function.

openshift-serverless-functions-arch

Ok, so now we need to rewrite caller-function to add a step of creating and sending a CloudEvent to the Knative Broker. To do that, we need to declare and invoke the Quarkus REST client. First, let’s add the required dependencies.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-rest-client-jackson</artifactId>
</dependency>

In the next step, we will create a client interface with a declaration of a calling method. The CloudEvent specification requires four HTTP headers to be set on the POST request. The context path of a Knative Broker is /piomin-serverless/default, and we have already checked it out using the kn broker list command.

@Path("/piomin-serverless/default")
@RegisterRestClient
public interface BrokerClient {

   @POST
   @Produces(MediaType.APPLICATION_JSON)
   String sendEvent(Output event,
                    @HeaderParam("Ce-Id") String id,
                    @HeaderParam("Ce-Source") String source,
                    @HeaderParam("Ce-Type") String type,
                    @HeaderParam("Ce-Specversion") String version);
}

We also need to set the broker address in the application.properties. We use a standard property mp-rest/url handled by the MicroProfile REST client.

functions.BrokerClient/mp-rest/url = http://broker-ingress.knative-eventing.svc.cluster.local 
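
For readers who want to see what such a request looks like at the HTTP level, the sketch below builds (without sending) an equivalent POST using only the JDK HTTP client types. It assumes the CloudEvents binary content mode over HTTP, where each context attribute travels as a Ce- header. The BrokerRequestSketch class and the JSON payload shape are assumptions made for illustration; the source and type values are the ones used in this article.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.UUID;

// Hypothetical helper: shows the shape of the request, it is not the Quarkus REST client.
class BrokerRequestSketch {

    // Builds (but does not send) a POST carrying the four required CloudEvent attributes.
    static HttpRequest build(String brokerUrl, String id, String json) {
        return HttpRequest.newBuilder(URI.create(brokerUrl))
                .header("Content-Type", "application/json")
                .header("Ce-Id", id)
                .header("Ce-Source", "http://caller-function")
                .header("Ce-Type", "caller.output")
                .header("Ce-Specversion", "1.0")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build(
                "http://broker-ingress.knative-eventing.svc.cluster.local/piomin-serverless/default",
                UUID.randomUUID().toString(),
                "{\"message\":\"Hello\"}"); // assumed payload shape
        System.out.println(request.method() + " " + request.uri());
        request.headers().map().forEach((name, values) -> System.out.println(name + ": " + values));
    }
}
```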

Here’s the final implementation of our function in the caller-function module. The type of event is caller.output. In fact, we can set any name as an event type.

public class Function {

   @Inject
   Logger logger;
   @Inject
   @RestClient
   BrokerClient client;

   @Funq
   public CloudEvent<Output> function(CloudEvent<Input> input) {
      logger.infof("New event: %s", input);
      Output output = new Output(input.data().getMessage());
      CloudEvent<Output> outputCloudEvent = CloudEventBuilder.create().build(output);
      client.sendEvent(output,
                input.id(),
                "http://caller-function",
                "caller.output",
                input.specVersion());
      return outputCloudEvent;
    }

}

Finally, we will create a Knative Trigger. It receives events arriving at the broker and filters them by type. Only events with the type caller.output are forwarded to the callme-function.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: callme-trigger
spec:
  broker: default
  filter:
    attributes:
      type: caller.output
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: callme-function
    uri: /uppercase
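
The filtering rule can be modeled in a few lines of plain Java. This is a simplified sketch that assumes exact-match semantics on every attribute listed in the filter; TriggerFilterSketch is a hypothetical name and this is not Knative code.

```java
import java.util.Map;

// Simplified, hypothetical model of an attribute filter with exact matching.
class TriggerFilterSketch {

    // True when every attribute required by the filter is present on the event with the same value.
    static boolean matches(Map<String, String> filter, Map<String, String> eventAttributes) {
        for (Map.Entry<String, String> required : filter.entrySet()) {
            if (!required.getValue().equals(eventAttributes.get(required.getKey()))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> filter = Map.of("type", "caller.output");
        // An event produced by caller-function passes the filter.
        System.out.println(matches(filter,
                Map.of("type", "caller.output", "source", "http://caller-function")));
        // An event of any other type is not delivered to the subscriber.
        System.out.println(matches(filter, Map.of("type", "other.type")));
    }
}
```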

Now, we can send a test CloudEvent to the caller-function directly from the local machine with the following command (ensure you are calling it in the caller-function directory):

$ kn func emit -d "Hello"

The Output class in the caller-function and Input class in the callme-function should have the same fields. By default kn func generates different fields for Quarkus and Spring Boot example applications. So you also need to refactor one of these objects. I changed the field in the callme-function Input class.

Summary

Let’s summarize our exercise. We built and deployed two applications (Quarkus and Spring Boot) directly from the source code into the target cluster. Thanks to OpenShift Serverless Functions, we didn’t have to provide any additional configuration to deploy them as Knative services.

If there is no incoming traffic, Knative services are automatically scaled down to zero after 60 seconds (by default).

So, to generate some traffic we may use the kn func emit command. It sends a CloudEvent message directly to the target application. In our case, it is caller-function (Quarkus). After receiving an input event, the pod with the caller-function starts. After startup, it sends a CloudEvent message to the Knative Broker. Finally, the event goes to the callme-function (Spring Boot), which also starts, as shown below.

openshift-serverless-functions-pods

As you see OpenShift provides several simplifications when working with Knative. And what’s important, now you can easily test them by yourself using the developer sandbox version of OpenShift available online.

The post Serverless Java Functions on OpenShift appeared first on Piotr's TechBlog.

Knative Eventing with Kafka and Spring Cloud https://piotrminkowski.com/2021/03/12/knative-eventing-with-kafka-and-spring-cloud/ Fri, 12 Mar 2021 11:55:12 +0000
In this article, you will learn how to run eventing applications on Knative using Kafka and Spring Cloud. I’ll show you what Knative Eventing is and how to integrate it with the Kafka broker. We will build our applications on top of Spring Cloud Function and Spring Cloud Stream. All these solutions seem to be a perfect match. Why? Let me invite you to read the article.

However, before we proceed you need some knowledge of basic Knative concepts. Therefore, I suggest you read more about it. You can start with these two articles: Spring Boot on Knative and Microservices on Knative with GraalVM and Spring Boot. Of course, you can also refer to the Knative documentation.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions.

Today we will use a simple architecture that follows the eventual consistency pattern, also known as the SAGA pattern. What exactly is that? The sample system consists of three services. The order-service creates a new order that is related to customers and products. That order is sent to a Kafka topic. Then, our two other applications, customer-service and product-service, receive the order event and perform a reservation. The customer-service reserves the order’s amount on the customer’s account. Meanwhile, the product-service reserves the number of products specified in the order. Both services send a response to the order-service through a Kafka topic. If the order-service receives positive reservations from both services, it confirms the order and sends an event with that information. Both customer-service and product-service receive the event and confirm their reservations. You can verify it in the picture below.

knative-eventing-kafka-arch

Prerequisites

Before we start, we first need to install Knative on the Kubernetes cluster. I’m using a local instance of Kubernetes, but you may as well use a remote cluster such as GKE. Note that the latest version of Knative requires a Kubernetes cluster v1.17 or later. Of course, we need to install both the Serving and Eventing components. You may find the detailed installation instructions here.

That’s not all. We also need to install Kafka Eventing Broker. Here’s the link to the releases site. It includes several deployments and CRDs. You should pay special attention to the KafkaSource and KafkaBinding CRDs, since we will use them later.

Finally, we need to install Kafka cluster on Kubernetes. The recommended way to do that is with the Strimzi operator. Strimzi provides container images and operators for running Kafka on Kubernetes. It also comes with a set of CRDs for managing the Kafka cluster. Once you install it you may proceed to the next steps. I installed it in the kafka namespace. Here’s the list of running pods.

Step 1: Create and configure Knative Kafka Broker

In the first step, we are going to create a Kafka cluster using the Strimzi CRD. To keep things simple, we won’t use any advanced configuration settings. For example, I used ephemeral storage, which is not recommended in production. I set three instances of ZooKeeper. Kafka is planning to drop its ZooKeeper dependency, but the version used here still relies on it.

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 1
    listeners:
      plain: {}
      tls: {}
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.4"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}

The Knative broker allows you to route events to different event sinks or consumers. We may use different broker providers. When an event is sent to the broker, all request metadata other than the CloudEvent data and context attributes is stripped away. The event delivery mechanism hides details of event routing from the event producer and consumer. The default broker class is MTChannelBasedBroker. We will change it to Kafka.

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: default
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing

In this article, we won’t use the Kafka broker directly. Instead, we will use the KafkaSource object, which takes events from a particular topic and sends them to the subscriber. If you want to use the Broker, you need to define a Knative Trigger that refers to it.

The broker refers to the ConfigMap kafka-broker-config. The most important thing there is to set the address of the Kafka cluster. If you didn’t change anything in the default Kafka installation files it is ${KAFKA_CLUSTER_NAME}-kafka-bootstrap and port 9092.

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  default.topic.partitions: "10"
  default.topic.replication.factor: "1"
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"

Step 2: Create an application with Spring Cloud Stream

Let’s start with dependencies. Each of our applications uses an in-memory H2 database. They integrate with the database using the Spring Data JPA repository pattern. However, the most important thing is that they are all based on Spring Cloud Stream to interact with Kafka topics. Spring Cloud Stream requires adding a concrete binder implementation to the classpath. That’s why we add the spring-cloud-starter-stream-kafka starter. For some time now, the Spring Cloud Stream programming model has been built on top of Spring Cloud Function. Thanks to that, we may easily export functions as HTTP endpoints. This feature will be useful for us later. For now, let’s just take a look at the list of included dependencies.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-function-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.16</version>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>

Here’s the model class for the order-service. Once the order is created and saved in the database, the order-service sends it to the output Kafka topic.

@Entity
@Table(name = "orders")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Order {
    @Id
    private Integer id;
    private Integer customerId;
    private Integer productId;
    private int amount;
    private int productCount;
    @Enumerated
    private OrderStatus status = OrderStatus.NEW;
}

We have three functions inside the order-service application main class. Two of them send events to the output destination continuously. The third one, confirm(), waits for incoming events. We will discuss it later. The orderEventSupplier function represents the first step in our scenario. It creates a new order with test data and saves it in the database before sending it.

@SpringBootApplication
@Slf4j
public class OrderSagaApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderSagaApplication.class, args);
    }
    private static int num = 0;
    private BlockingQueue<Order> queue = new LinkedBlockingQueue<>();
    @Bean
    public Supplier<Order> orderEventSupplier() {
        return () -> repository.save(new Order(++num, num%10+1, num%10+1, 100, 1, OrderStatus.NEW));
    }
    @Bean
    public Supplier<Order> orderConfirmSupplier() {
        return () -> queue.poll();
    }
    @Bean
    public Consumer<Message<Order>> confirm() {
        return this::doConfirm;
    }
    @Autowired
    OrderRepository repository;
    private void doConfirm(Message<Order> msg) {
        Order o = msg.getPayload();
        log.info("Order received: {}", o);
        Order order = repository.findById(o.getId()).orElseThrow();
        if (order.getStatus() == OrderStatus.NEW) {
            order.setStatus(OrderStatus.IN_PROGRESS);
        } else if (order.getStatus() == OrderStatus.IN_PROGRESS) {
            order.setStatus(OrderStatus.CONFIRMED);
            log.info("Order confirmed : {}", order);
            queue.offer(order);
        }
        repository.save(order);
    }
}
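
The two-step confirmation in doConfirm can be tried out without Spring or a database. The sketch below is a simplified stand-in under stated assumptions: OrderConfirmSketch, its in-memory map (replacing the JPA repository), and the queue of order ids are hypothetical and exist only to show the status transitions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory model of the order-saga confirmation logic.
class OrderConfirmSketch {

    enum Status { NEW, IN_PROGRESS, CONFIRMED }

    // Stand-ins for the repository and for the queue read by orderConfirmSupplier.
    final Map<Integer, Status> orders = new HashMap<>();
    final Queue<Integer> confirmed = new LinkedBlockingQueue<>();

    void create(int orderId) {
        orders.put(orderId, Status.NEW);
    }

    // The first reservation response moves the order to IN_PROGRESS,
    // the second one confirms it and queues it for sending.
    void onReservation(int orderId) {
        Status current = orders.get(orderId);
        if (current == null) {
            throw new IllegalArgumentException("Unknown order: " + orderId);
        }
        if (current == Status.NEW) {
            orders.put(orderId, Status.IN_PROGRESS);
        } else if (current == Status.IN_PROGRESS) {
            orders.put(orderId, Status.CONFIRMED);
            confirmed.offer(orderId);
        }
    }

    public static void main(String[] args) {
        OrderConfirmSketch saga = new OrderConfirmSketch();
        saga.create(1);
        saga.onReservation(1); // e.g. response from customer-saga
        saga.onReservation(1); // e.g. response from product-saga
        System.out.println(saga.orders.get(1) + " queued=" + saga.confirmed);
    }
}
```

Note that this logic counts responses rather than checking who sent them, so it relies on each of the two services answering exactly once per order.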

The name of the output Kafka topic is order-events. We set it for both Supplier functions using the Spring Cloud Stream bindings pattern. On the other hand, the Consumer function will not receive events directly from the Kafka topic. Why? Because it is part of the Knative Eventing flow, which I will explain in the next step. For now, it is important to note that only the suppliers bind to the external destination, which we specify using the spring.cloud.function.definition property.

spring.application.name: order-saga
spring.cloud.stream.bindings.orderEventSupplier-out-0.destination: order-events
spring.cloud.stream.bindings.orderConfirmSupplier-out-0.destination: order-events
spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
spring.cloud.function.definition: orderEventSupplier;orderConfirmSupplier
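
The binding names in this configuration follow the Spring Cloud Stream functional naming convention: function name, direction (in or out), and an index. A small helper makes the pattern explicit; BindingNameSketch is a hypothetical class written for this illustration and is not part of Spring.

```java
// Hypothetical helper that spells out the binding naming convention.
class BindingNameSketch {

    // Output binding of a function, e.g. orderEventSupplier-out-0.
    static String outputBinding(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Input binding of a function, e.g. confirm-in-0.
    static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Property key that maps a binding to a destination (a Kafka topic here).
    static String destinationProperty(String bindingName) {
        return "spring.cloud.stream.bindings." + bindingName + ".destination";
    }

    public static void main(String[] args) {
        System.out.println(destinationProperty(outputBinding("orderEventSupplier", 0)) + ": order-events");
        System.out.println(destinationProperty(outputBinding("orderConfirmSupplier", 0)) + ": order-events");
    }
}
```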

Finally, we need to create the KafkaBinding that will inject Kafka bootstrap information into the application container (through the Knative Service). Then, the application can access it as the KAFKA_BOOTSTRAP_SERVERS environment variable.

apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
  name: kafka-binding-order-saga
spec:
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: order-saga
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
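
The ${KAFKA_BOOTSTRAP_SERVERS} placeholder in the application configuration is resolved from that environment variable at startup. The following is a simplified model of such placeholder resolution in plain Java; PlaceholderSketch is a hypothetical helper, and Spring’s own resolver supports more (for example, default values).

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified placeholder resolver for values such as ${KAFKA_BOOTSTRAP_SERVERS}.
class PlaceholderSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([A-Za-z0-9_]+)\\}");

    // Replaces every ${NAME} with env.get(NAME) and fails fast when a variable is missing.
    static String resolve(String value, Map<String, String> env) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuilder resolved = new StringBuilder();
        while (matcher.find()) {
            String replacement = env.get(matcher.group(1));
            if (replacement == null) {
                throw new IllegalStateException("Missing environment variable: " + matcher.group(1));
            }
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        // With the KafkaBinding in place, the variable would come from System.getenv().
        Map<String, String> env = Map.of("KAFKA_BOOTSTRAP_SERVERS", "my-cluster-kafka-bootstrap.kafka:9092");
        System.out.println(resolve("${KAFKA_BOOTSTRAP_SERVERS}", env));
    }
}
```

Failing fast on a missing variable is a design choice of this sketch: it makes it obvious when the application starts before the KafkaBinding has injected the value.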

Step 3: Create Kafka sources and Spring Cloud Function endpoints

Ok, we have already created a function responsible for generating and sending orders to the Kafka topic inside the order-service. So, now our goal is to receive and handle them on the customer-service and product-service sides. Our applications won’t directly listen for incoming events on the Kafka topic. To clarify, the basic Knative Eventing assumption is that the application doesn’t care how the events are published. It just receives the events as HTTP POST requests. This is where the KafkaSource object comes in. It takes a list of input topics and a destination sink as parameters. In our case, it gets messages from order-events and sends them as HTTP POST requests to the /customers/reserve endpoint of the customer-saga Knative Service.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /customers/reserve

Here’s an implementation of the customer-saga application. Thanks to Spring Cloud Function Web, it automatically exports the reserve function as an HTTP endpoint with the path /reserve. Once the consumer receives the event, it performs the rest of the business logic. If the input order has the NEW status, the customer-saga creates a reservation for a particular amount on the customer account. Then it sends an event response to the order-saga. To do that, it first puts the event into a BlockingQueue. We also use a Supplier function for sending events to the Kafka topic. This time the supplier function takes Order objects from the BlockingQueue. Finally, if our application receives a confirmation order from the order-saga, it commits the whole transaction by removing the reserved amount.

@SpringBootApplication
@Slf4j
public class CustomerSagaApplication {
    public static void main(String[] args) {
        SpringApplication.run(CustomerSagaApplication.class, args);
    }
    private BlockingQueue<Order> queue = new LinkedBlockingQueue<>();
    @Autowired
    private CustomerRepository repository;
    
    @Bean
    public Supplier<Order> orderEventSupplier() {
        return () -> queue.poll();
    }
    @Bean
    public Consumer<Message<Order>> reserve() {
        return this::doReserve;
    }
    private void doReserve(Message<Order> msg) {
        Order order = msg.getPayload();
        log.info("Body: {}", order);
        Customer customer = repository.findById(order.getCustomerId()).orElseThrow();
        log.info("Customer: {}", customer);
        if (order.getStatus() == OrderStatus.NEW) {
            customer.setAmountReserved(customer.getAmountReserved() + order.getAmount());
            customer.setAmountAvailable(customer.getAmountAvailable() - order.getAmount());
            order.setStatus(OrderStatus.IN_PROGRESS);
            queue.offer(order);
        } else if (order.getStatus() == OrderStatus.CONFIRMED) {
            customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
        }
        repository.save(customer);
    }
}
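
To follow the arithmetic of doReserve, here is a small worked example in plain Java. It is a sketch under assumptions: ReservationSketch is a hypothetical class that keeps only the two balance fields, and the starting balance of 1000 is made up for the illustration.

```java
// Hypothetical model of the customer account arithmetic performed by doReserve.
class ReservationSketch {

    enum Status { NEW, IN_PROGRESS, CONFIRMED }

    int amountAvailable;
    int amountReserved;

    ReservationSketch(int amountAvailable) {
        this.amountAvailable = amountAvailable;
    }

    // Applies an incoming order and returns the order status after handling.
    Status handle(Status orderStatus, int amount) {
        if (orderStatus == Status.NEW) {
            // Reserve: move the amount from "available" to "reserved".
            amountReserved += amount;
            amountAvailable -= amount;
            return Status.IN_PROGRESS;
        } else if (orderStatus == Status.CONFIRMED) {
            // Commit: the reserved amount is released for good.
            amountReserved -= amount;
        }
        return orderStatus;
    }

    public static void main(String[] args) {
        ReservationSketch account = new ReservationSketch(1000); // assumed starting balance
        account.handle(Status.NEW, 100);
        System.out.println("available=" + account.amountAvailable + " reserved=" + account.amountReserved);
        account.handle(Status.CONFIRMED, 100);
        System.out.println("available=" + account.amountAvailable + " reserved=" + account.amountReserved);
    }
}
```

With these numbers, the account goes from 1000 available and 0 reserved, to 900 available and 100 reserved after the NEW order, and ends at 900 available and 0 reserved once the order is confirmed.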

We can also set the base context path for HTTP endpoints using the spring.cloud.function.web.path property. So, the final path of our target endpoint is /customers/reserve. It is the same as the address defined in the KafkaSource definition.

spring.cloud.function.web.path: /customers

Here’s a configuration for the customer-saga inside the application.yml file.

spring.application.name: customer-saga
spring.cloud.function.web.path: /customers
spring.cloud.stream.bindings.orderEventSupplier-out-0.destination: reserve-events
spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
spring.cloud.function.definition: orderEventSupplier

The implementation of the business logic inside product-saga is pretty similar to the customer-saga. There is a single Consumer function that receives orders, and a single Supplier responsible for sending a response to the order-saga.

@SpringBootApplication
@Slf4j
public class ProductSagaApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProductSagaApplication.class, args);
    }
    @Autowired
    private ProductRepository repository;
    private BlockingQueue<Order> queue = new LinkedBlockingQueue<>();
    @Bean
    public Supplier<Order> orderEventSupplier() {
        return () -> queue.poll();
    }
    @Bean
    public Consumer<Message<Order>> reserve() {
        return this::doReserve;
    }
    private void doReserve(Message<Order> msg) {
        Order order = msg.getPayload();
        log.info("Body: {}", order);
        Product product = repository.findById(order.getProductId()).orElseThrow();
        log.info("Product: {}", product);
        if (order.getStatus() == OrderStatus.NEW) {
            product.setReservedItems(product.getReservedItems() + order.getProductsCount());
            product.setAvailableItems(product.getAvailableItems() - order.getProductsCount());
            order.setStatus(OrderStatus.IN_PROGRESS);
            queue.offer(order);
        } else if (order.getStatus() == OrderStatus.CONFIRMED) {
            product.setReservedItems(product.getReservedItems() - order.getProductsCount());
        }
        repository.save(product);
    }
}

Step 4: Run applications on Knative Eventing and Kafka

Here’s a typical definition of the Knative Service for our applications. I’m using the dev.local option, but if you run a remote cluster you may replace it with your Docker username or any other repository account you have.

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: order-saga
spec:
  template:
    spec:
      containers:
        - image: dev.local/order-saga

I use Skaffold together with Jib Maven Plugin for building and deploying applications on Knative. My target namespace is serverless. With the tail option you may observe logs after deployment. Of course, you may as well use the skaffold dev command.

$ skaffold run --tail -n serverless

After running all our applications on Knative eventing with Kafka we may verify a list of services using kn CLI.

knative-eventing-kafka-services

Then, we may verify that all KafkaBindings have been created. To do that, let’s execute the following kubectl command.

The next important component is KafkaSource. We have already created three sources, a single one per application.

knative-eventing-kafka-sources

After starting, the order-saga application continuously generates and sends a new order each second. Both product-saga and customer-saga receive events and send responses. Thanks to that, the traffic is exchanged without any interruption. Apart from the application pods, we also have three pods with Kafka sources.

Let’s just take a look at the application logs. Here are the logs from the order-saga. As you see it receives the order reservations from both customer-saga and product-saga. After that, it confirms the order and sends a response back to the order-events topic on Kafka. Basically, that’s what we wanted to achieve.

Final Thoughts

I hope you enjoyed this article. Knative is still a relatively new solution. I think we may expect some new and interesting features in the near future. With Knative Eventing you may use event sources other than Kafka. Personally, I’m waiting for integration with RabbitMQ, which is under development now. For a full list of available solutions, you may refer to that site.

It is my third article about Knative and Spring Boot. You may expect more articles about Knative soon! Next time, I’m going to show you an example with another popular Java framework – Quarkus.

The post Knative Eventing with Kafka and Spring Cloud appeared first on Piotr's TechBlog.
