Piotr's TechBlog – knative kafka Archives
https://piotrminkowski.com/tag/knative-kafka/
Java, Spring, Kotlin, microservices, Kubernetes, containers

Serverless on OpenShift with Knative, Quarkus and Kafka
https://piotrminkowski.com/2023/04/18/serverless-on-openshift-with-knative-quarkus-and-kafka/
Tue, 18 Apr 2023

The post Serverless on OpenShift with Knative, Quarkus and Kafka appeared first on Piotr's TechBlog.


In this article, you will learn how to build and run Quarkus serverless apps on OpenShift and integrate them through Knative Eventing. We will use Kafka to exchange messages between the apps. However, Knative supports various event sources. Kafka is just one of the available options. You can check out a full list of supported solutions in the Knative Eventing docs.

I have already published several articles about Knative on my blog. If you want a brief introduction, read my article about Knative basics and Spring Boot. There is also an article similar to the current one, but focused more on Kubernetes. Today, we will focus on the OpenShift support for the serverless features. Also, Knative is changing dynamically, so there are some significant differences in comparison to the versions described in my previous articles.

Although I’m running my example apps on OpenShift, I’ll give you a recipe for how to do the same thing on vanilla Kubernetes. In order to run them on Kubernetes, you need to activate the kubernetes Maven profile instead of openshift during the build.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions.

How it works

During the exercise, we will deploy three Quarkus apps on OpenShift: order-service, stock-service and payment-service. All these apps expose a single HTTP POST endpoint for incoming events. They also use the Quarkus REST client to send events to Kafka through Knative Eventing. The order-service app sends a single event that should be received by both stock-service and payment-service. They then process the event and send a response back to the order-service. All of this happens asynchronously by leveraging Kafka and the Knative Broker. However, the process is completely transparent to the apps, which just expose an HTTP endpoint and use an HTTP client to call the endpoint exposed by the KafkaSink object.

The diagram below illustrates the architecture of our solution. There are several Knative objects: KafkaSink, KafkaSource, Trigger, and Broker. The KafkaSink object eliminates the need to use a Kafka client on the app side. It receives HTTP requests in the CloudEvent format and converts them into Kafka messages sent to a particular topic. The KafkaSource object receives messages from Kafka and sends them to the Knative Broker. Finally, we need to define Triggers. A Trigger object filters the events inside the Broker and sends them to the target app by calling its HTTP endpoint.
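For illustration, here is a minimal sketch of what an HTTP request to the KafkaSink ingress could look like, built with the plain JDK HTTP client. The URL, event type, and source values are assumptions based on the objects defined later in this article; the real apps use the Quarkus REST client instead.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class CloudEventRequestSketch {

    // Builds a CloudEvent in "binary content mode": the event attributes
    // travel as Ce-* HTTP headers, and the payload is the plain request body.
    public static HttpRequest buildOrderEvent(String json) {
        return HttpRequest.newBuilder()
                // Assumed KafkaSink ingress address pattern: /<namespace>/<sink-name>
                .uri(URI.create("http://kafka-sink-ingress.knative-eventing.svc.cluster.local/demo-eventing/order-sink"))
                .header("Ce-Id", "1")
                .header("Ce-Specversion", "1.0")
                .header("Ce-Type", "order-event")
                .header("Ce-Source", "order")
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }
}
```

Anything able to produce such a request can feed events into the system, which is exactly why the apps themselves stay free of Kafka-specific code.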

openshift-serverless-arch

Prerequisites

Before we proceed to the Quarkus apps, we need to install and configure two operators on OpenShift: AMQ Streams (Kafka Strimzi) and OpenShift Serverless (Knative).

openshift-serverless-operators

In the configuration phase, we create four components: Kafka, KnativeServing, KnativeEventing and KnativeKafka. We can easily create them using the OpenShift Console. In all cases, we can leave the default settings. I create the Kafka instance in the kafka namespace. Just in case, here's the YAML manifest for creating a 3-node Kafka cluster:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: '3.3'
    storage:
      type: ephemeral
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    version: 3.3.1
    replicas: 3
  zookeeper:
    storage:
      type: ephemeral
    replicas: 3

The KnativeServing should be created in the knative-serving namespace, while KnativeEventing in the knative-eventing namespace.

kind: KnativeServing
apiVersion: operator.knative.dev/v1beta1
metadata:
  name: knative-serving
  namespace: knative-serving
spec: {}
---
kind: KnativeEventing
apiVersion: operator.knative.dev/v1beta1
metadata:
  name: knative-eventing
  namespace: knative-eventing
spec: {}

Or just “click” the create button in OpenShift Console.

Finally, the last required component is KnativeKafka. We should enable at least the sink and source to install the KafkaSink and KafkaSource CRDs and controllers.

apiVersion: operator.serverless.openshift.io/v1alpha1
kind: KnativeKafka
metadata:
  name: knative-kafka
  namespace: knative-eventing
spec:
  logging:
    level: INFO
  sink:
    enabled: true
  source:
    enabled: true

Functions Support in Quarkus

Although we will implement an event-driven architecture today, our Quarkus apps just expose and call HTTP endpoints. In order to expose a method as an HTTP endpoint, we need to include the Quarkus Funqy HTTP module. On the other hand, to call the HTTP endpoint exposed by another component, we can leverage the Quarkus declarative REST client. Our app stores data in an in-memory H2 database and uses Panache ORM.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-funqy-http</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-rest-client-jackson</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-hibernate-orm-panache</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-jdbc-h2</artifactId>
</dependency>

In order to expose a method as an HTTP endpoint, we just need to annotate it with @Funq. Here's the function implementation from the payment-service. It receives two types of orders: reservation and confirmation. For a reservation order (status=NEW) it reserves funds on the customer account. For a confirmation order, it accepts or rolls back the transaction depending on the order's status. By default, a method annotated with @Funq is exposed under the same path as its name – in our case the address of the endpoint is POST /reserve.

public class OrderReserveFunction {

   private static final String SOURCE = "payment";
    
   Logger log;
   OrderReserveService orderReserveService;
   OrderConfirmService orderConfirmService;

   public OrderReserveFunction(Logger log,
                               OrderReserveService orderReserveService,
                               OrderConfirmService orderConfirmService) {
      this.log = log;
      this.orderReserveService = orderReserveService;
      this.orderConfirmService = orderConfirmService;
   }

   @Funq
   public Customer reserve(Order order) {
      log.infof("Received order: %s", order);
      if (order.getStatus() == OrderStatus.NEW) {
         return orderReserveService.doReserve(order);
      } else {
         return orderConfirmService.doConfirm(order);
      }
   }

}

Let's take a look at the payment reservation implementation. We assume that multiple requests may access the same customer entity concurrently, so we need to lock it during the transaction. Once the reservation is performed, we need to send a response back to the order-service. We leverage the Quarkus REST client for that.

@ApplicationScoped
public class OrderReserveService {

    private static final String SOURCE = "payment";

    Logger log;
    CustomerRepository repository;
    OrderSender sender;

    public OrderReserveService(Logger log,
                               CustomerRepository repository,
                               @RestClient OrderSender sender) {
        this.log = log;
        this.repository = repository;
        this.sender = sender;
    }

    @Transactional
    public Customer doReserve(Order order) {
        Customer customer = repository.findById(order.getCustomerId(), LockModeType.PESSIMISTIC_WRITE);
        if (customer == null)
            throw new NotFoundException();
        log.infof("Customer: %s", customer);
        if (order.getAmount() < customer.getAmountAvailable()) {
            order.setStatus(OrderStatus.IN_PROGRESS);
            customer.setAmountReserved(customer.getAmountReserved() + order.getAmount());
            customer.setAmountAvailable(customer.getAmountAvailable() - order.getAmount());
        } else {
            order.setStatus(OrderStatus.REJECTED);
        }
        order.setSource(SOURCE);
        repository.persist(customer);
        log.infof("Order reserved: %s", order);
        sender.send(order);
        return customer;
    }
}

Here's the implementation of our REST client. It sends a message to the endpoint exposed by the KafkaSink object. The path of the endpoint corresponds to the name of the KafkaSink object. We also need to set HTTP headers to meet the CloudEvent format. Therefore, we register a custom ClientHeadersFactory implementation.

@ApplicationScoped
@RegisterRestClient
@RegisterClientHeaders(CloudEventHeadersFactory.class)
public interface OrderSender {

    @POST
    @Path("/payment-sink")
    void send(Order order);

}

Our custom ClientHeadersFactory implementation sets several Ce-* (CloudEvent) headers. The most important headers are Ce-Type and Ce-Source, since we will filter events based on those values later.

@ApplicationScoped
public class CloudEventHeadersFactory implements ClientHeadersFactory {

    AtomicLong id = new AtomicLong();

    @Override
    public MultivaluedMap<String, String> update(MultivaluedMap<String, String> incoming,
                                                 MultivaluedMap<String, String> outgoing) {
        MultivaluedMap<String, String> result = new MultivaluedHashMap<>();
        result.add("Ce-Id", String.valueOf(id.incrementAndGet()));
        result.add("Ce-Specversion", "1.0");
        result.add("Ce-Type", "reserve-event");
        result.add("Ce-Source", "stock");
        return result;
    }

}

Finally, let’s take a look at the payment confirmation service:

@ApplicationScoped
public class OrderConfirmService {

    private static final String SOURCE = "payment";

    Logger log;
    CustomerRepository repository;

    public OrderConfirmService(Logger log, 
                               CustomerRepository repository) {
        this.log = log;
        this.repository = repository;
    }

    @Transactional
    public Customer doConfirm(Order order) {
        Customer customer = repository.findById(order.getCustomerId());
        if (customer == null)
            throw new NotFoundException();
        log.infof("Customer: %s", customer);
        if (order.getStatus() == OrderStatus.CONFIRMED) {
            customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
            repository.persist(customer);
        } else if (order.getStatus() == OrderStatus.ROLLBACK && !order.getRejectedService().equals(SOURCE)) {
            customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
            customer.setAmountAvailable(customer.getAmountAvailable() + order.getAmount());
            repository.persist(customer);
        }
        return customer;
    }
    
}
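The two services above maintain a simple accounting invariant on the customer: a reservation moves funds from amountAvailable to amountReserved, a confirmation releases the reserved funds permanently, and a rollback returns them. Here's a condensed sketch of those transitions, mirroring doReserve and doConfirm with a plain value object instead of the JPA entity:

```java
public class CustomerAccountSketch {

    int amountAvailable;
    int amountReserved;

    CustomerAccountSketch(int available) {
        this.amountAvailable = available;
    }

    // Mirrors OrderReserveService.doReserve for an accepted order:
    // funds move from "available" to "reserved".
    void reserve(int amount) {
        amountReserved += amount;
        amountAvailable -= amount;
    }

    // Mirrors OrderConfirmService.doConfirm for a CONFIRMED order:
    // the reserved funds leave the account for good.
    void confirm(int amount) {
        amountReserved -= amount;
    }

    // Mirrors doConfirm for a ROLLBACK order rejected by the other service:
    // the reservation is released back to "available".
    void rollback(int amount) {
        amountReserved -= amount;
        amountAvailable += amount;
    }
}
```

Note that amountAvailable + amountReserved stays constant until a confirmation, which is what makes concurrent reservations safe once the entity is locked.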

Also, let’s take a look at the implementation of the function in the order-service.

public class OrderConfirmFunction {

    private final Logger log;
    private final OrderService orderService;

    public OrderConfirmFunction(Logger log, OrderService orderService) {
        this.log = log;
        this.orderService = orderService;
    }

    @Funq
    public void confirm(Order order) {
        log.infof("Accepted order: %s", order);
        orderService.doConfirm(order);
    }

}

Here’s the function implementation for the stock-service:

public class OrderReserveFunction {

    private static final String SOURCE = "stock";

    private final OrderReserveService orderReserveService;
    private final OrderConfirmService orderConfirmService;
    private final Logger log;

    public OrderReserveFunction(OrderReserveService orderReserveService,
                                OrderConfirmService orderConfirmService,
                                Logger log) {
        this.orderReserveService = orderReserveService;
        this.orderConfirmService = orderConfirmService;
        this.log = log;
    }

    @Funq
    public void reserve(Order order) {
        log.infof("Received order: %s", order);
        if (order.getStatus() == OrderStatus.NEW) {
            orderReserveService.doReserve(order);
        } else {
            orderConfirmService.doConfirm(order);
        }
    }

}

Configure Knative Eventing

After we finish the implementation of the app logic, we can proceed to the configuration of the OpenShift Serverless and Knative components. If you are using my GitHub repository, you don't have to apply any YAML manifests manually. All the required configuration is applied during the Maven build. This is possible thanks to the Quarkus Kubernetes extension and its support for Knative. We just need to place all the required YAML manifests inside src/main/kubernetes/knative.yml and the magic happens by itself.

However, in order to understand what happens, let's discuss the Knative configuration step by step. In the first step, we need to create the KafkaSink objects. A KafkaSink exposes an HTTP endpoint and takes a CloudEvent as input. Then it sends that event to a particular topic in the Kafka cluster. Here's the KafkaSink definition for the payment-service:

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: payment-sink
  namespace: demo-eventing
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topic: reserve-events
  numPartitions: 1

Both payment-service and stock-service send messages to the same reserve-events topic. Therefore, we could also create a single KafkaSink for those two services (I created two sinks, each dedicated to a single app). On the other hand, the order-service app sends messages to the order-events topic, so we have to create a separate KafkaSink:

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: order-sink
  namespace: demo-eventing
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topic: order-events
  numPartitions: 1

After that, we can print the list of sinks in our cluster to find the address of the KafkaSink ingress. This URL should then be set in the application properties for the REST client configuration. Here's the fragment of the Quarkus application.properties:

%openshift.quarkus.rest-client."pl.piomin.samples.quarkus.serverless.order.client.OrderSender".url = http://kafka-sink-ingress.knative-eventing.svc.cluster.local/demo-eventing
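Note how the full address is composed: the configured base URL already ends with the namespace (demo-eventing), and the REST client's @Path("/payment-sink") appends the sink name, so the final endpoint follows the /&lt;namespace&gt;/&lt;sink-name&gt; pattern of the KafkaSink ingress. A tiny sketch of that composition (the ingress host is taken from the property above):

```java
public class SinkUrlSketch {

    // The KafkaSink ingress routes requests by /<namespace>/<sink-name>.
    // The host below comes from the REST client configuration shown above.
    public static String sinkEndpoint(String namespace, String sinkName) {
        return "http://kafka-sink-ingress.knative-eventing.svc.cluster.local/"
                + namespace + "/" + sinkName;
    }
}
```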

With KafkaSink we are able to send messages to the Kafka cluster. In order to receive them on the target app's side, we need to create other objects. In the first step, we will create the Knative Broker and a KafkaSource object. The broker may be easily created using the kn CLI:

$ kn broker create default

The KafkaSource object connects to the Kafka cluster and receives messages from a defined list of topics. In our case, these are order-events and reserve-events. The output of the KafkaSource object is the already-created default broker. This means that all the messages exchanged between our three apps are delivered to the Knative Broker.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-to-broker
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
    - reserve-events
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default

In the final step, we need to configure the mechanism responsible for getting messages from the Knative Broker and sending them to the target services. In order to do that, we have to create Trigger objects. A trigger can filter messages by CloudEvent attributes, which correspond to the Ce-* HTTP headers of the request. For example, the payment-service app receives only messages sent by the order-service and carrying the order-event type.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: payment-trigger
spec:
  broker: default
  filter:
    attributes:
      source: order
      type: order-event
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: payment-service
    uri: /reserve

The stock-trigger object is very similar. It connects to the default Broker and gets only messages with the source=order and type=order-event. Finally, it calls the POST /reserve endpoint exposed by the stock-service.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: stock-trigger
spec:
  broker: default
  filter:
    attributes:
      source: order
      type: order-event
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: stock-service
    uri: /reserve

On the other hand, the order-service app should receive events from both stock-service and payment-service. Therefore, we are filtering messages just by the type attribute. The target endpoint of the order-service is POST /confirm.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: order-trigger
spec:
  broker: default
  filter:
    attributes:
      type: reserve-event
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: order-service
    uri: /confirm
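All three triggers above perform an exact match of their filter attributes against the event's CloudEvent attributes: an event is delivered only if every key in the filter has an identical value on the event. A minimal illustrative model of that behavior (not the actual broker implementation):

```java
import java.util.Map;

public class TriggerFilterSketch {

    // Knative Trigger filters use exact matching: every attribute listed
    // in the filter must be present on the event with the very same value.
    public static boolean matches(Map<String, String> filter,
                                  Map<String, String> eventAttributes) {
        return filter.entrySet().stream()
                .allMatch(e -> e.getValue().equals(eventAttributes.get(e.getKey())));
    }
}
```

For example, the payment-trigger filter {source=order, type=order-event} matches events produced by the order-service, while events of type reserve-event do not pass it and are routed to the order-trigger instead.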

Run Quarkus Apps on Knative

We can leverage the Quarkus Kubernetes/OpenShift extension to run the app as a Knative service. In order to do that, we need to include the quarkus-openshift dependency in the Maven pom.xml. We would like to use that module during the build only if we need to deploy the app on the cluster. Therefore, we will create a custom Maven profile called openshift. Besides including the quarkus-openshift dependency, it also enables deployment by setting the quarkus.kubernetes.deploy property to true and activates the custom Quarkus configuration profile openshift.

<profiles>
  <profile>
    <id>openshift</id>
    <activation>
      <property>
        <name>openshift</name>
      </property>
    </activation>
    <dependencies>
      <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-openshift</artifactId>
      </dependency>
    </dependencies>
    <properties>
      <quarkus.kubernetes.deploy>true</quarkus.kubernetes.deploy>
      <quarkus.profile>openshift</quarkus.profile>
    </properties>
  </profile>
</profiles>

Once we include the quarkus-openshift module, we may use Quarkus configuration properties to customize the deployment process. Firstly, we need to set the quarkus.kubernetes.deployment-target property to knative. Thanks to that, Quarkus will automatically generate the YAML manifest with a Knative Service instead of a standard Kubernetes Deployment. We can also override the default autoscaling settings with the quarkus.knative.revision-auto-scaling.* properties. The whole build process runs on the cluster with S2I (Source-to-Image), so we can use the internal OpenShift registry (the quarkus.container-image.registry property). Here's the fragment of the application.properties file for the order-service.

quarkus.kubernetes-client.trust-certs = true
quarkus.kubernetes.deployment-target = knative
quarkus.knative.env.vars.tick-timeout = 10000
quarkus.knative.revision-auto-scaling.metric = rps
quarkus.knative.revision-auto-scaling.target = 50

%openshift.quarkus.container-image.group = demo-eventing
%openshift.quarkus.container-image.registry = image-registry.openshift-image-registry.svc:5000
%openshift.app.orders.timeout = ${TICK_TIMEOUT}

Finally, we just need to activate the openshift profile during the build and all the apps will be deployed to the target OpenShift cluster. You can deploy a single app or all the apps by running the following command in the repository root directory:

$ mvn clean package -Popenshift

Once we deploy our apps, we can display the list of Knative services.

openshift-serverless-knative-svc

We can also verify that all the triggers have been configured properly.

Also, let’s take a look at the “Topology” view on OpenShift which illustrates our serverless architecture.

openshift-serverless-topology

Testing Services

It is also worth creating some automated tests to verify the basic functionality before deployment. Since we have simple HTTP apps and an in-memory H2 database, we can create standard tests. The only thing we need to do is mock the HTTP client.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-junit5</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>io.rest-assured</groupId>
  <artifactId>rest-assured</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-junit5-mockito</artifactId>
  <scope>test</scope>
</dependency>

Here’s the JUnit test for the payment-service. We are verifying both the reservation and confirmation processes for the same order.

@QuarkusTest
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class OrderReserveFunctionTests {

    private static int amount;
    private CustomerRepository repository;
    @InjectMock
    @RestClient
    OrderSender sender;

    public OrderReserveFunctionTests(CustomerRepository repository) {
        this.repository = repository;
    }

    @Test
    @org.junit.jupiter.api.Order(1)
    void reserve() {
        given().contentType("application/json").body(createTestOrder(OrderStatus.NEW)).post("/reserve")
                .then()
                .statusCode(204);

        Customer c = repository.findById(1L);
        amount = c.getAmountAvailable();
        assertEquals(100, c.getAmountReserved());
    }

    @Test
    @org.junit.jupiter.api.Order(2)
    void confirm() {
        given().contentType("application/json").body(createTestOrder(OrderStatus.CONFIRMED)).post("/reserve")
                .then()
                .statusCode(204);

        Customer c = repository.findById(1L);
        assertEquals(0, c.getAmountReserved());
        assertEquals(amount, c.getAmountAvailable());
    }

    private Order createTestOrder(OrderStatus status) {
        Order o = new Order();
        o.setId(1L);
        o.setSource("test");
        o.setStatus(status);
        o.setAmount(100);
        o.setCustomerId(1L);
        return o;
    }
}

Also, let's take a look at the logs of the apps running on OpenShift. As you can see, the order-service receives events from both stock-service and payment-service. After that, it confirms the order and sends a confirmation message to both services.

Here are the logs from the payment-service. As you can see, it receives the CloudEvent generated by the order-service (the Ce-Source header equals order).

Final Thoughts

With OpenShift Serverless and Knative Eventing, you can easily build an event-driven architecture from simple HTTP-based apps. It is completely transparent to the apps which medium is used to store events and how they are routed. The only thing an app needs to do is prepare requests according to the CloudEvent specification. OpenShift Serverless brings several features that simplify development. We can also leverage the Quarkus Kubernetes extension to easily build and deploy our apps on OpenShift as Knative services.

Knative Eventing with Quarkus, Kafka and Camel
https://piotrminkowski.com/2021/06/14/knative-eventing-with-quarkus-kafka-and-camel/
Mon, 14 Jun 2021

The post Knative Eventing with Quarkus, Kafka and Camel appeared first on Piotr's TechBlog.

In this article, you will learn how to use Quarkus with Camel to create applications that send messages to Kafka and receive CloudEvent from Knative Eventing. We will build a very similar system to the system described in my previous article Knative Eventing with Kafka and Quarkus. However, this time we will use Apache Camel instead of several Quarkus extensions including Kafka support.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions.

First, you should go to the saga directory. It contains two applications built on top of Quarkus and Apache Camel. Today we will implement the eventual consistency pattern (also known as the SAGA pattern). It will use the Knative Eventing model for exchanging events between our applications.

1. Prerequisites

Before we start, we need to configure some components like Kafka, Knative, and the Kafka Eventing Broker. Let's go through these steps.

1.1. Install Apache Kafka cluster

Firstly, let’s create our kafka namespace.

$ kubectl create namespace kafka

Then we apply the installation files, including ClusterRoles, ClusterRoleBindings and other required Kubernetes CustomResourceDefinitions (CRDs).

$ kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

After that, we can create a single-node persistent Apache Kafka cluster. We use an example custom resource for the Strimzi operator.

$ kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka

Finally, we may verify our installation by checking that all the pods in the kafka namespace are running.

1.2. Install Knative Serving and Eventing

You can install Knative on your Kubernetes cluster using YAML manifests or the operator. The current version of Knative at the time of writing is 0.23. The minimal list of steps required for a YAML-based installation is visible below. For more details, you may refer to the documentation. I've placed all the required commands here to simplify the process for you. Let's install Knative Serving.

$ kubectl apply -f https://github.com/knative/serving/releases/download/v0.23.0/serving-crds.yaml
$ kubectl apply -f https://github.com/knative/serving/releases/download/v0.23.0/serving-core.yaml
$ kubectl apply -f https://github.com/knative/net-kourier/releases/download/v0.23.0/kourier.yaml
$ kubectl patch configmap/config-network \
  --namespace knative-serving \
  --type merge \
  --patch '{"data":{"ingress.class":"kourier.ingress.networking.knative.dev"}}'

Then let’s install Knative Eventing.

$ kubectl apply -f https://github.com/knative/eventing/releases/download/v0.23.0/eventing-crds.yaml
$ kubectl apply -f https://github.com/knative/eventing/releases/download/v0.23.0/eventing-core.yaml

1.3. Install Knative Kafka Eventing

The following commands install the Apache Kafka broker and run event routing in a system namespace, knative-eventing, by default.

$ kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.23.0/eventing-kafka-controller.yaml
$ kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.23.0/eventing-kafka-broker.yaml

Then, we should install the CRDs for KafkaBinding and KafkaSource.

$ kubectl apply -f https://storage.googleapis.com/knative-releases/eventing-contrib/latest/kafka-source.yaml

Finally, let’s just create a broker.

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: default
  namespace: default
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
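The Broker above references a kafka-broker-config ConfigMap, which is not shown in this article. Assuming the defaults from the Knative Kafka broker installation, it looks roughly like this; the bootstrap server points at our Strimzi cluster, and the partition and replication values are example assumptions:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
  default.topic.partitions: "10"
  default.topic.replication.factor: "1"
```

The replication factor must not exceed the number of Kafka brokers, so for a single-node cluster it has to stay at 1.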

1.4. Install Apache Camel K operator (optional)

If you would like to use Apache Camel K to run the Quarkus application on Knative, you first need to install its operator. After downloading the Camel K CLI, you just need to run the following command.

$ kamel install

2. Run the application with Apache Camel K

In order to deploy and run an application on Knative, we may execute the kamel run command. Some parameters may be set in the source file. In the command visible below, I'm setting the name of the Knative service, the source file location, and Quarkus properties.

$ kamel run --name order-saga --dev OrderRoute.java \
    -p quarkus.datasource.db-kind=h2 \
    -p quarkus.datasource.jdbc.url=jdbc:h2:mem:testdb \
    -p quarkus.hibernate-orm.packages=com.github.piomin.entity.model.order

For more details about deploying Quarkus with Camel K on Kubernetes you may refer to my article Apache Camel K and Quarkus on Kubernetes.

In the end, I was not able to deploy exactly this application on Knative with Camel K. That's because Camel K didn't detect the JPA entities included in the application via an external library. However, the application is also prepared for deployment with Camel K: the whole source code is a single Java file, and there are some Camel K modeline hooks in that file.

3. Integrate Quarkus with Apache Camel

We can easily integrate Apache Camel routes with Quarkus. Camel Quarkus provides extensions for many of the Camel components. We need to include those components in our Maven pom.xml. What type of components do we need? The full list is visible below. However, first, let’s discuss them a little bit more.

Our application uses JPA to store entities in the H2 database, so we will include the Camel Quarkus JPA extension, which provides the JPA implementation with Hibernate. For a single persistence unit, this extension automatically creates an EntityManagerFactory and a TransactionManager. In order to integrate with Apache Kafka, we need the Camel Quarkus Kafka extension. In order to receive events from Knative, we should expose an HTTP POST endpoint. That's why we need several extensions like Platform HTTP and Jackson. Here's my list of Maven dependencies.

<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-core</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-platform-http</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-bean</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-timer</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-jpa</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-rest</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-jackson</artifactId>
</dependency>

Then, we just need to create a class that extends RouteBuilder. The routes have to be defined inside the configure() method. Before we get into the details let’s analyze our domain model classes.

public class CustomerRoute extends RouteBuilder {
   @Override
   public void configure() throws Exception { 
      ...
   }
}

4. Domain model for Quarkus JPA and Kafka

I created a separate project for the entity model classes. The repository is available on GitHub: https://github.com/piomin/entity-model.git. Thanks to that, I have a typical serverless application that consists of a single class. It is also possible to easily deploy it on Knative with Camel K. Here's the model entity for order-saga.

@Entity
@Table(name = "orders")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Order implements Serializable {
    @Id
    @GeneratedValue
    private Long id;
    private Integer customerId;
    private Integer productId;
    private int amount;
    private int productCount;
    @Enumerated
    private OrderStatus status = OrderStatus.NEW;
}

Just to simplify, I’m using the same class when sending events to Kafka. We can also take a look at a model entity for customer-saga.

@Entity
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Customer implements Serializable {
    @Id
    @GeneratedValue
    private Long id;
    private String name;
    private int amountAvailable;
    private int amountReserved;
}
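The entities above rely on Lombok annotations, which generate plain boilerplate at compile time. For readers unfamiliar with Lombok, here is roughly what the Customer entity de-sugars to (the JPA annotations are omitted so that the sketch stands alone):

```java
import java.io.Serializable;

// De-sugared equivalent of the Lombok-annotated Customer entity.
public class Customer implements Serializable {

    private Long id;
    private String name;
    private int amountAvailable;
    private int amountReserved;

    public Customer() { }                      // @NoArgsConstructor

    public Customer(Long id, String name,      // @AllArgsConstructor
                    int amountAvailable, int amountReserved) {
        this.id = id;
        this.name = name;
        this.amountAvailable = amountAvailable;
        this.amountReserved = amountReserved;
    }

    // @Getter / @Setter expand to accessors like these
    public Long getId() { return id; }
    public String getName() { return name; }
    public int getAmountAvailable() { return amountAvailable; }
    public int getAmountReserved() { return amountReserved; }
    public void setAmountAvailable(int amountAvailable) { this.amountAvailable = amountAvailable; }
    public void setAmountReserved(int amountReserved) { this.amountReserved = amountReserved; }

    @Override                                  // @ToString
    public String toString() {
        return "Customer(id=" + id + ", name=" + name +
               ", amountAvailable=" + amountAvailable +
               ", amountReserved=" + amountReserved + ")";
    }
}
```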

5. Building Camel routes with Quarkus and Kafka extension

In the first step, we are going to generate orders and send them to the Kafka topic. Before that, we will store them in the H2 database using the Camel JPA extension. This part of logic is implemented inside order-saga.

// r is a java.util.Random instance defined in the route class
from("timer:tick?period=10000")
   .setBody(exchange -> 
      new Order(null, r.nextInt(10) + 1, r.nextInt(10) + 1, 100, 1, OrderStatus.NEW))
   .to("jpa:" + Order.class.getName())
   .marshal().json(JsonLibrary.Jackson)
   .log("New Order: ${body}")
   .toD("kafka:order-events?brokers=${env.KAFKA_BOOTSTRAP_SERVERS}");

Some things need clarification here. Before sending a message to the Kafka topic, we need to serialize it to JSON. The application does not know anything about the Kafka address. That address is injected into the container by the KafkaBinding object and is available to the Camel route as the KAFKA_BOOTSTRAP_SERVERS environment variable.
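Since the broker address only appears at runtime, a defensive lookup with a local fallback can make the route easier to run outside the cluster. A minimal sketch (the localhost fallback is my assumption for local testing, not part of the original setup):

```java
import java.util.Map;

public class KafkaAddress {

    // Resolve the broker address injected by KafkaBinding; fall back to a
    // local broker when the variable is absent (e.g. when running tests).
    static String bootstrapServers(Map<String, String> env) {
        return env.getOrDefault("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092");
    }

    public static void main(String[] args) {
        System.out.println(bootstrapServers(System.getenv()));
    }
}
```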

Now, let's switch to the customer-saga application. In order to receive events from the Knative broker, we should expose an HTTP POST endpoint. This endpoint takes an Order as input. If the order's status equals NEW, it performs a reservation on the customer account; before updating the database, it sends back a response to the reserve-events topic.

Also, let's take a look at the fragment responsible for finding the customer in the database and updating it. We use the Camel Quarkus JPA extension. First, we define a JPQL query to retrieve the entity. Then, we update the Customer entity depending on the order status.

rest("/customers")
   .post("/reserve").consumes("application/json")
   .route()
      .log("Order received: ${body}")
      .unmarshal().json(JsonLibrary.Jackson, Order.class)
      .choice()
         .when().simple("${body.status.toString()} == 'NEW'")
            .setBody(exchange -> {
               Order order = exchange.getIn().getBody(Order.class);
               order.setStatus(OrderStatus.IN_PROGRESS);
               return order;
            })
            .marshal().json(JsonLibrary.Jackson)
            .log("Reservation sent: ${body}")
            .toD("kafka:reserve-events?brokers=${env.KAFKA_BOOTSTRAP_SERVERS}")
      .end()
      .unmarshal().json(JsonLibrary.Jackson, Order.class)
      .setProperty("orderAmount", simple("${body.amount}", Integer.class))
      .setProperty("orderStatus", simple("${body.status}", OrderStatus.class))
      .toD("jpa:" + Customer.class.getName() + 
         "?query=select c from Customer c where c.id= ${body.customerId}")
      .choice()
         .when().simple("${exchangeProperty.orderStatus} == 'IN_PROGRESS'")
            .setBody(exchange -> {
               Customer customer = (Customer) exchange.getIn().getBody(List.class).get(0);
               customer.setAmountReserved(customer.getAmountReserved() + 
                  exchange.getProperty("orderAmount", Integer.class));
               customer.setAmountAvailable(customer.getAmountAvailable() - 
                  exchange.getProperty("orderAmount", Integer.class));
               return customer;
            })
            .otherwise()
               .setBody(exchange -> {
                  Customer customer = (Customer) exchange.getIn().getBody(List.class).get(0);
                  customer.setAmountReserved(customer.getAmountReserved() - 
                     exchange.getProperty("orderAmount", Integer.class));
                  return customer;
               })
      .end()
      .log("Current customer: ${body}")
      .to("jpa:" + Customer.class.getName() + "?useExecuteUpdate=true")
      .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(201)).setBody(constant(null))
.endRest();
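The choice/otherwise branching in the route above reduces to a simple state rule on the customer's balance. Here is that rule extracted into self-contained plain Java (class and method names are illustrative, not taken from the project):

```java
// Plain-Java sketch of how a customer's balance changes for an in-progress
// reservation vs. a later confirmation, mirroring the route's branching.
public class ReservationLogic {

    enum OrderStatus { NEW, IN_PROGRESS, CONFIRMED }

    static class Account {
        int available;
        int reserved;
        Account(int available, int reserved) {
            this.available = available;
            this.reserved = reserved;
        }
    }

    static void apply(Account acc, OrderStatus status, int amount) {
        if (status == OrderStatus.IN_PROGRESS) {
            // new reservation: move funds from available to reserved
            acc.reserved += amount;
            acc.available -= amount;
        } else {
            // any later status: release the previously reserved funds
            acc.reserved -= amount;
        }
    }
}
```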

We can also generate some test data in customer-saga using a Camel route. It runs once, just after the Quarkus application starts.

// i is an int counter and r a java.util.Random, both fields of the route class
from("timer://runOnce?repeatCount=1&delay=100")
   .loop(10)
      .setBody(exchange -> new Customer(null, "Test"+(++i), r.nextInt(50000), 0))
      .to("jpa:" + Customer.class.getName())
      .log("Add: ${body}")
   .end();

6. Configure Knative Eventing with Kafka broker

6.1. Architecture with Quarkus, Camel and Kafka

We have already created two applications built on top of Quarkus and Apache Camel. Both of them expose HTTP POST endpoints and send events to Kafka topics. Now, we need to create some Kubernetes objects to orchestrate the process. So far, we have just sent the events to topics; they have not been routed to the target applications. Let's take a look at the architecture of our system. There are two Kafka topics that receive events: order-events and reserve-events. The messages from those topics are not automatically sent to the Knative broker. So, first, we need to create KafkaSource objects to get messages from these topics and send them to the broker.

[Figure: architecture of the Quarkus, Camel and Kafka integration]

6.2. Configure Knative eventing

The KafkaSource object takes a list of input topics and a target sink. In this case, the sink is the Knative broker. We may create a single KafkaSource for both topics, or one source per topic. Here's the KafkaSource definition that takes messages from the reserve-events topic.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-reserve-order
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - reserve-events
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default

Let's create both sources. After creating them, we may verify that everything went well by executing the command kubectl get sources.

[Figure: output of kubectl get sources]

Before running our applications on Knative, we should create KafkaBinding objects. A KafkaBinding injects the address of the Kafka cluster into the application containers. The broker address will be available to the application under the KAFKA_BOOTSTRAP_SERVERS environment variable.

apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
  name: kafka-binding-customer-saga
spec:
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: customer-saga
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092

Let’s create both KafkaBinding objects. Here’s the list of available bindings after running the kubectl get bindings command.

Finally, we may proceed to the last step of our configuration: creating triggers. A trigger represents a subscription to events from a specific broker. Moreover, we may apply a simple filtering mechanism using the Trigger object. For example, if we want to deliver only the events from the order-events topic with the type dev.knative.kafka.event, we should create a definition as shown below.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: customer-saga-trigger
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/kafka-source-orders-customer#order-events
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /customers/reserve
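The filter semantics are exact matching on CloudEvent attributes: an event passes the trigger only if every attribute listed in the filter equals the corresponding attribute of the event. A plain-Java sketch of that rule (illustrative only, not Knative's implementation):

```java
import java.util.Map;

public class TriggerFilter {

    // An event matches if every filter attribute is present on the event
    // with exactly the same value.
    static boolean matches(Map<String, String> filter, Map<String, String> event) {
        return filter.entrySet().stream()
                .allMatch(e -> e.getValue().equals(event.get(e.getKey())));
    }
}
```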

Similarly, we should create a trigger that sends messages to the order-saga POST endpoint. It gets messages from the reserve-events source and sends them to the /orders/confirm endpoint.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: order-saga-trigger
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/kafka-source-reserve-order#reserve-events
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: order-saga
    uri: /orders/confirm

Finally, we can display a list of active triggers by executing the command kubectl get trigger.

[Figure: output of kubectl get trigger]

7. Deploy Quarkus application on Knative

Once we have finished the development of our sample applications, we may deploy them on Knative. One of the possible deployment options is Apache Camel K. In case of any problems with that type of deployment, we may also use the Quarkus Kubernetes module. First, let's include the two required modules. We will also leverage the Jib Maven plugin.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-kubernetes</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-container-image-jib</artifactId>
</dependency>

The rest of the configuration should be provided in the application properties file. In the first step, we enable automatic deployment on Kubernetes by setting the property quarkus.kubernetes.deploy to true. By default, Quarkus creates a standard Kubernetes Deployment, so we should set quarkus.kubernetes.deployment-target to knative. In that case, it will generate a Knative Service manifest. Finally, we have to change the image group name to dev.local. Of course, that is required only if you run the applications on a local Kubernetes cluster, as I do.

quarkus.kubernetes.deploy = true
quarkus.kubernetes.deployment-target = knative
quarkus.container-image.group = dev.local

Now, if we run the build with the mvn clean package command, our application will be automatically deployed on Knative. After that, let's verify the list of Knative services.

Once the order-saga application has started, it generates one order every 10 seconds and sends it to the Kafka topic order-events. We can easily verify that it works properly by checking the list of active topics as shown below.

We can also verify a list of Knative events exchanged by the applications.

$ kubectl get eventtype

Final Thoughts

Quarkus and Apache Camel seem to be a perfect combination for creating serverless applications on Knative. We can easily implement the whole logic within a single source file, and we can use Camel K to deploy our applications on Kubernetes or Knative. You can compare the approach described in this article with the one based on Quarkus and its Kafka and Knative extensions, available in this repository.

The post Knative Eventing with Quarkus, Kafka and Camel appeared first on Piotr's TechBlog.

]]>
Knative Eventing with Kafka and Quarkus https://piotrminkowski.com/2021/03/31/knative-eventing-with-kafka-and-quarkus/ https://piotrminkowski.com/2021/03/31/knative-eventing-with-kafka-and-quarkus/#respond Wed, 31 Mar 2021 12:55:26 +0000 https://piotrminkowski.com/?p=9620 In this article, you will learn how to run eventing applications on Knative using Kafka and Quarkus. Previously I described the same approach for Kafka and Spring Cloud. If you want to compare both of them read my article Knative Eventing with Kafka and Spring Cloud. We will deploy exactly the same architecture. However, instead […]

The post Knative Eventing with Kafka and Quarkus appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to run eventing applications on Knative using Kafka and Quarkus. Previously I described the same approach for Kafka and Spring Cloud. If you want to compare both of them read my article Knative Eventing with Kafka and Spring Cloud. We will deploy exactly the same architecture. However, instead of Spring Cloud Functions we will use Quarkus Funqy. Also, Spring Cloud Stream may be replaced with Quarkus Kafka. Before we start, let’s clarify some things.

Knative and Quarkus Concepts

Quarkus supports Knative in several ways. First of all, we may use the Quarkus Kubernetes module to simplify deployment on Knative. We can also use the Quarkus Funqy Knative Event extension to route and process cloud events within functions. That’s not all. Quarkus supports a serverless functional style. With the Quarkus Funqy module, we can write functions deployable to various FaaS (including Knative). These functions can be invoked through HTTP. Finally, we may integrate our application with Kafka topics using annotations from the Quarkus Kafka extension.

The Quarkus Funqy Knative Events module is based on the Knative broker and triggers. Since we will use KafkaSource instead of a broker and trigger, we won't include that module. However, we can still take advantage of Quarkus Funqy and its HTTP binding.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions.

As I mentioned before, we will use the same architecture and scenario as in my previous article about Knative eventing. Let's briefly describe it.

Today we will implement the eventual consistency pattern (also known as the SAGA pattern). How does it work? The sample system consists of three services. The order-service creates a new order related to customers and products. That order is sent to a Kafka topic. Then, our two other applications, customer-service and product-service, receive the order event and perform reservations. The customer-service reserves the order's amount on the customer's account. Meanwhile, the product-service reserves the number of products specified in the order. Both services send a response to the order-service through a Kafka topic. If the order-service receives positive reservations from both services, it confirms the order and sends an event with that information. Both customer-service and product-service receive the event and confirm their reservations. You can see the whole flow in the picture below.
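The confirmation step described above can be reduced to a single rule: the order is confirmed only when both reservations succeeded. A plain-Java sketch of that rule (names are illustrative; treating any other outcome as a rejection is my simplification, since the article does not detail the rollback path):

```java
public class SagaDecision {

    enum Reservation { ACCEPTED, REJECTED }
    enum OrderStatus { NEW, IN_PROGRESS, CONFIRMED, REJECTED }

    // The order-service confirms only if both partial reservations succeeded.
    static OrderStatus decide(Reservation customer, Reservation product) {
        return (customer == Reservation.ACCEPTED && product == Reservation.ACCEPTED)
                ? OrderStatus.CONFIRMED
                : OrderStatus.REJECTED;
    }
}
```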

[Figure: architecture of the Knative eventing scenario]

Prerequisites

There are several requirements we need to meet before starting. I described them all in my previous article about Knative Eventing. Here's just a brief reminder:

  1. Kubernetes cluster in version 1.17 or later. I'm using a local cluster. If you use a remote cluster, replace dev.local in the image name with your Docker account name
  2. Install Knative Serving and Eventing on your cluster. You may find the detailed installation instructions here.
  3. Install the Kafka Eventing Broker. Here's the link to the releases site. You don't need everything – we will use only the KafkaSource and KafkaBinding CRDs
  4. Install a Kafka cluster with the Strimzi operator. I installed it in the kafka namespace. The name of my cluster is my-cluster.

Step 1. Installing Kafka Knative components

Assuming you have already installed all the elements required to run Knative Eventing on your Kubernetes cluster, we may create the components dedicated to our applications. You may find YAML manifests with the object declarations in the k8s directory inside each application's directory. First, let's create a KafkaBinding. It is responsible for injecting the address of the Kafka cluster into the application container. Thanks to the KafkaBinding, that address is visible inside the container as the KAFKA_BOOTSTRAP_SERVERS environment variable. Here's an example of the YAML declaration for the customer-saga application. We should create similar objects for the two other applications.

apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
  name: kafka-binding-customer-saga
spec:
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: customer-saga
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092

In the next step, we create the KafkaSource object. It reads events from a particular topic and passes them to the consumer by calling the HTTP POST endpoint exposed by the application. We can override the default context path of the HTTP endpoint. For the customer-saga, the target URL is /reserve, and it should receive events from the order-events topic. Because both customer-saga and product-saga listen for events from the order-events topic, we need to create a similar KafkaSource object for product-saga as well.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-orders-customer
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /reserve

On the other hand, the order-saga listens for events on the reserve-events topic. If you want to verify our scenario once again please refer to the diagram in the Source Code section. This time the target URL is /confirm.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-orders-confirm
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - reserve-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: order-saga
    uri: /confirm
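Taken together, the KafkaSource objects define a static topic-to-endpoint routing table. The sketch below models what they express in plain Java (the routing table type is illustrative; KafkaSource itself is a Knative CRD, not Java code):

```java
import java.util.ArrayList;
import java.util.List;

public class SourceRouting {

    // Illustrative model of the KafkaSources: topic -> (service, uri) sink.
    static final String[][] ROUTES = {
            {"order-events", "customer-saga", "/reserve"},
            {"order-events", "product-saga", "/reserve"},
            {"reserve-events", "order-saga", "/confirm"},
    };

    // Target URLs that a record from the given topic is POSTed to.
    static List<String> targets(String topic) {
        List<String> urls = new ArrayList<>();
        for (String[] r : ROUTES) {
            if (r[0].equals(topic)) {
                urls.add("http://" + r[1] + r[2]);
            }
        }
        return urls;
    }
}
```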

Let's verify the list of Kafka sources. In our case, there is a single KafkaSource per application. Note that until you deploy the Quarkus applications on Knative, the Kafka sources won't reach the ready state.

[Figure: list of KafkaSource objects]

Step 2. Integrating Quarkus with Kafka

In order to integrate Quarkus with Apache Kafka, we may use the SmallRye Reactive Messaging library. Thanks to it, we may define an input and output topic for each method using annotations. The messages are serialized to JSON. We can also automatically expose the Kafka connection status in a health check. Here's the list of dependencies we need to include in Maven pom.xml.

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-jackson</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-health</artifactId>
</dependency>

Before we start with the source code, we need to provide some configuration settings in the application.properties file. Of course, Kafka requires a connection URL to the cluster. We use the environment variable injected by the KafkaBinding object. Also, the output topic name should be configured. Here’s a list of required properties for the order-saga application.

kafka.bootstrap.servers = ${KAFKA_BOOTSTRAP_SERVERS}

mp.messaging.outgoing.order-events.connector = smallrye-kafka
mp.messaging.outgoing.order-events.topic = order-events
mp.messaging.outgoing.order-events.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Finally, we may switch to the code. Let's start with order-saga. It will continuously send orders to the order-events topic. Those events are received by both the customer-saga and product-saga applications. The method responsible for generating and sending events returns a reactive stream using Mutiny's Multi. It emits an event every second. We need to annotate the method with the @Outgoing annotation, passing the name of the output defined in the application properties. The @Broadcast annotation indicates that each event is dispatched to all subscribers. Before sending, every order is persisted in a database (we use the H2 in-memory database).

@ApplicationScoped
@Slf4j
public class OrderPublisher {

   private static int num = 0;

   @Inject
   private OrderRepository repository;
   @Inject
   private UserTransaction transaction;

   @Outgoing("order-events")
   @Broadcast
   public Multi<Order> orderEventsPublish() {
      return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
           .map(tick -> {
              Order o = new Order(++num, num%10+1, num%10+1, 100, 1, OrderStatus.NEW);
              try {
                 transaction.begin();
                 repository.persist(o);
                 transaction.commit();
              } catch (Exception e) {
                 log.error("Error in transaction", e);
              }

              log.info("Order published: {}", o);
              return o;
           });
   }

}
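Stripping away the Mutiny timer and the persistence, the per-tick logic of orderEventsPublish() boils down to the counter arithmetic below (a self-contained sketch; the Order class here only mirrors the entity's fields):

```java
public class OrderGenerator {

    enum OrderStatus { NEW, IN_PROGRESS, CONFIRMED }

    // Mirrors the fields of the Order entity used in the article.
    static class Order {
        final int id, customerId, productId, amount, productCount;
        final OrderStatus status;
        Order(int id, int customerId, int productId,
              int amount, int productCount, OrderStatus status) {
            this.id = id;
            this.customerId = customerId;
            this.productId = productId;
            this.amount = amount;
            this.productCount = productCount;
            this.status = status;
        }
    }

    private int num = 0;

    // Equivalent of: new Order(++num, num % 10 + 1, num % 10 + 1, 100, 1, NEW);
    // ++num is evaluated first, so customerId runs 2, 3, ..., 10, 1, 2, ...
    Order nextOrder() {
        ++num;
        return new Order(num, num % 10 + 1, num % 10 + 1, 100, 1, OrderStatus.NEW);
    }
}
```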

Step 3. Handling Knative events with Quarkus Funqy

Ok, in the previous step, we implemented the part of the code responsible for sending events to the Kafka topic. We also have a KafkaSource that dispatches events from the Kafka topic to the application's HTTP endpoint. Now, we just need to handle them. It is very simple with Quarkus Funqy, which allows us to create functions according to the serverless FaaS approach. We can also easily bind each function to an HTTP endpoint with the Quarkus Funqy HTTP extension. Let's include it in our dependencies.

 <dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-funqy-http</artifactId>
 </dependency>

In order to create a function with Quarkus Funqy, we just need to annotate a particular method with @Funq. The name of the method is reserve, so it is automatically bound to the HTTP endpoint POST /reserve. It takes a single input parameter representing the incoming order, which is automatically deserialized from JSON.

In the fragment of code visible below, we implement order handling in the customer-saga application. Once it receives an order, it performs a reservation on the customer account. Then it needs to send a response to the order-saga. To do that, we may use Quarkus reactive messaging support once again. We define an Emitter object that allows us to send a single event to the topic. We may use it inside a method that does not return any output to be sent to a topic (with @Outgoing). The Emitter bean should be annotated with @Channel, which works similarly to @Outgoing. We also need to define an output topic related to the name of the channel.

@Slf4j
public class OrderReserveFunction {

   @Inject
   private CustomerRepository repository;
   @Inject
   @Channel("reserve-events")
   Emitter<Order> orderEmitter;

   @Funq
   public void reserve(Order order) {
      log.info("Received order: {}", order);
      doReserve(order);
   }

   private void doReserve(Order order) {
      Customer customer = repository.findById(order.getCustomerId());
      log.info("Customer: {}", customer);
      if (order.getStatus() == OrderStatus.NEW) {
         customer.setAmountReserved(customer.getAmountReserved() + order.getAmount());
         customer.setAmountAvailable(customer.getAmountAvailable() - order.getAmount());
         order.setStatus(OrderStatus.IN_PROGRESS);
         log.info("Order reserved: {}", order);
         orderEmitter.send(order);
      } else if (order.getStatus() == OrderStatus.CONFIRMED) {
         customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
      }
      repository.persist(customer);
   }
}
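Conceptually, the @Channel Emitter hands events to a connector that forwards them to the configured topic. The queue-backed sketch below illustrates that hand-off in plain Java (it stands in for, and is much simpler than, the real smallrye-kafka connector):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ChannelSketch<T> {

    private final BlockingQueue<T> channel = new LinkedBlockingQueue<>();

    // Emitter.send(...) equivalent: enqueue the event for the connector.
    public void send(T event) {
        channel.add(event);
    }

    // Connector side: drain the next event destined for the topic,
    // or null when the channel is empty.
    public T poll() {
        return channel.poll();
    }
}
```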

Here are the configuration properties for the integration between Kafka and the Emitter. The same properties should be created for both customer-saga and product-saga.

kafka.bootstrap.servers = ${KAFKA_BOOTSTRAP_SERVERS}

mp.messaging.outgoing.reserve-events.connector = smallrye-kafka
mp.messaging.outgoing.reserve-events.topic = reserve-events
mp.messaging.outgoing.reserve-events.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Finally, let's take a look at the implementation of the Quarkus function inside the product-saga application. It handles incoming orders, performs a reservation for the requested number of products, and sends its response to the reserve-events topic using the Emitter object.

@Slf4j
public class OrderReserveFunction {

   @Inject
   private ProductRepository repository;

   @Inject
   @Channel("reserve-events")
   Emitter<Order> orderEmitter;

   @Funq
   public void reserve(Order order) {
      log.info("Received order: {}", order);
      doReserve(order);
   }

   private void doReserve(Order order) {
      Product product = repository.findById(order.getProductId());
      log.info("Product: {}", product);
      if (order.getStatus() == OrderStatus.NEW) {
         product.setReservedItems(product.getReservedItems() + order.getProductsCount());
         product.setAvailableItems(product.getAvailableItems() - order.getProductsCount());
         order.setStatus(OrderStatus.IN_PROGRESS);
         orderEmitter.send(order);
      } else if (order.getStatus() == OrderStatus.CONFIRMED) {
         product.setReservedItems(product.getReservedItems() - order.getProductsCount());
      }
      repository.persist(product);
   }
}

Step 4. Deploy Quarkus application on Knative

Finally, we can deploy all our applications on Knative. To simplify that process, we may use Quarkus Kubernetes support, which can automatically generate deployment manifests based on the source code and application properties. Quarkus also supports building images with Jib. So first, let's add the following dependencies to Maven pom.xml.

 <dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-kubernetes</artifactId>
 </dependency>
 <dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-container-image-jib</artifactId>
 </dependency>

In the next step, we need to add some configuration settings to the application.properties file. To enable automatic deployment on Kubernetes, the property quarkus.kubernetes.deploy must be set to true. Then we should change the target platform to Knative. Thanks to that, Quarkus will generate a Knative Service instead of a standard Kubernetes Deployment. The last property, quarkus.container-image.group, sets the name of the image owner group. For local development with Knative, we should set it to dev.local.

quarkus.kubernetes.deploy = true
quarkus.kubernetes.deployment-target = knative
quarkus.container-image.group = dev.local

After setting all the values visible above, we just need to execute the Maven build to deploy the application.

$ mvn clean package

After running the Maven build for all the applications, let's verify the list of Knative Services.

Once the order-saga application starts, it begins sending orders continuously. It also receives the order events sent back by customer-saga and product-saga. Those events are processed by the Quarkus function. Here are the logs printed by order-saga.

Final Thoughts

As you can see, we can easily implement and deploy Quarkus applications on Knative. Quarkus provides several extensions that simplify integration with the Knative Eventing model and the Kafka broker. We can use Quarkus Funqy to implement the serverless FaaS approach, or SmallRye Reactive Messaging to integrate with Apache Kafka. You can compare this Quarkus support with Spring Boot in my previous article: Knative Eventing with Kafka and Spring Cloud.

The post Knative Eventing with Kafka and Quarkus appeared first on Piotr's TechBlog.

]]>
Knative Eventing with Kafka and Spring Cloud https://piotrminkowski.com/2021/03/12/knative-eventing-with-kafka-and-spring-cloud/ https://piotrminkowski.com/2021/03/12/knative-eventing-with-kafka-and-spring-cloud/#comments Fri, 12 Mar 2021 11:55:12 +0000 https://piotrminkowski.com/?p=9569 In this article, you will learn how to run eventing applications on Knative using Kafka and Spring Cloud. I’ll show you what is Knative Eventing, and how to integrate it with the Kafka broker. We will build our applications on top of Spring Cloud Function and Spring Cloud Stream. All these solutions seem to be […]

The post Knative Eventing with Kafka and Spring Cloud appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to run eventing applications on Knative using Kafka and Spring Cloud. I'll show you what Knative Eventing is and how to integrate it with the Kafka broker. We will build our applications on top of Spring Cloud Function and Spring Cloud Stream. All these solutions seem to be a perfect match. Why? Let me invite you to read the article.

However, before we proceed, you need some knowledge of basic Knative concepts. Therefore, I suggest you read more about them. You can start with these two articles: Spring Boot on Knative and Microservices on Knative with GraalVM and Spring Boot. Of course, you can also refer to the Knative documentation.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions.

Today we will build on a simple architecture that complies with the eventual consistency pattern, also known as the SAGA pattern. What exactly is it? The sample system consists of three services. The order-service creates a new order related to customers and products. That order is sent to a Kafka topic. Then, our two other applications, customer-service and product-service, receive the order event and perform reservations. The customer-service reserves the order's amount on the customer's account. Meanwhile, the product-service reserves the number of products specified in the order. Both services send a response to the order-service through a Kafka topic. If the order-service receives positive reservations from both services, it confirms the order and sends an event with that information. Both customer-service and product-service receive the event and confirm their reservations. You can see the whole flow in the picture below.

[Figure: architecture of the Knative eventing scenario]

Prerequisites

Before we start, we first need to install Knative on the Kubernetes cluster. I'm using a local instance of Kubernetes, but you may as well use any remote cluster, like GKE. However, the latest version of Knative requires Kubernetes v1.17 or later. Of course, we need to install both the Serving and Eventing components. You may find the detailed installation instructions here.

That's not all. We also need to install the Kafka Eventing Broker. Here's the link to the releases site. It includes several deployments and CRDs. You should pay special attention to the KafkaSource and KafkaBinding CRDs, since we will use them later.

Finally, we need to install a Kafka cluster on Kubernetes. The recommended way to do that is with the Strimzi operator. Strimzi provides container images and operators for running Kafka on Kubernetes. It also comes with a set of CRDs for managing the Kafka cluster. Once you install it, you may proceed to the next steps. I installed it in the kafka namespace. Here's the list of running pods.

Step 1: Create and configure Knative Kafka Broker

In the first step, we are going to create a Kafka cluster using the Strimzi Kafka CRD. To keep things simple, we won't use any more advanced configuration settings. For example, I used ephemeral storage, which is not recommended in production. I set three instances of ZooKeeper. Kafka is planning to remove the ZooKeeper dependency (KIP-500), but the version used here still relies on it.

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 1
    listeners:
      plain: {}
      tls: {}
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.4"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}

The Knative broker allows you to route events to different event sinks or consumers, and we may choose between different broker providers. When an event is sent to the broker, all request metadata other than the CloudEvent data and context attributes is stripped away. The event delivery mechanism hides the details of event routing from the event producer and consumer. The default broker class is MTChannelBasedBroker. We will change it to Kafka.

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: default
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing

In this article, we won't use the Kafka broker directly. Instead, we will use the KafkaSource object, which takes events from a particular topic and sends them to the subscriber. If you want to use the Broker, you need to define a Knative Trigger that refers to it.
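Although we won't need it here, a Trigger subscribing a Knative Service to the default broker could look like the sketch below. Treat it as illustrative: the subscriber name order-saga refers to the service created later in this article.

```yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: order-saga-trigger
spec:
  broker: default
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: order-saga
```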

The broker refers to the ConfigMap kafka-broker-config. The most important setting there is the address of the Kafka cluster. If you didn't change anything in the default Kafka installation files, it is ${KAFKA_CLUSTER_NAME}-kafka-bootstrap on port 9092.

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  default.topic.partitions: "10"
  default.topic.replication.factor: "1"
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"

Step 2: Create an application with Spring Cloud Stream

Let's start with dependencies. Each of our applications uses an in-memory H2 database and integrates with it through the Spring Data JPA repository pattern. However, the most important thing is that they all use Spring Cloud Stream to interact with Kafka topics. Spring Cloud Stream requires a concrete binder implementation on the classpath. That's why we add the spring-cloud-starter-stream-kafka starter. For some time now, the Spring Cloud Stream programming model has been built on top of Spring Cloud Function. Fortunately, we may easily export functions as HTTP endpoints. This feature will be useful for us later. For now, let's just take a look at the list of included dependencies.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-function-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.16</version>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>

Here’s the model class for the order-service. Once the order is created and saved in the database, the order-service sends it to the output Kafka topic.

@Entity
@Table(name = "orders")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Order {
    @Id
    private Integer id;
    private Integer customerId;
    private Integer productId;
    private int amount;
    private int productCount;
    @Enumerated
    private OrderStatus status = OrderStatus.NEW;
}
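The entity references an OrderStatus enum that is not shown in the article. Based on the statuses used throughout the code, an assumed minimal definition could look like this:

```java
// Assumed definition of the order statuses referenced by the Order entity
// (the article uses NEW, IN_PROGRESS and CONFIRMED).
public enum OrderStatus {
    NEW, IN_PROGRESS, CONFIRMED;

    public static void main(String[] args) {
        // Print the full lifecycle in declaration order.
        for (OrderStatus status : values()) {
            System.out.println(status);
        }
    }
}
```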

We have three functions inside the order-service application main class. Two of them continuously send events to the output destination. The third one, confirm(), waits for incoming events. We will discuss it later. The orderEventSupplier function represents the first step in our scenario. It creates a new order with test data and saves it in the database before sending it.

@SpringBootApplication
@Slf4j
public class OrderSagaApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderSagaApplication.class, args);
    }
    private static int num = 0;
    private BlockingQueue<Order> queue = new LinkedBlockingQueue<>();
    @Bean
    public Supplier<Order> orderEventSupplier() {
        return () -> repository.save(new Order(++num, num%10+1, num%10+1, 100, 1, OrderStatus.NEW));
    }
    @Bean
    public Supplier<Order> orderConfirmSupplier() {
        return () -> queue.poll();
    }
    @Bean
    public Consumer<Message<Order>> confirm() {
        return this::doConfirm;
    }
    @Autowired
    OrderRepository repository;
    private void doConfirm(Message<Order> msg) {
        Order o = msg.getPayload();
        log.info("Order received: {}", o);
        Order order = repository.findById(o.getId()).orElseThrow();
        if (order.getStatus() == OrderStatus.NEW) {
            order.setStatus(OrderStatus.IN_PROGRESS);
        } else if (order.getStatus() == OrderStatus.IN_PROGRESS) {
            order.setStatus(OrderStatus.CONFIRMED);
            log.info("Order confirmed : {}", order);
            queue.offer(order);
        }
        repository.save(order);
    }
}
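The confirm() logic above is effectively a tiny state machine: the first reservation response moves an order from NEW to IN_PROGRESS, the second one to CONFIRMED. A standalone sketch of that transition, stripped of the persistence and messaging details, might look like this:

```java
// Simplified, standalone model of the doConfirm() transition logic.
public class OrderStateMachine {
    enum Status { NEW, IN_PROGRESS, CONFIRMED }

    Status status = Status.NEW;

    // Invoked once per reservation response from customer-saga / product-saga.
    void onReservationResponse() {
        if (status == Status.NEW) {
            status = Status.IN_PROGRESS;
        } else if (status == Status.IN_PROGRESS) {
            status = Status.CONFIRMED;
        }
    }

    public static void main(String[] args) {
        OrderStateMachine order = new OrderStateMachine();
        order.onReservationResponse(); // response from customer-saga
        order.onReservationResponse(); // response from product-saga
        System.out.println(order.status);
    }
}
```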

The name of the output Kafka topic is order-events. We set it for both Supplier functions using the Spring Cloud Stream bindings pattern. On the other hand, the Consumer function will not receive events directly from the Kafka topic. Why? Because it is a part of the Knative Eventing process, which I will explain later in this article. For now, the important thing is that only the suppliers are activated through the spring.cloud.function.definition property, since only they bind to an external destination.

spring.application.name: order-saga
spring.cloud.stream.bindings.orderEventSupplier-out-0.destination: order-events
spring.cloud.stream.bindings.orderConfirmSupplier-out-0.destination: order-events
spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
spring.cloud.function.definition: orderEventSupplier;orderConfirmSupplier

Finally, we need to create the KafkaBinding that will inject Kafka bootstrap information into the application container (through the Knative Service). Then, the application can access it as the KAFKA_BOOTSTRAP_SERVERS environment variable.

apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
  name: kafka-binding-order-saga
spec:
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: order-saga
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
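Inside the container, the injected bootstrap address is just an environment variable. A quick sanity check in plain Java might look like the sketch below; the localhost fallback is only a local-development assumption, not part of the KafkaBinding contract.

```java
// Read the bootstrap servers injected by KafkaBinding; fall back to a
// local default (assumption for development) when the variable is absent.
public class KafkaEnvCheck {
    static String bootstrapServers() {
        String value = System.getenv("KAFKA_BOOTSTRAP_SERVERS");
        return (value != null && !value.isEmpty()) ? value : "localhost:9092";
    }

    public static void main(String[] args) {
        System.out.println("Kafka bootstrap servers: " + bootstrapServers());
    }
}
```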

Step 3: Create Kafka sources and Spring Cloud Function endpoints

Ok, we have already created a function responsible for generating and sending orders to the Kafka topic inside the order-service. So, now our goal is to receive and handle them on the customer-service and product-service side. Our applications won't directly listen for incoming events on the Kafka topic. To clarify, a basic Knative Eventing assumption is that applications don't care how events are published; they just receive them as an HTTP POST. And here comes the KafkaSource object. It takes a list of input topics and a destination sink as parameters. In our case, it gets messages from order-events and sends them as HTTP POST requests to the /customers/reserve endpoint of the customer-saga Knative Service.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /customers/reserve

Here's the implementation of the customer-saga application. Thanks to Spring Cloud Function Web, it automatically exports the reserve function as an HTTP endpoint with the path /reserve. Once the consumer receives an event, it performs the rest of the business logic. If the incoming order has the NEW status, customer-saga creates a reservation for a particular amount on the customer account. Then it sends an event response to order-saga; in other words, it first puts the event into a BlockingQueue. We also use a Supplier function for sending events to the Kafka topic. This time the supplier function takes Order objects from the BlockingQueue. Finally, if our application receives a confirmation order from order-saga, it commits the whole transaction by removing the reserved amount.

@SpringBootApplication
@Slf4j
public class CustomerSagaApplication {
    public static void main(String[] args) {
        SpringApplication.run(CustomerSagaApplication.class, args);
    }
    private BlockingQueue<Order> queue = new LinkedBlockingQueue<>();
    @Autowired
    private CustomerRepository repository;
    
    @Bean
    public Supplier<Order> orderEventSupplier() {
        return () -> queue.poll();
    }
    @Bean
    public Consumer<Message<Order>> reserve() {
        return this::doReserve;
    }
    private void doReserve(Message<Order> msg) {
        Order order = msg.getPayload();
        log.info("Body: {}", order);
        Customer customer = repository.findById(order.getCustomerId()).orElseThrow();
        log.info("Customer: {}", customer);
        if (order.getStatus() == OrderStatus.NEW) {
            customer.setAmountReserved(customer.getAmountReserved() + order.getAmount());
            customer.setAmountAvailable(customer.getAmountAvailable() - order.getAmount());
            order.setStatus(OrderStatus.IN_PROGRESS);
            queue.offer(order);
        } else if (order.getStatus() == OrderStatus.CONFIRMED) {
            customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
        }
        repository.save(customer);
    }
}

We can also set the base context path for HTTP endpoints using the spring.cloud.function.web.path property. So, the final path of our target endpoint is /customers/reserve. It is the same as the address defined in the KafkaSource definition.

spring.cloud.function.web.path: /customers

Here’s a configuration for the customer-saga inside the application.yml file.

spring.application.name: customer-saga
spring.cloud.function.web.path: /customers
spring.cloud.stream.bindings.orderEventSupplier-out-0.destination: reserve-events
spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
spring.cloud.function.definition: orderEventSupplier

The implementation of the business logic inside product-saga is pretty similar to customer-saga. There is a single Consumer function that receives orders, and a single Supplier responsible for sending a response back to order-saga.

@SpringBootApplication
@Slf4j
public class ProductSagaApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProductSagaApplication.class, args);
    }
    @Autowired
    private ProductRepository repository;
    private BlockingQueue<Order> queue = new LinkedBlockingQueue<>();
    @Bean
    public Supplier<Order> orderEventSupplier() {
        return () -> queue.poll();
    }
    @Bean
    public Consumer<Message<Order>> reserve() {
        return this::doReserve;
    }
    private void doReserve(Message<Order> msg) {
        Order order = msg.getPayload();
        log.info("Body: {}", order);
        Product product = repository.findById(order.getProductId()).orElseThrow();
        log.info("Product: {}", product);
        if (order.getStatus() == OrderStatus.NEW) {
            product.setReservedItems(product.getReservedItems() + order.getProductCount());
            product.setAvailableItems(product.getAvailableItems() - order.getProductCount());
            order.setStatus(OrderStatus.IN_PROGRESS);
            queue.offer(order);
        } else if (order.getStatus() == OrderStatus.CONFIRMED) {
            product.setReservedItems(product.getReservedItems() - order.getProductCount());
        }
        repository.save(product);
    }
}
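The reservation bookkeeping performed by both services follows the same pattern: a NEW order moves items from the available pool to the reserved pool, and a CONFIRMED order releases the reservation. A minimal standalone sketch (the class shape is assumed; the real Product is a JPA entity with Lombok accessors):

```java
// Minimal sketch of the reserve/confirm bookkeeping used by product-saga.
public class ReservationDemo {
    static class Product {
        int availableItems;
        int reservedItems;

        Product(int available, int reserved) {
            this.availableItems = available;
            this.reservedItems = reserved;
        }

        // NEW order: move items from the available pool to the reserved pool.
        void reserve(int count) {
            reservedItems += count;
            availableItems -= count;
        }

        // CONFIRMED order: release the reservation (the items are sold).
        void confirm(int count) {
            reservedItems -= count;
        }
    }

    public static void main(String[] args) {
        Product product = new Product(10, 0);
        product.reserve(3);
        product.confirm(3);
        System.out.println(product.availableItems + " available, "
                + product.reservedItems + " reserved");
    }
}
```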

Step 4: Run applications on Knative Eventing and Kafka

Here's a typical definition of the Knative Service for our applications. I'm using the dev.local image prefix, but if you run a remote cluster, you may replace it with your Docker username or any other registry account you have.

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: order-saga
spec:
  template:
    spec:
      containers:
        - image: dev.local/order-saga

I use Skaffold together with the Jib Maven Plugin for building and deploying the applications on Knative. My target namespace is serverless. With the --tail option, you may observe the logs after deployment. Of course, you may as well use the skaffold dev command.

$ skaffold run --tail -n serverless

After running all our applications on Knative Eventing with Kafka, we may verify the list of services using the kn CLI.

(Figure: knative-eventing-kafka-services — output of the kn service list)

Then, we may verify that all KafkaBindings have been created. To do that, let's just execute the following kubectl command (assuming the serverless target namespace):

$ kubectl get kafkabindings -n serverless

The next important component is KafkaSource. We have already created three sources, one per application.

(Figure: knative-eventing-kafka-sources — list of KafkaSource objects)

After starting, the order-saga application continuously generates and sends a new order every second. Both product-saga and customer-saga receive the events and send responses. Thanks to that, the traffic is exchanged without any interruption. Apart from the application pods, we have three pods with Kafka sources.

Let's take a look at the application logs. Here are the logs from order-saga. As you can see, it receives the order reservations from both customer-saga and product-saga. After that, it confirms the order and sends a response back to the order-events topic on Kafka. Basically, that's what we wanted to achieve.

Final Thoughts

I hope you enjoyed this article. Knative is still a relatively new solution, and I think we may expect some new and interesting features in the near future. With Knative Eventing you may use other event sources than Kafka. Personally, I'm waiting for the integration with RabbitMQ, which is under development now. For a full list of available solutions, you may refer to the Knative Eventing docs.

This is my third article about Knative and Spring Boot. You may expect more articles about Knative soon! Next time, I'm going to show you an example with another popular Java framework: Quarkus.

The post Knative Eventing with Kafka and Spring Cloud appeared first on Piotr's TechBlog.
