Knative Archives - Piotr's TechBlog
https://piotrminkowski.com/tag/knative/
Java, Spring, Kotlin, microservices, Kubernetes, containers

Serverless on OpenShift with Knative, Quarkus and Kafka
https://piotrminkowski.com/2023/04/18/serverless-on-openshift-with-knative-quarkus-and-kafka/
Tue, 18 Apr 2023 09:02:49 +0000

In this article, you will learn how to build and run Quarkus serverless apps on OpenShift and integrate them through Knative Eventing. We will use Kafka to exchange messages between the apps. However, Knative supports various event sources. Kafka is just one of the available options. You can check out a full list of supported solutions in the Knative Eventing docs.

I have already published several articles about Knative on my blog. For a quick start, read my article about Knative basics and Spring Boot. There is also an article similar to this one, but focused more on Kubernetes. Today, we will focus on the OpenShift support for serverless features. Also, Knative is changing rapidly, so there are some significant differences compared to the version described in my previous articles.

Although I’m running my example apps on OpenShift, I’ll give you a recipe for how to do the same thing on vanilla Kubernetes. In order to run them on Kubernetes, you need to activate the kubernetes Maven profile instead of openshift during the build.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions.

How it works

During the exercise, we will deploy three Quarkus apps on OpenShift: order-service, stock-service and payment-service. Each app exposes a single HTTP POST endpoint for incoming events. Each also uses the Quarkus REST client to send events to Kafka through Knative Eventing. The order-service app sends a single event that both stock-service and payment-service should receive. They then process the event and send a response back to the order-service. All of that happens asynchronously through Kafka and the Knative Broker. However, the process is completely transparent to the apps, which just expose an HTTP endpoint and use an HTTP client to call the endpoint exposed by the KafkaSink object.

The diagram below illustrates the architecture of our solution. There are several Knative objects: KafkaSink, KafkaSource, Trigger, and Broker. The KafkaSink object eliminates the need to use a Kafka client on the app side. It receives HTTP requests in CloudEvent format and converts them to Kafka messages sent to a particular topic. The KafkaSource object receives messages from Kafka and sends them to the Knative Broker. Finally, we need to define a Trigger. The Trigger object filters the events inside the Broker and sends them to the target app by calling its HTTP endpoint.

openshift-serverless-arch

Prerequisites

Before we proceed to the Quarkus apps, we need to install and configure two operators on OpenShift: AMQ Streams (Kafka Strimzi) and OpenShift Serverless (Knative).

openshift-serverless-operators

In the configuration phase, we create four components: Kafka, KnativeServing, KnativeEventing and KnativeKafka. We can easily create them using the OpenShift Console. In all cases, we can leave the default settings. I create the Kafka instance in the kafka namespace. Just in case, here’s the YAML manifest for creating a 3-node Kafka cluster:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: '3.3'
    storage:
      type: ephemeral
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    version: 3.3.1
    replicas: 3
  zookeeper:
    storage:
      type: ephemeral
    replicas: 3

The KnativeServing should be created in the knative-serving namespace, while KnativeEventing in the knative-eventing namespace.

kind: KnativeServing
apiVersion: operator.knative.dev/v1beta1
metadata:
  name: knative-serving
  namespace: knative-serving
spec: {}
---
kind: KnativeEventing
apiVersion: operator.knative.dev/v1beta1
metadata:
  name: knative-eventing
  namespace: knative-eventing
spec: {}

Or just “click” the create button in OpenShift Console.

Finally, the last required component – KnativeKafka. We should at least enable the sink and source to install KafkaSink and KafkaSource CRDs and controllers.

apiVersion: operator.serverless.openshift.io/v1alpha1
kind: KnativeKafka
metadata:
  name: knative-kafka
  namespace: knative-eventing
spec:
  logging:
    level: INFO
  sink:
    enabled: true
  source:
    enabled: true

Functions Support in Quarkus

Although we will implement an event-driven architecture today, our Quarkus apps just expose and call HTTP endpoints. In order to expose a method as an HTTP endpoint, we need to include the Quarkus Funqy HTTP module. On the other hand, to call the HTTP endpoint exposed by another component, we can leverage the Quarkus declarative REST client. Our app stores data in the in-memory H2 database and uses Panache ORM.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-funqy-http</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-rest-client-jackson</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-hibernate-orm-panache</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-jdbc-h2</artifactId>
</dependency>

In order to expose a method as an HTTP endpoint, we just need to annotate it with @Funq. Here’s the function implementation from the payment-service. It receives two types of orders: reservation and confirmation. For a reservation order (status=NEW), it reserves funds in the customer account. For the confirmation type, it accepts or rolls back the transaction depending on the order’s status. By default, a method annotated with @Funq is exposed under the same path as its name, so in our case the address of the endpoint is POST /reserve.

public class OrderReserveFunction {

   private static final String SOURCE = "payment";
    
   Logger log;
   OrderReserveService orderReserveService;
   OrderConfirmService orderConfirmService;

   public OrderReserveFunction(Logger log,
                               OrderReserveService orderReserveService,
                               OrderConfirmService orderConfirmService) {
      this.log = log;
      this.orderReserveService = orderReserveService;
      this.orderConfirmService = orderConfirmService;
   }

   @Funq
   public Customer reserve(Order order) {
      log.infof("Received order: %s", order);
      if (order.getStatus() == OrderStatus.NEW) {
         return orderReserveService.doReserve(order);
      } else {
         return orderConfirmService.doConfirm(order);
      }
   }

}

Let’s take a look at the payment reservation implementation. We assume there may be multiple concurrent requests for the same customer, so we need to lock the entity during the transaction. Once the reservation is performed, we need to send a response back to the order-service. We are leveraging the Quarkus REST client for that.

@ApplicationScoped
public class OrderReserveService {

    private static final String SOURCE = "payment";

    Logger log;
    CustomerRepository repository;
    OrderSender sender;

    public OrderReserveService(Logger log,
                               CustomerRepository repository,
                               @RestClient OrderSender sender) {
        this.log = log;
        this.repository = repository;
        this.sender = sender;
    }

    @Transactional
    public Customer doReserve(Order order) {
        Customer customer = repository.findById(order.getCustomerId(), LockModeType.PESSIMISTIC_WRITE);
        if (customer == null)
            throw new NotFoundException();
        log.infof("Customer: %s", customer);
        if (order.getAmount() < customer.getAmountAvailable()) {
            order.setStatus(OrderStatus.IN_PROGRESS);
            customer.setAmountReserved(customer.getAmountReserved() + order.getAmount());
            customer.setAmountAvailable(customer.getAmountAvailable() - order.getAmount());
        } else {
            order.setStatus(OrderStatus.REJECTED);
        }
        order.setSource(SOURCE);
        repository.persist(customer);
        log.infof("Order reserved: %s", order);
        sender.send(order);
        return customer;
    }
}
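To make the reservation rule easier to follow, here is a minimal plain-Java sketch of the same arithmetic with no database, locking, or REST client involved. The Account and Status types are hypothetical stand-ins for the Customer entity and OrderStatus enum, so treat it as an illustration of the rule rather than the code from the repository.

```java
// Plain-Java sketch of the reservation rule from doReserve.
// Account and Status are hypothetical stand-ins for Customer and OrderStatus.
final class ReservationSketch {

    enum Status { NEW, IN_PROGRESS, REJECTED }

    static final class Account {
        int amountAvailable;
        int amountReserved;

        Account(int amountAvailable, int amountReserved) {
            this.amountAvailable = amountAvailable;
            this.amountReserved = amountReserved;
        }
    }

    // Moves funds from "available" to "reserved" when the order amount is
    // strictly lower than the available balance (the same comparison as in
    // doReserve); otherwise the order is rejected and nothing changes.
    static Status reserve(Account account, int orderAmount) {
        if (orderAmount < account.amountAvailable) {
            account.amountReserved += orderAmount;
            account.amountAvailable -= orderAmount;
            return Status.IN_PROGRESS;
        }
        return Status.REJECTED;
    }

    public static void main(String[] args) {
        Account account = new Account(1000, 0);
        System.out.println(reserve(account, 100)
                + " available=" + account.amountAvailable
                + " reserved=" + account.amountReserved);
        System.out.println(reserve(account, 5000)
                + " available=" + account.amountAvailable
                + " reserved=" + account.amountReserved);
    }
}
```

In the real service the same steps run inside a transaction with a pessimistic write lock, which is what keeps two concurrent reservations from both reading the same available amount.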

Here’s the implementation of our REST client. It sends a message to the endpoint exposed by the KafkaSink object. The path of the endpoint corresponds to the name of the KafkaSink object. We also need to set HTTP headers to meet the CloudEvent format. Therefore we are registering the custom ClientHeadersFactory implementation.

@ApplicationScoped
@RegisterRestClient
@RegisterClientHeaders(CloudEventHeadersFactory.class)
public interface OrderSender {

    @POST
    @Path("/payment-sink")
    void send(Order order);

}

Our custom ClientHeadersFactory implementation sets some Ce-* (CloudEvent) headers. The most important headers are Ce-Type and Ce-Source, since we will later filter events based on their values.

@ApplicationScoped
public class CloudEventHeadersFactory implements ClientHeadersFactory {

    AtomicLong id = new AtomicLong();

    @Override
    public MultivaluedMap<String, String> update(MultivaluedMap<String, String> incoming,
                                                 MultivaluedMap<String, String> outgoing) {
        MultivaluedMap<String, String> result = new MultivaluedHashMap<>();
        result.add("Ce-Id", String.valueOf(id.incrementAndGet()));
        result.add("Ce-Specversion", "1.0");
        result.add("Ce-Type", "reserve-event");
        result.add("Ce-Source", "stock");
        return result;
    }

}
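If you want to see what such a request looks like without Quarkus, here is a small sketch that builds the same kind of binary-mode CloudEvent request with the JDK HTTP client. It only builds the request and prints its headers; the target address and the JSON payload are assumptions made up for the example, not values from the repository.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.concurrent.atomic.AtomicLong;

// Sketch: a binary-mode CloudEvent over HTTP, built with java.net.http.
final class CloudEventRequestSketch {

    private static final AtomicLong ID = new AtomicLong();

    // Builds (but does not send) the request: event attributes travel as
    // Ce-* headers, while the event payload is the HTTP body.
    static HttpRequest build(URI sinkUri, String type, String source, String json) {
        return HttpRequest.newBuilder(sinkUri)
                .header("Content-Type", "application/json")
                .header("Ce-Id", String.valueOf(ID.incrementAndGet()))
                .header("Ce-Specversion", "1.0")
                .header("Ce-Type", type)
                .header("Ce-Source", source)
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build(
                // Hypothetical address, used only for illustration
                URI.create("http://localhost:8080/demo-eventing/payment-sink"),
                "reserve-event",
                "payment",
                // Hypothetical payload; the real Order fields may differ
                "{\"id\":1,\"status\":\"IN_PROGRESS\"}");
        request.headers().map()
                .forEach((name, values) -> System.out.println(name + ": " + values));
    }
}
```

The ClientHeadersFactory above does the same job declaratively: the REST client serializes the Order as the body, and the factory contributes the Ce-* headers.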

Finally, let’s take a look at the payment confirmation service:

@ApplicationScoped
public class OrderConfirmService {

    private static final String SOURCE = "payment";

    Logger log;
    CustomerRepository repository;

    public OrderConfirmService(Logger log, 
                               CustomerRepository repository) {
        this.log = log;
        this.repository = repository;
    }

    @Transactional
    public Customer doConfirm(Order order) {
        Customer customer = repository.findById(order.getCustomerId());
        if (customer == null)
            throw new NotFoundException();
        log.infof("Customer: %s", customer);
        if (order.getStatus() == OrderStatus.CONFIRMED) {
            customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
            repository.persist(customer);
        } else if (order.getStatus() == OrderStatus.ROLLBACK && !order.getRejectedService().equals(SOURCE)) {
            customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
            customer.setAmountAvailable(customer.getAmountAvailable() + order.getAmount());
            repository.persist(customer);
        }
        return customer;
    }
    
}
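The confirmation step is easier to reason about when you look only at the numbers. Below is a minimal sketch of the same rule in plain Java. The Account and Status types and the rejectedByThisService flag are hypothetical simplifications of the Customer entity, the OrderStatus enum and the getRejectedService() check.

```java
// Plain-Java sketch of the confirm/rollback rule from doConfirm.
// Account, Status and rejectedByThisService are hypothetical simplifications.
final class ConfirmationSketch {

    enum Status { CONFIRMED, ROLLBACK }

    static final class Account {
        int amountAvailable;
        int amountReserved;

        Account(int amountAvailable, int amountReserved) {
            this.amountAvailable = amountAvailable;
            this.amountReserved = amountReserved;
        }
    }

    static void confirm(Account account, Status status, int orderAmount,
                        boolean rejectedByThisService) {
        if (status == Status.CONFIRMED) {
            // The reserved funds are finally spent.
            account.amountReserved -= orderAmount;
        } else if (status == Status.ROLLBACK && !rejectedByThisService) {
            // Another service rejected the order, so the funds reserved here
            // go back to the available balance.
            account.amountReserved -= orderAmount;
            account.amountAvailable += orderAmount;
        }
        // If this service rejected the order itself, nothing was reserved,
        // so there is nothing to undo.
    }

    public static void main(String[] args) {
        Account confirmed = new Account(900, 100);
        confirm(confirmed, Status.CONFIRMED, 100, false);
        System.out.println("confirmed: available=" + confirmed.amountAvailable
                + " reserved=" + confirmed.amountReserved);

        Account rolledBack = new Account(900, 100);
        confirm(rolledBack, Status.ROLLBACK, 100, false);
        System.out.println("rollback: available=" + rolledBack.amountAvailable
                + " reserved=" + rolledBack.amountReserved);
    }
}
```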

Also, let’s take a look at the implementation of the function in the order-service.

public class OrderConfirmFunction {

    private final Logger log;
    private final OrderService orderService;

    public OrderConfirmFunction(Logger log, OrderService orderService) {
        this.log = log;
        this.orderService = orderService;
    }

    @Funq
    public void confirm(Order order) {
        log.infof("Accepted order: %s", order);
        orderService.doConfirm(order);
    }

}

Here’s the function implementation for the stock-service:

public class OrderReserveFunction {

    private static final String SOURCE = "stock";

    private final OrderReserveService orderReserveService;
    private final OrderConfirmService orderConfirmService;
    private final Logger log;

    public OrderReserveFunction(OrderReserveService orderReserveService,
                                OrderConfirmService orderConfirmService,
                                Logger log) {
        this.orderReserveService = orderReserveService;
        this.orderConfirmService = orderConfirmService;
        this.log = log;
    }

    @Funq
    public void reserve(Order order) {
        log.infof("Received order: %s", order);
        if (order.getStatus() == OrderStatus.NEW) {
            orderReserveService.doReserve(order);
        } else {
            orderConfirmService.doConfirm(order);
        }
    }

}

Configure Knative Eventing

Now that we have finished the implementation of the app logic, we can proceed to the configuration of OpenShift Serverless and Knative components. If you are using my GitHub repository, you don’t have to manually apply any YAML manifests. All the required configuration is applied during the Maven build. It is possible thanks to the Quarkus Kubernetes extension and its support for Knative. We just need to place all the required YAML manifests inside the src/main/kubernetes/knative.yml file and the magic happens by itself.

However, in order to understand what happens, let’s go through the Knative configuration step by step. In the first step, we need to create the KafkaSink objects. A KafkaSink exposes an HTTP endpoint and takes a CloudEvent as input. Then it sends that event to a particular topic in the Kafka cluster. Here’s the KafkaSink definition for the payment-service:

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: payment-sink
  namespace: demo-eventing
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topic: reserve-events
  numPartitions: 1

Both payment-service and stock-service send messages to the same reserve-events topic. Therefore, we could also create a single KafkaSink shared by those two services (I created two sinks, each dedicated to a single app). On the other hand, the order-service app sends messages to the order-events topic, so we have to create a separate KafkaSink:

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: order-sink
  namespace: demo-eventing
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topic: order-events
  numPartitions: 1

After that, let’s print the list of sinks in our cluster:

The address of the sink ingress should then be set in the application properties as the REST client URL. Here’s the fragment of the Quarkus application.properties:

%openshift.quarkus.rest-client."pl.piomin.samples.quarkus.serverless.order.client.OrderSender".url = http://kafka-sink-ingress.knative-eventing.svc.cluster.local/demo-eventing
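To see how the pieces of the address fit together, here is a tiny sketch. The base URL configured for the REST client ends with the namespace, and the @Path on the client interface adds the KafkaSink name, so the request ends up at ingress/namespace/sink-name. The helper name sinkUri is made up for this example.

```java
import java.net.URI;

// Sketch: how the REST client base URL and the @Path value combine into the
// final KafkaSink address. sinkUri is a hypothetical helper.
final class SinkAddressSketch {

    static URI sinkUri(String ingressBase, String namespace, String sinkName) {
        return URI.create(ingressBase + "/" + namespace + "/" + sinkName);
    }

    public static void main(String[] args) {
        System.out.println(sinkUri(
                "http://kafka-sink-ingress.knative-eventing.svc.cluster.local",
                "demo-eventing",
                "payment-sink"));
    }
}
```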

With KafkaSink we are able to send messages to the Kafka cluster. In order to receive them on the target app side, we need to create other objects. First, we will create a Knative Broker and a KafkaSource object. The broker may be easily created using the kn CLI:

$ kn broker create default

The KafkaSource object connects to the Kafka cluster and receives messages from the defined list of topics. In our case, these are order-events and reserve-events. The output of the KafkaSource object is the already-created default broker. It means that all the messages exchanged between our three apps are delivered to the Knative Broker.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-to-broker
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
    - reserve-events
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default

In the final step, we need to configure the mechanism responsible for getting messages from Knative Broker and sending them to the target services. In order to do that, we have to create Trigger objects. A trigger can filter messages by CloudEvent attributes. Cloud event attributes are related to the Ce-* HTTP headers from the request. For example, the payment-service app receives only messages sent by the order-service and containing the order-event type.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: payment-trigger
spec:
  broker: default
  filter:
    attributes:
      source: order
      type: order-event
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: payment-service
    uri: /reserve

The stock-trigger object is very similar. It connects to the default Broker and gets only messages with the source=order and type=order-event. Finally, it calls the POST /reserve endpoint exposed by the stock-service.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: stock-trigger
spec:
  broker: default
  filter:
    attributes:
      source: order
      type: order-event
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: stock-service
    uri: /reserve

On the other hand, the order-service app should receive events from both stock-service and payment-service. Therefore, we are filtering messages just by the type attribute. The target endpoint of the order-service is POST /confirm.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: order-trigger
spec:
  broker: default
  filter:
    attributes:
      type: reserve-event
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: order-service
    uri: /confirm
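The filtering rule used by these triggers is simple: an event is delivered only if every attribute listed in the filter has exactly the same value in the event. Here is a minimal sketch of that rule in plain Java. It only models the matching logic with maps and is not how the Knative Broker is implemented.

```java
import java.util.Map;

// Sketch of exact-match attribute filtering, as used by filter.attributes.
final class TriggerFilterSketch {

    // True when every filter attribute is present in the event with an
    // identical value. An empty filter matches every event.
    static boolean matches(Map<String, String> filter, Map<String, String> event) {
        for (Map.Entry<String, String> entry : filter.entrySet()) {
            if (!entry.getValue().equals(event.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> paymentTrigger = Map.of("source", "order", "type", "order-event");
        Map<String, String> orderTrigger = Map.of("type", "reserve-event");

        Map<String, String> fromOrder = Map.of("source", "order", "type", "order-event");
        Map<String, String> fromStock = Map.of("source", "stock", "type", "reserve-event");

        System.out.println("payment-trigger gets order event: " + matches(paymentTrigger, fromOrder));
        System.out.println("payment-trigger gets stock event: " + matches(paymentTrigger, fromStock));
        System.out.println("order-trigger gets stock event: " + matches(orderTrigger, fromStock));
    }
}
```

Because order-trigger filters only by type, it accepts reserve-event messages regardless of which service set the source attribute.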

Run Quarkus Apps on Knative

We can leverage the Quarkus Kubernetes/OpenShift extension to run the app as a Knative service. In order to do that we need to include the quarkus-openshift dependency in Maven pom.xml. We would like to use that module during the build only if we need to deploy the app on the cluster. Therefore, we will create a custom Maven profile openshift. Besides including the quarkus-openshift dependency it also enables deployment by setting the quarkus.kubernetes.deploy property to true and activates the custom Quarkus profile openshift.

<profiles>
  <profile>
    <id>openshift</id>
    <activation>
      <property>
        <name>openshift</name>
      </property>
    </activation>
    <dependencies>
      <dependency>
        <groupId>io.quarkus</groupId>
        <artifactId>quarkus-openshift</artifactId>
      </dependency>
    </dependencies>
    <properties>
      <quarkus.kubernetes.deploy>true</quarkus.kubernetes.deploy>
      <quarkus.profile>openshift</quarkus.profile>
    </properties>
  </profile>
</profiles>

Once we include the quarkus-openshift module, we may use Quarkus configuration properties to customize the deployment process. First, we need to set the quarkus.kubernetes.deployment-target property to knative. Thanks to that, Quarkus will automatically generate the YAML manifest with a Knative Service instead of a standard Kubernetes Deployment. We can also override the default autoscaling settings with the quarkus.knative.revision-auto-scaling.* properties. The whole build process runs on the cluster with S2I (source-to-image), so we can use the internal OpenShift registry (the quarkus.container-image.registry property). Here’s the fragment of the application.properties file for the order-service.

quarkus.kubernetes-client.trust-certs = true
quarkus.kubernetes.deployment-target = knative
quarkus.knative.env.vars.tick-timeout = 10000
quarkus.knative.revision-auto-scaling.metric = rps
quarkus.knative.revision-auto-scaling.target = 50

%openshift.quarkus.container-image.group = demo-eventing
%openshift.quarkus.container-image.registry = image-registry.openshift-image-registry.svc:5000
%openshift.app.orders.timeout = ${TICK_TIMEOUT}
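One detail worth spelling out is how quarkus.knative.env.vars.tick-timeout relates to ${TICK_TIMEOUT}. As far as I understand the Quarkus Kubernetes extension, the key after env.vars. is upper-cased and dashes are replaced with underscores when the container environment variable is generated. The sketch below only mimics that naming rule in plain Java; toEnvVarName is a made-up helper, not a Quarkus API.

```java
import java.util.Locale;

// Sketch of the naming rule that turns an env.vars key into an environment
// variable name. toEnvVarName is a hypothetical helper, not part of Quarkus.
final class EnvVarNameSketch {

    static String toEnvVarName(String key) {
        return key.toUpperCase(Locale.ROOT).replace('-', '_');
    }

    public static void main(String[] args) {
        System.out.println(toEnvVarName("tick-timeout"));
    }
}
```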

Finally, we just need to activate the openshift profile during the build and all the apps will be deployed to the target OpenShift cluster. You can deploy a single app or all the apps by running the following command in the repository root directory:

$ mvn clean package -Popenshift

Once the apps are deployed, we can display the list of Knative services.

openshift-serverless-knative-svc

We can also verify if all the triggers have been configured properly.

Also, let’s take a look at the “Topology” view on OpenShift which illustrates our serverless architecture.

openshift-serverless-topology

Testing Services

It is also worth creating some automated tests to verify the basic functionality before deployment. Since we have simple HTTP apps and an in-memory H2 database we can create standard tests. The only thing we need to do is to mock the HTTP client.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-junit5</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>io.rest-assured</groupId>
  <artifactId>rest-assured</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-junit5-mockito</artifactId>
  <scope>test</scope>
</dependency>

Here’s the JUnit test for the payment-service. We are verifying both the reservation and confirmation processes for the same order.

@QuarkusTest
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class OrderReserveFunctionTests {

    private static int amount;
    private CustomerRepository repository;
    @InjectMock
    @RestClient
    OrderSender sender;

    public OrderReserveFunctionTests(CustomerRepository repository) {
        this.repository = repository;
    }

    @Test
    @org.junit.jupiter.api.Order(1)
    void reserve() {
        given().contentType("application/json").body(createTestOrder(OrderStatus.NEW)).post("/reserve")
                .then()
                .statusCode(204);

        Customer c = repository.findById(1L);
        amount = c.getAmountAvailable();
        assertEquals(100, c.getAmountReserved());
    }

    @Test
    @org.junit.jupiter.api.Order(2)
    void confirm() {
        given().contentType("application/json").body(createTestOrder(OrderStatus.CONFIRMED)).post("/reserve")
                .then()
                .statusCode(204);

        Customer c = repository.findById(1L);
        assertEquals(0, c.getAmountReserved());
        assertEquals(amount, c.getAmountAvailable());
    }

    private Order createTestOrder(OrderStatus status) {
        Order o = new Order();
        o.setId(1L);
        o.setSource("test");
        o.setStatus(status);
        o.setAmount(100);
        o.setCustomerId(1L);
        return o;
    }
}

Also, let’s take a look at the logs of the apps running on OpenShift. As you can see, the order-service receives events from both stock-service and payment-service. After that, it confirms the order and sends a confirmation message to both services.

Here are the logs from the payment-service. As you can see, it receives the CloudEvent generated by the order-service (the Ce-Source header equals order).

Final Thoughts

With OpenShift Serverless and Knative Eventing, you can easily build an event-driven architecture for simple HTTP-based apps. The app does not need to know which medium is used to store events or how they are routed. The only thing it needs to do is to prepare a request according to the CloudEvent specification. OpenShift Serverless brings several features to simplify development. We can also leverage the Quarkus Kubernetes extension to easily build and deploy our apps on OpenShift as Knative services.

Native Java with GraalVM and Virtual Threads on Kubernetes
https://piotrminkowski.com/2023/01/04/native-java-with-graalvm-and-virtual-threads-on-kubernetes/
Wed, 04 Jan 2023 12:23:21 +0000

In this article, you will learn how to use virtual threads, build a native image with GraalVM and run such a Java app on Kubernetes. Currently, native compilation (GraalVM) and virtual threads (Project Loom) are probably the hottest topics in the Java world. They improve the general performance of your app, including memory usage and startup time. Since startup time and memory usage were always a problem for Java, expectations for native images and virtual threads have been really high.

Of course, we usually consider such performance issues within the context of microservices or serverless apps. They should not consume many OS resources and should be easily auto-scalable. We can easily control resource usage on Kubernetes. If you are interested in Java virtual threads, you can read my previous article about using them to create an HTTP server. For more details about Knative as a serverless platform on Kubernetes, you can refer to my earlier article on that topic.

Introduction

Let’s start with the plan for our exercise today. In the first step, we will create a simple Java web app that uses virtual threads for processing incoming HTTP requests. Before we run the sample app, we will install Knative on Kubernetes to quickly test autoscaling based on HTTP traffic. We will also install Prometheus on Kubernetes. This monitoring stack allows us to compare the performance of the app with and without GraalVM and virtual threads on Kubernetes. Then, we can proceed with the deployment. In order to easily build and run our native app on Kubernetes, we will use Cloud Native Buildpacks. Finally, we will perform some load tests and compare metrics.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. After that, you should follow my instructions.

Create Java App with Virtual Threads

In the first step, we will create a simple Java app that acts as an HTTP server and handles incoming requests. In order to do that, we can use the HttpServer object from the core Java API. Once we create the server, we can override the default thread executor with the setExecutor method. Later, we will compare the app using standard threads with the same app using virtual threads. Therefore, we allow overriding the type of executor with the THREAD_TYPE environment variable. If you want to enable virtual threads, set it to virtual. Here’s the main method of our app.

public class MainApp {

   public static void main(String[] args) throws IOException {
      HttpServer httpServer = HttpServer
         .create(new InetSocketAddress(8080), 0);

      httpServer.createContext("/example", 
         new SimpleCPUConsumeHandler());

      // "virtual".equals(...) is null-safe: an unset THREAD_TYPE falls back to the fixed pool
      if ("virtual".equals(System.getenv("THREAD_TYPE"))) {
         httpServer.setExecutor(
            Executors.newVirtualThreadPerTaskExecutor());
      } else {
         httpServer.setExecutor(Executors.newFixedThreadPool(200));
      }
      httpServer.start();
   }

}

In order to process incoming requests, the HTTP server uses the handler that implements the HttpHandler interface. In our case, the handler is implemented inside the SimpleCPUConsumeHandler class as shown below. It consumes a lot of CPU since it creates an instance of BigInteger with the constructor that performs a lot of computations under the hood. It will also consume some time, so we have the simulation of processing time in the same step. As a response, we just return the next number in the sequence with the Hello_ prefix.

public class SimpleCPUConsumeHandler implements HttpHandler {

   Logger LOG = Logger.getLogger("handler");
   AtomicLong i = new AtomicLong();
   final Integer cpus = Runtime.getRuntime().availableProcessors();

   @Override
   public void handle(HttpExchange exchange) throws IOException {
      new BigInteger(1000, 3, new Random());
      String response = "Hello_" + i.incrementAndGet();
      LOG.log(Level.INFO, "(CPU->{0}) {1}", 
         new Object[] {cpus, response});
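      byte[] body = response.getBytes(StandardCharsets.UTF_8);
      // Content-Length must be the byte count of the body, not the String length
      exchange.sendResponseHeaders(200, body.length);
      // try-with-resources closes the stream even if the write fails
      try (OutputStream os = exchange.getResponseBody()) {
         os.write(body);
      }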
   }
}
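If you are curious how expensive that BigInteger constructor really is, here is a small standalone sketch that times it outside of the HTTP server. The constructor with (bitLength, certainty, random) arguments generates a random probable prime of the given bit length, which is where the CPU time goes. The class name and the number of iterations are arbitrary choices for the example, and the measured time will vary between machines.

```java
import java.math.BigInteger;
import java.util.Random;

// Sketch: measuring the cost of the probable-prime generation used by the handler.
final class PrimeCostSketch {

    // Same call as in the handler: a random 1000-bit probable prime.
    static BigInteger generate() {
        return new BigInteger(1000, 3, new Random());
    }

    public static void main(String[] args) {
        int iterations = 20; // arbitrary, just enough to get a rough average
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            generate();
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("Average time per call: " + (elapsedMs / (double) iterations) + " ms");
    }
}
```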

Virtual threads are a preview feature in Java 19, so we need to enable preview mode during compilation (the same --enable-preview flag is also required when running the app). With Maven, we can enable preview features using maven-compiler-plugin as shown below.

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-compiler-plugin</artifactId>
  <version>3.10.1</version>
  <configuration>
    <release>19</release>
    <compilerArgs>
      <arg>--enable-preview</arg>
    </compilerArgs>
  </configuration>
</plugin>

Install Knative on Kubernetes

This and the next step are not required to run the native application on Kubernetes. We will use Knative to easily autoscale the app in reaction to the volume of incoming traffic. In the next section, I’ll describe how to run a monitoring stack on Kubernetes.

The simplest way to install Knative on Kubernetes is with the kubectl command. We just need the Knative Serving component without any additional features. The Knative CLI (kn) is not required. We will deploy the application from the YAML manifest using Skaffold.

First, let’s install the required custom resources with the following command:

$ kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.8.3/serving-crds.yaml

Then, we can install the core components of Knative Serving by running the command:

$ kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.8.3/serving-core.yaml

In order to access Knative services outside of the Kubernetes cluster we also need to install a networking layer. By default, Knative uses Kourier as an ingress. We can install the Kourier controller by running the following command.

$ kubectl apply -f https://github.com/knative/net-kourier/releases/download/knative-v1.8.1/kourier.yaml

Finally, let’s configure Knative Serving to use Kourier with the following command:

$ kubectl patch configmap/config-network \
  --namespace knative-serving \
  --type merge \
  --patch '{"data":{"ingress-class":"kourier.ingress.networking.knative.dev"}}'

If you don’t have an external domain configured or you are running Knative on the local cluster you need to configure DNS. Otherwise, you would have to run curl commands with a host header. Knative provides a Kubernetes Job that sets sslip.io as the default DNS suffix.

$ kubectl apply -f https://github.com/knative/serving/releases/download/knative-v1.8.3/serving-default-domain.yaml

The generated URL contains the name of the service, the namespace, and the address of your Kubernetes cluster. Since I’m running my service on the local Kubernetes cluster in the demo-sless namespace, my service is available under the following address:

http://sample-java-concurrency.demo-sless.127.0.0.1.sslip.io
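The URL layout described above can be sketched in a few lines of Java. This is only an illustration: the pattern service.namespace.address.sslip.io is inferred from the address used in the k6 script later in this article, and the class and method names are hypothetical.

```java
// Sketch: composes the address Knative exposes when sslip.io is the default DNS suffix.
// The pattern is inferred from the article; KnativeUrlSketch and serviceUrl are hypothetical names.
final class KnativeUrlSketch {

    static String serviceUrl(String service, String namespace, String clusterAddress) {
        return "http://" + service + "." + namespace + "." + clusterAddress + ".sslip.io";
    }

    public static void main(String[] args) {
        // Values from the article: service name, namespace, and the local cluster address.
        System.out.println(serviceUrl("sample-java-concurrency", "demo-sless", "127.0.0.1"));
    }
}
```

On a remote cluster the last argument would be the external address of the Kourier gateway instead of 127.0.0.1.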

But before we deploy the sample app on Knative, let’s do some other things.

Install Prometheus Stack on Kubernetes

As I mentioned before, we can also install a monitoring stack on Kubernetes.

The simplest way to install it is with the kube-prometheus-stack Helm chart. The package contains Prometheus and Grafana. It also includes all required rules and dashboards to visualize the basic metrics of your Kubernetes cluster. Firstly, let’s add the Helm repository containing our chart:

$ helm repo add prometheus-community https://prometheus-community.github.io/helm-charts

Then we can install the kube-prometheus-stack Helm chart in the prometheus namespace with the following command:

$ helm install prometheus-stack prometheus-community/kube-prometheus-stack  \
    -n prometheus \
    --create-namespace

If everything goes fine, you should see a similar list of Kubernetes services:

$ kubectl get svc -n prometheus
NAME                                        TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
alertmanager-operated                       ClusterIP   None             <none>        9093/TCP,9094/TCP,9094/UDP   11s
prometheus-operated                         ClusterIP   None             <none>        9090/TCP                     10s
prometheus-stack-grafana                    ClusterIP   10.96.218.142    <none>        80/TCP                       23s
prometheus-stack-kube-prom-alertmanager     ClusterIP   10.105.10.183    <none>        9093/TCP                     23s
prometheus-stack-kube-prom-operator         ClusterIP   10.98.190.230    <none>        443/TCP                      23s
prometheus-stack-kube-prom-prometheus       ClusterIP   10.111.158.146   <none>        9090/TCP                     23s
prometheus-stack-kube-state-metrics         ClusterIP   10.100.111.196   <none>        8080/TCP                     23s
prometheus-stack-prometheus-node-exporter   ClusterIP   10.102.39.238    <none>        9100/TCP                     23s

We will analyze Grafana dashboards with memory and CPU statistics. We can enable port-forward to access it locally on the defined port, for example 9080:

$ kubectl port-forward svc/prometheus-stack-grafana 9080:80 -n prometheus

The default username for Grafana is admin and password prom-operator.

We will create two panels in a custom Grafana dashboard. The first of them will show the memory usage per pod in the demo-sless namespace.

sum(container_memory_working_set_bytes{namespace="demo-sless"} / (1024 * 1024)) by (pod)

The second of them will show the average CPU usage per pod in the demo-sless namespace. You can import both panels directly into Grafana from the k8s/grafana-dasboards.json file in the GitHub repo.

rate(container_cpu_usage_seconds_total{namespace="demo-sless"}[3m])
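To make the two queries more tangible, here is a rough Java sketch of the arithmetic behind them. It is a simplification: the real rate() function in Prometheus also handles counter resets and extrapolates to the window boundaries. The class name, method names, and sample values are made up for illustration.

```java
// Sketch of the arithmetic behind the two panel queries (simplified, hypothetical names).
final class PanelMathSketch {

    // Average per-second increase of a counter between the first and last sample in a window.
    static double ratePerSecond(double firstValue, double firstTimestamp,
                                double lastValue, double lastTimestamp) {
        if (lastTimestamp <= firstTimestamp) {
            throw new IllegalArgumentException("window must have a positive length");
        }
        return (lastValue - firstValue) / (lastTimestamp - firstTimestamp);
    }

    // Same conversion as dividing by (1024 * 1024) in the memory query.
    static double bytesToMiB(double bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        // Made-up samples: 54 CPU-seconds consumed within a 180-second (3m) window.
        System.out.println(ratePerSecond(1000.0, 0.0, 1054.0, 180.0) + " cores");
        System.out.println(bytesToMiB(943_718_400.0) + " MiB");
    }
}
```

A value of 0.3 from the CPU query therefore means that the container kept roughly a third of one core busy during the last three minutes.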

Build and Deploy a native Java Application

We have already created the sample app and then configured the Kubernetes environment. Now, we may proceed to the deployment phase. Our goal here is to simplify the process of building a native image and running it on Kubernetes as much as possible. Therefore, we will use Cloud Native Buildpacks and Skaffold. With Buildpacks we don’t need to have anything installed on our laptop besides Docker. Skaffold can be easily integrated with Buildpacks to automate the whole process of building and running the app on Kubernetes. You just need to install the skaffold CLI on your machine.

For building a native image of a Java application we may use Paketo Buildpacks. It provides a dedicated buildpack for GraalVM called Paketo GraalVM Buildpack. We should include it in the configuration using the paketo-buildpacks/graalvm name. Since Skaffold supports Buildpacks, we should set all the properties inside the skaffold.yaml file. We need to override some default settings with environment variables. First of all, we have to set the version of Java to 19 and enable preview features (virtual threads). The Kubernetes deployment manifest is available under the k8s/deployment.yaml path.

apiVersion: skaffold/v2beta29
kind: Config
metadata:
  name: sample-java-concurrency
build:
  artifacts:
  - image: piomin/sample-java-concurrency
    buildpacks:
      builder: paketobuildpacks/builder:base
      buildpacks:
        - paketo-buildpacks/graalvm
        - paketo-buildpacks/java-native-image
      env:
        - BP_NATIVE_IMAGE=true
        - BP_JVM_VERSION=19
        - BP_NATIVE_IMAGE_BUILD_ARGUMENTS=--enable-preview
  local:
    push: true
deploy:
  kubectl:
    manifests:
    - k8s/deployment.yaml

Knative simplifies not only autoscaling, but also Kubernetes manifests. Here’s the manifest for our sample app available in the k8s/deployment.yaml file. We need to define a single Service object containing the details of the application container. We will change the autoscaling target from the default 200 concurrent requests to 80. It means that if a single instance of the app processes more than 80 requests simultaneously, Knative will create a new instance of the app (or a pod, to be more precise). In order to enable virtual threads for our app, we also need to set the environment variable THREAD_TYPE to virtual.

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: sample-java-concurrency
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/target: "80"
    spec:
      containers:
        - name: sample-java-concurrency
          image: piomin/sample-java-concurrency
          ports:
            - containerPort: 8080
          env:
            - name: THREAD_TYPE
              value: virtual
            - name: JAVA_TOOL_OPTIONS
              value: --enable-preview
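The effect of the autoscaling target can be approximated with a simple calculation. The sketch below is a deliberately simplified model, not the actual Knative autoscaler: it ignores the target utilization setting, the averaging windows, and panic mode. The class and method names are hypothetical.

```java
// Simplified model of concurrency-based scaling (not the real Knative algorithm).
final class AutoscalingSketch {

    // Number of pods needed so that no pod handles more than targetPerPod concurrent requests.
    static int desiredPods(int concurrentRequests, int targetPerPod) {
        if (targetPerPod <= 0) {
            throw new IllegalArgumentException("target must be positive");
        }
        // Integer ceiling division; zero requests means zero pods (scale to zero).
        return (concurrentRequests + targetPerPod - 1) / targetPerPod;
    }

    public static void main(String[] args) {
        int target = 80; // the value of autoscaling.knative.dev/target in the manifest
        for (int users : new int[] {50, 100, 200}) {
            System.out.println(users + " concurrent requests -> " + desiredPods(users, target) + " pod(s)");
        }
    }
}
```

Under this model we would expect one pod for 50 concurrent requests, two for 100, and three for 200, which is the theoretical behavior the load tests later in the article are compared against.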

Assuming you already installed Skaffold, the only thing you need to do is to run the following command:

$ skaffold run -n demo-sless

Or you can just deploy a ready image from my registry on Docker Hub. However, in that case, you need to change the image tag in the deployment.yaml manifest to virtual-native.

Once you deploy the app, you can verify the list of Knative Services. The name of our target service is sample-java-concurrency. The address of the service is returned in the URL field.

$ kn service list -n demo-sless

Load Testing

We will run three testing scenarios today. In the first, we test a standard compilation with a standard thread pool of size 100. In the second, we test a standard compilation with virtual threads. The final test checks native compilation in conjunction with virtual threads. In all these scenarios, we set the same autoscaling target: 80 concurrent requests. I’m using the k6 tool for load tests. Each test scenario consists of the same four steps, and each step takes 2 minutes. In the first step, we simulate 50 users.

$ k6 run -u 50 -d 120s k6-test.js

Then, we are simulating 100 users.

$ k6 run -u 100 -d 120s k6-test.js

Finally, we run the test for 200 users twice. So, in total, there are four tests with 50, 100, 200, and 200 users, which takes 8 minutes.

$ k6 run -u 200 -d 120s k6-test.js

Let’s verify the results. By the way, here is our k6 test script written in JavaScript.

import http from 'k6/http';
import { check } from 'k6';

export default function () {
  const res = http.get(`http://sample-java-concurrency.demo-sless.127.0.0.1.sslip.io/example`);
  check(res, {
    'is status 200': (res) => res.status === 200,
    'body size is > 0': (r) => r.body.length > 0,
  });
}

Test for Standard Compilation and Threads

The diagram below shows memory usage at each phase of the test scenario. Knative scales up the number of instances only once we simulate 200 users. Theoretically, it should do that during the 100-user test, but Knative measures incoming traffic at the level of the sidecar container inside the pod. The memory usage for the first instance is ~900MB (this also includes the sidecar container).

[Figure: memory usage per pod, standard compilation and standard threads]

Here’s a similar view, but for CPU usage. The highest consumption, ~1.2 cores, occurs just before autoscaling kicks in. After that, depending on the number of instances, it ranges from ~0.4 to ~0.7 cores. As I mentioned before, we are using a time-consuming BigInteger constructor to simulate CPU usage under a heavy load.

[Figure: CPU usage per pod, standard compilation and standard threads]

Here are the test results for 50 users. The application was able to process ~105k requests in 2 minutes. The highest processing time value was ~3 seconds.

[Figure: k6 results for 50 users]

Here are the test results for 100 users. The application was able to process ~130k requests in 2 minutes with an average response time of ~90ms.

[Figure: k6 results for 100 users]

Finally, here are the results for the 200-user test. The application was able to process ~135k requests in 2 minutes with an average response time of ~175ms. The failure rate was 0.02%.
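The request counts and the average response times reported above are connected. Because the k6 script contains no sleep between iterations, each virtual user sends requests back to back, so Little's law (concurrency = throughput × response time) gives a rough estimate of the average response time. The sketch below is only a sanity check under that assumption; the class and method names are hypothetical.

```java
// Rough consistency check of the k6 numbers using Little's law (hypothetical helper).
final class LittlesLawSketch {

    // Estimated average response time in milliseconds when every virtual user
    // sends its next request immediately after the previous one completes.
    static double avgResponseMillis(int users, int totalRequests, int durationSeconds) {
        if (totalRequests <= 0) {
            throw new IllegalArgumentException("totalRequests must be positive");
        }
        return users * durationSeconds * 1000.0 / totalRequests;
    }

    public static void main(String[] args) {
        // Approximate values reported in this section.
        System.out.printf("100 users: ~%.0f ms%n", avgResponseMillis(100, 130_000, 120));
        System.out.printf("200 users: ~%.0f ms%n", avgResponseMillis(200, 135_000, 120));
    }
}
```

The estimates land close to the reported ~90ms and ~175ms, which suggests the measurements are internally consistent.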

Test for Standard Compilation and Virtual Threads

As in the previous section, here’s the diagram that shows memory usage at each phase of the test scenario. This time, Knative scales up the number of instances already during the 100-user test. Theoretically, it should run a third instance of the app for 200 users. The memory usage for the first instance is ~850MB (this also includes the sidecar container).

[Figure: memory usage per pod, standard compilation and virtual threads]

Here’s a similar view, but for CPU usage. The highest consumption, ~1.1 cores, occurs just before autoscaling kicks in. After that, depending on the number of instances, it ranges from ~0.3 to ~0.7 cores.

Here are the test results for 50 users. The application was able to process ~105k requests in 2 minutes. The highest processing time value was ~2.2 seconds.

Here are the test results for 100 users. The application was able to process ~115k requests in 2 minutes with an average response time of ~100ms.

Finally, here are the results for the 200-user test. The application was able to process ~135k requests in 2 minutes with an average response time of ~180ms. The failure rate was 0.02%.

Test for Native Compilation and Virtual Threads

As in the previous section, here’s the diagram that shows memory usage at each phase of the test scenario. Knative scales up the number of instances already during the 100-user test. Theoretically, it should run a third instance of the app for 200 users (the third pod visible on the diagram was in fact in the Terminating phase for some time). The memory usage for the first instance is ~50MB.

[Figure: memory usage per pod, native compilation and virtual threads]

Here’s a similar view, but for CPU usage. The highest consumption, ~1.3 cores, occurs just before autoscaling kicks in. After that, depending on the number of instances, it ranges from ~0.3 to ~0.9 cores.

Here are the test results for 50 users. The application was able to process ~75k requests in 2 minutes. The highest processing time value was ~2 seconds.

Here are the test results for 100 users. The application was able to process ~85k requests in 2 minutes with an average response time of ~140ms.

Finally, here are the results for the 200-user test. The application was able to process ~100k requests in 2 minutes with an average response time of ~240ms. In addition, there were no failures during the second 200-user attempt.

Summary

In this article, I compared the behavior of a Java app using GraalVM native compilation and virtual threads on Kubernetes with the standard approach. There are several conclusions after running all the described tests:

  • There are no significant differences between standard and virtual threads when it comes to resource usage or request processing time. Resource usage is slightly lower for virtual threads, while processing time is slightly lower for standard threads. However, if our handler method took more time, this proportion would change in favor of virtual threads.
  • Autoscaling works better for virtual threads, although I’m not sure why 🙂 With a target of 80, the number of instances was scaled up at 100 users for virtual threads, but not for standard threads. Of course, virtual threads give us more flexibility when setting an autoscaling target. For standard threads, we have to choose a value lower than the thread pool size, while for virtual threads we can set any reasonable value.
  • Native compilation significantly reduces app memory usage. For the native app, it was ~50MB instead of ~900MB. On the other hand, CPU consumption was slightly higher for the native app.
  • The native app processes requests more slowly than a standard app. In all the tests, it processed roughly 30% fewer requests than the standard app.
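The throughput gap from the last point can be recomputed from the approximate request counts reported in the test sections. The sketch below compares the native build with the standard build, both using virtual threads. The helper name is hypothetical and the inputs are the rounded values from this article, so the results are estimates only.

```java
// Recomputes the relative throughput gap from the rounded request counts (hypothetical helper).
final class ThroughputGapSketch {

    // How many percent lower `value` is compared to `baseline`.
    static double percentLower(double baseline, double value) {
        if (baseline <= 0) {
            throw new IllegalArgumentException("baseline must be positive");
        }
        return (baseline - value) * 100.0 / baseline;
    }

    public static void main(String[] args) {
        // {users, standard + virtual threads, native + virtual threads}, requests per 2 minutes.
        int[][] results = {
            {50, 105_000, 75_000},
            {100, 115_000, 85_000},
            {200, 135_000, 100_000},
        };
        for (int[] r : results) {
            System.out.printf("%d users: native is ~%.0f%% lower%n", r[0], percentLower(r[1], r[2]));
        }
    }
}
```

With these inputs the gap stays between about a quarter and a third in every step, in line with the rough 30% figure.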

The post Native Java with GraalVM and Virtual Threads on Kubernetes appeared first on Piotr's TechBlog.

Canary Release on Kubernetes with Knative and Tekton
https://piotrminkowski.com/2022/03/29/canary-release-on-kubernetes-with-knative-and-tekton/
Tue, 29 Mar 2022 07:43:43 +0000

The post Canary Release on Kubernetes with Knative and Tekton appeared first on Piotr's TechBlog.

In this article, you will learn how to prepare a canary release in your CI/CD with Knative and Tekton. Since Knative supports many versions of the same service it seems to be the right tool to do canary releases. We will use its feature called gradual rollouts to shift the traffic to the latest version in progressive steps. As an exercise, we are going to compile natively (with GraalVM) and run a simple REST service built on top of Spring Boot. We will use Cloud Native Buildpacks as a build tool on Kubernetes. Let’s begin!

If you are interested in more details about Spring Boot native compilation please refer to my article Microservices on Knative with Spring Boot and GraalVM.

Prerequisites

Native compilation for Java is a memory-intensive process. Therefore, we need to reserve at least 8GB for our Kubernetes cluster. We also have to install Tekton and Knative there, so it is worth having even more memory.

1. Install Knative Serving – we will use the latest version of Knative (1.3). Go to the following site for the installation manual. Once you have done that, you can verify that it works with the following command:

$ kubectl get pods -n knative-serving

2. Install Tekton – you can go to that site for more details. However, there is just a single command to install it:

$ kubectl apply --filename https://storage.googleapis.com/tekton-releases/pipeline/latest/release.yaml

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then go to the callme-service directory. After that, you should just follow my instructions.

Spring Boot for Native GraalVM

In order to expose the REST endpoints, we need to include Spring Boot Starter Web. Our service also stores data in the H2 database, so we include Spring Boot Starter Data JPA. The last dependency is for native compilation support. The current version of Spring Native is 0.11.3.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.experimental</groupId>
  <artifactId>spring-native</artifactId>
  <version>0.11.3</version>
</dependency>

Let’s switch to the code. Here is our model class exposed by the REST endpoint. It contains the current date, the name of the Kubernetes pod, and the version number.

@Entity
public class Callme {

   @Id
   @GeneratedValue
   private Integer id;
   @Temporal(TemporalType.TIMESTAMP)
   private Date addDate;
   private String podName;
   private String version;

   // getters, setters, constructor ...

}

There is a single endpoint that creates an event, stores it in the database and returns it as a result. The name of the pod and the name of the namespace are taken directly from Kubernetes Deployment. We use the version number from Maven pom.xml as the application version.

@RestController
@RequestMapping("/callme")
public class CallmeController {

   @Value("${spring.application.name}")
   private String appName;
   @Value("${POD_NAME}")
   private String podName;
   @Value("${POD_NAMESPACE}")
   private String podNamespace;
   @Autowired
   private CallmeRepository repository;
   @Autowired(required = false)
   BuildProperties buildProperties;

   @GetMapping("/ping")
   public String ping() {
      Callme c = repository.save(new Callme(new Date(), podName,
            buildProperties != null ? buildProperties.getVersion() : null));
      return appName +
            " v" + c.getVersion() +
            " (id=" + c.getId() + "): " +
            podName +
            " in " + podNamespace;
   }

}

In order to use the Maven version, we need to generate the build-info.properties file during the build. Therefore we should add the build-info goal in the spring-boot-maven-plugin execution properties. If you would like to build a native image locally just set the configuration environment property BP_NATIVE_IMAGE to true. Then you can just run the command mvn spring-boot:build-image.

<plugin>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-maven-plugin</artifactId>
  <executions>
    <execution>
      <goals>
        <goal>build-info</goal>
      </goals>
    </execution>
  </executions>
  <configuration>
    <image>
      <builder>paketobuildpacks/builder:tiny</builder>
      <env>
        <BP_NATIVE_IMAGE>true</BP_NATIVE_IMAGE>
      </env>
    </image>
  </configuration>
</plugin>

Create Tekton Pipelines

Our pipeline consists of three tasks. In the first of them, we clone the source code repository with the Spring Boot application. In the second step, we build the native image on Kubernetes using the Cloud Native Buildpacks task. After building the image, we push it to the remote registry. Finally, we run the image on Kubernetes as a Knative Service.

[Figure: Tekton pipeline with the clone, build, and deploy tasks]

Firstly, we need to install the Tekton git-clone task:

$ kubectl apply -f https://raw.githubusercontent.com/tektoncd/catalog/master/task/git-clone/0.4/git-clone.yaml

We also need the buildpacks task that allows us to run Cloud Native Buildpacks on Kubernetes:

$ kubectl apply -f https://raw.githubusercontent.com/tektoncd/catalog/master/task/buildpacks/0.3/buildpacks.yaml

Finally, we are installing the kubernetes-actions task to deploy Knative Service using the YAML manifest.

$ kubectl apply -f https://raw.githubusercontent.com/tektoncd/catalog/main/task/kubernetes-actions/0.2/kubernetes-actions.yaml

All our tasks are ready. Let’s just verify it:

$ kubectl get task                     
NAME                 AGE
buildpacks           1m
git-clone            1m
kubernetes-actions   33s

Finally, we are going to create a Tekton pipeline. In the buildpacks task reference, we need to set several parameters. Since we have two Maven modules in the repository, we first need to set the working directory to the callme-service (1). Also, we use Paketo Buildpacks, so we change the default builder image to the paketobuildpacks/builder:base (2). Finally, we need to enable native build for Cloud Native Buildpacks. In order to do that, we should set the environment variable BP_NATIVE_IMAGE to true (3). After building and pushing the image we can deploy it on Kubernetes (4).

apiVersion: tekton.dev/v1alpha1
kind: Pipeline
metadata:
  name: build-spring-boot-pipeline
spec:
  params:
    - description: image URL to push
      name: image
      type: string
  tasks:
    - name: fetch-repository
      params:
        - name: url
          value: 'https://github.com/piomin/sample-spring-boot-graalvm.git'
        - name: subdirectory
          value: ''
        - name: deleteExisting
          value: 'true'
      taskRef:
        kind: Task
        name: git-clone
      workspaces:
        - name: output
          workspace: source-workspace
    - name: buildpacks
      params:
        - name: APP_IMAGE
          value: $(params.image)
        - name: SOURCE_SUBPATH # (1)
          value: callme-service
        - name: BUILDER_IMAGE # (2)
          value: 'paketobuildpacks/builder:base'
        - name: ENV_VARS # (3)
          value:
            - BP_NATIVE_IMAGE=true
      runAfter:
        - fetch-repository
      taskRef:
        kind: Task
        name: buildpacks
      workspaces:
        - name: source
          workspace: source-workspace
        - name: cache
          workspace: cache-workspace
    - name: deploy
      params:
        - name: args # (4)
          value: 
            - apply -f callme-service/k8s/
      runAfter:
        - buildpacks
      taskRef:
        kind: Task
        name: kubernetes-actions
      workspaces:
        - name: manifest-dir
          workspace: source-workspace
  workspaces:
    - name: source-workspace
    - name: cache-workspace

In order to push the image into the remote secure registry, you need to create a Secret containing your username and password.

$ kubectl create secret docker-registry docker-user-pass \
    --docker-username=<USERNAME> \
    --docker-password=<PASSWORD> \
    --docker-server=https://index.docker.io/v1/

After that, you should create a ServiceAccount that uses the newly created Secret. As you have probably figured out, our pipeline uses that ServiceAccount.

apiVersion: v1
kind: ServiceAccount
metadata:
  name: buildpacks-service-account
secrets:
  - name: docker-user-pass

Deploy Knative Service with gradual rollouts

Here’s the YAML manifest with our Knative Service for the 1.0 version. It is a very simple definition. The only additional thing we need to do is to inject the name of the pod and namespace into the container.

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: callme-service
spec:
  template:
    spec:
      containers:
      - name: callme
        image: piomin/callme-service:1.0
        ports:
          - containerPort: 8080
        env:
          - name: POD_NAME
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
          - name: POD_NAMESPACE
            valueFrom:
              fieldRef:
                fieldPath: metadata.namespace

In order to inject the name of the pod and namespace into the container, we use the Downward API. This Kubernetes feature is disabled by default on Knative. To enable it, we need to add the property kubernetes.podspec-fieldref with the value enabled to the config-features ConfigMap.

apiVersion: v1
kind: ConfigMap
metadata:
  name: config-features
  namespace: knative-serving
data:
  kubernetes.podspec-fieldref: enabled
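Once the feature is enabled, POD_NAME and POD_NAMESPACE arrive in the container as ordinary environment variables. The sketch below shows one way to read them in plain Java with a fallback, which can be handy when running the app outside Kubernetes. It is not the code of the sample service (which uses Spring's @Value), and the fallback values and class name are assumptions made for this example.

```java
// Sketch: reading the variables injected by the Downward API, with local fallbacks.
// PodInfoSketch and the fallback values are hypothetical; the sample service uses @Value instead.
final class PodInfoSketch {

    // Returns the fallback when the variable is not set or empty.
    static String valueOrDefault(String value, String fallback) {
        return (value == null || value.isEmpty()) ? fallback : value;
    }

    public static void main(String[] args) {
        String podName = valueOrDefault(System.getenv("POD_NAME"), "local");
        String podNamespace = valueOrDefault(System.getenv("POD_NAMESPACE"), "default");
        System.out.println(podName + " in " + podNamespace);
    }
}
```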

Now, let’s run our pipeline for the 1.0 version of our sample application. To run it, you should also create a PersistentVolumeClaim with the name tekton-workspace-pvc.

apiVersion: tekton.dev/v1alpha1
kind: PipelineRun
metadata:
  # The name below is illustrative. The original listing omits metadata,
  # but Kubernetes requires a name (or generateName) to create the object.
  name: build-spring-boot-pipeline-run
spec:
  params:
    - name: image
      value: 'piomin/callme-service:1.0'
  pipelineRef:
    name: build-spring-boot-pipeline
  serviceAccountName: buildpacks-service-account
  workspaces:
    - name: source-workspace
      persistentVolumeClaim:
        claimName: tekton-workspace-pvc
      subPath: source
    - name: cache-workspace
      persistentVolumeClaim:
        claimName: tekton-workspace-pvc
      subPath: cache

Finally, it is time to release a new version of our application – 1.1. Firstly, you should change the version number in Maven pom.xml.

We should also change the version number in the k8s/ksvc.yaml manifest. However, the most important thing is related to the annotation serving.knative.dev/rollout-duration. It enables gradual rollouts for Knative Service. The value 300s of this parameter means that our rollout to the latest revision will take exactly 300 seconds. Knative is going to roll out to 1% of traffic first, and then in equal incremental steps for the rest of the assigned traffic. In that case, it will increase the traffic to the latest service by 1% every 3 seconds.

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: callme-service
  annotations:
    serving.knative.dev/rollout-duration: "300s"
spec:
  template:
    spec:
      containers:
      - name: callme
        image: piomin/callme-service:1.1
        ports:
          - containerPort: 8080
        env:
          - name: POD_NAME
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
          - name: POD_NAMESPACE
            valueFrom:
              fieldRef:
                fieldPath: metadata.namespace
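The timing described above can be sketched as a simple linear model: start at 1% and add 1% after each step, where a step lasts one hundredth of the rollout duration. This is only an approximation of the description in this article, not the actual Knative implementation, which computes its own step sizes. The class and method names are hypothetical.

```java
// Linear approximation of a gradual rollout (not the real Knative algorithm).
final class RolloutSketch {

    // Length of a single 1% step, for example 3 seconds for a 300-second rollout.
    static double stepSeconds(int rolloutDurationSeconds) {
        return rolloutDurationSeconds / 100.0;
    }

    // Share of traffic (in percent) routed to the latest revision after elapsedSeconds.
    static int percentAt(int rolloutDurationSeconds, double elapsedSeconds) {
        if (rolloutDurationSeconds <= 0 || elapsedSeconds < 0) {
            throw new IllegalArgumentException("duration must be positive and elapsed time non-negative");
        }
        int completedSteps = (int) Math.floor(elapsedSeconds / stepSeconds(rolloutDurationSeconds));
        return Math.min(100, 1 + completedSteps);
    }

    public static void main(String[] args) {
        int duration = 300; // the value of serving.knative.dev/rollout-duration
        for (int t : new int[] {0, 3, 150, 300}) {
            System.out.println("t=" + t + "s -> " + percentAt(duration, t) + "% to the latest revision");
        }
    }
}
```

Under this model roughly half of the traffic reaches the new revision about halfway through the rollout.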

Verify Canary Release with Knative

Let’s verify our canary release process built with Knative and Tekton. Once you run the pipeline for the 1.0 and 1.1 versions of our sample application, you can display the list of Knative Services. As you can see, the latest revision is callme-service-00002, but the rollout is still in progress:

[Figure: list of Knative Services with the rollout in progress]

We can send some test requests to our Knative Service. For me, Knative is available on localhost:80, since I’m using Kubernetes on Docker Desktop. The only thing I need to do is to set the hostname of the service in the Host header.

$ curl http://localhost:80/callme/ping \
  -H "Host:callme-service.default.example.com"

Here are some responses. As you can see, the first two requests were processed by the 1.0 version of our application, while the last request was processed by the 1.1 version.

Let’s verify the current percentage traffic distribution between the two revisions. Currently, it is 52% to 1.1 and 48% to 1.0.

[Figure: traffic distribution between the two revisions]

Finally, after the rollout procedure is finished you should get the same response as me.

Final Thoughts

As you can see, the process of canary release with Knative is very simple. You only need to set a single annotation on the Knative Service. You can compare it, for example, with Argo Rollouts, which allows us to perform progressive traffic shifting for a standard Kubernetes Deployment.

Serverless Java Functions on OpenShift
https://piotrminkowski.com/2021/11/30/serverless-java-functions-on-openshift/
Tue, 30 Nov 2021 13:52:46 +0000

The post Serverless Java Functions on OpenShift appeared first on Piotr's TechBlog.

In this article, you will learn how to create and deploy serverless, Knative-based functions on OpenShift. We will use a single kn CLI command to build and run our applications on the cluster. How can we do that? With OpenShift Serverless Functions we may use the kn func plugin that allows us to work directly with the source code. It uses the Cloud Native Buildpacks API to create container images. It supports several runtimes like Node.js, Python, or Golang. However, we will try Java runtimes based on the Quarkus or Spring Boot frameworks.

Prerequisites

You need to have two things to be able to run this exercise by yourself. Firstly, you need to run Docker or Podman on your local machine, because Cloud Native Buildpacks use it for running the build. If you are not familiar with Cloud Native Buildpacks you can read my article about it. I tried to configure Podman according to this part of the documentation, but I did not succeed (on macOS). With Docker, it just works, so I gave up on further attempts with Podman.

Secondly, you need to have a target OpenShift cluster with the serverless module installed. You can run it locally using CodeReady Containers (crc). But in my opinion, a better idea is to try the developer sandbox available online here. It contains all you need to start development, including OpenShift Serverless, which is available by default on the sandbox version of OpenShift.

Finally, you need to install the oc client and kn CLI locally. Since we use the kn func plugin, we need to install the Knative CLI version provided by Red Hat. The detailed installation instructions are available here.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then go to the serverless/functions directory. After that, you should just follow my instructions. Let’s begin.

Create OpenShift Serverless function with Quarkus

We can generate sample application source code using a single kn command. We may choose between multiple runtimes and two templates. Currently, there are six runtimes available. The default is node (for Node.js applications), but you can also set quarkus, springboot, typescript, go, or python. Also, there are two templates available: http for simple REST-based applications and events for applications leveraging the Knative Eventing approach in communication. Let’s create our first application using the quarkus runtime and events template.

$ kn func create -l quarkus -t events caller-function

Now, go to the caller-function directory and edit the generated pom.xml file. Firstly, we will change the Java version to 11 and the Quarkus version to the latest one.

<properties>
  <compiler-plugin.version>3.8.1</compiler-plugin.version>
  <maven.compiler.parameters>true</maven.compiler.parameters>
  <maven.compiler.source>11</maven.compiler.source>
  <maven.compiler.target>11</maven.compiler.target>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  <quarkus-plugin.version>2.4.2.Final</quarkus-plugin.version>
  <quarkus.platform.artifact-id>quarkus-universe-bom</quarkus.platform.artifact-id>
  <quarkus.platform.group-id>io.quarkus</quarkus.platform.group-id>
  <quarkus.platform.version>2.4.2.Final</quarkus.platform.version>
  <surefire-plugin.version>3.0.0-M5</surefire-plugin.version>
</properties>

By default the kn func plugin includes some dependencies in the test scope and a single dependency with the Quarkus Funqy Knative extension. There is also Quarkus Smallrye Health to automatically generate liveness and readiness health checks.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-funqy-knative-events</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-smallrye-health</artifactId>
</dependency>

By default, kn func generates a simple function that takes a CloudEvent as an input and sends the same event as an output. I will not change much there and will just replace System.out with the Logger implementation in order to print logs.

public class Function {

   @Inject
   Logger logger;

   @Funq
   public CloudEvent<Output> function(CloudEvent<Input> input) {
      logger.infof("New event: %s", input);
      Output output = new Output(input.data().getMessage());
      return CloudEventBuilder.create().build(output);
   }

}

Assuming you have already logged in to your OpenShift cluster using the oc client you can proceed to the function deployment. In fact, you just need to go to your application directory and then run a single kn func command as shown below.

$ kn func deploy -i quay.io/pminkows/caller-function -v

Once you run the command above, the local build starts on Docker. If it finishes successfully, we proceed to the deployment phase.

openshift-serverless-functions-build-and-deploy

In the application root directory, there is also an automatically generated configuration file func.yaml.

name: caller-function
namespace: ""
runtime: quarkus
image: quay.io/pminkows/caller-function
imageDigest: sha256:5d3ef16e1282bc5f6367dff96ab7bb15487199ac3939e262f116657a83706245
builder: quay.io/boson/faas-jvm-builder:v0.8.4
builders: {}
buildpacks: []
healthEndpoints: {}
volumes: []
envs: []
annotations: {}
options: {}
labels: []

Create OpenShift Serverless functions with Spring Boot

Now, we will do exactly the same thing as before, but this time for the Spring Boot application. In order to create a Spring Boot function we just need to set springboot as the runtime name. The name of our application is callme-function.

$ kn func create -l springboot -t events callme-function

Then we go to the callme-function directory. Firstly, let’s edit the Maven pom.xml. As with Quarkus, I’ll set the latest version of Spring Boot.

<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.6.1</version>
  <relativePath />
</parent>

The generated application is built on top of the Spring Cloud Function project. We don’t need to add anything there.

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-function-web</artifactId>
</dependency>

The Spring Boot function code is a little bit more complicated than the one generated for Quarkus. It uses the Spring functional programming style, where a Function bean represents an HTTP POST endpoint with an input and an output response.

@SpringBootApplication
public class SpringCloudEventsApplication {

  private static final Logger LOGGER = Logger.getLogger(
      SpringCloudEventsApplication.class.getName());

  public static void main(String[] args) {
    SpringApplication.run(SpringCloudEventsApplication.class, args);
  }

  @Bean
  public Function<Message<Input>, Output> uppercase(CloudEventHeaderEnricher enricher) {
    return m -> {
      HttpHeaders httpHeaders = HeaderUtils.fromMessage(m.getHeaders());
      
      LOGGER.log(Level.INFO, "Input CE Id:{0}", httpHeaders.getFirst(
          ID));
      LOGGER.log(Level.INFO, "Input CE Spec Version:{0}",
          httpHeaders.getFirst(SPECVERSION));
      LOGGER.log(Level.INFO, "Input CE Source:{0}",
          httpHeaders.getFirst(SOURCE));
      LOGGER.log(Level.INFO, "Input CE Subject:{0}",
          httpHeaders.getFirst(SUBJECT));

      Input input = m.getPayload();
      LOGGER.log(Level.INFO, "Input {0} ", input);
      Output output = new Output();
      output.input = input.input;
      output.operation = httpHeaders.getFirst(SUBJECT);
      output.output = input.input != null ? input.input.toUpperCase() : "NO DATA";
      return output;
    };
  }

  @Bean
  public CloudEventHeaderEnricher attributesProvider() {
    return attributes -> attributes
        .setSpecVersion("1.0")
        .setId(UUID.randomUUID()
            .toString())
        .setSource("http://example.com/uppercase")
        .setType("com.redhat.faas.springboot.events");
  }

  @Bean
  public Function<String, String> health() {
    return probe -> {
      if ("readiness".equals(probe)) {
        return "ready";
      } else if ("liveness".equals(probe)) {
        return "live";
      } else {
        return "OK";
      }
    };
  }
}

Because there are two functions (@Bean Function) defined in the generated code, you need to add the following property in the application.properties file.

spring.cloud.function.definition = uppercase;health
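
To make the functional style more concrete, here is a minimal plain-Java sketch of the same two transformations without any Spring dependency. The class name UppercaseSketch and its constants are hypothetical and exist only for illustration; in the generated application, Spring Cloud Function discovers the Function beans and exposes them over HTTP.

```java
import java.util.function.Function;

// Plain-Java sketch (no Spring). All names here are illustrative assumptions.
final class UppercaseSketch {

    // Mirrors the null-safe transformation used in the generated uppercase bean.
    static final Function<String, String> UPPERCASE =
        input -> input != null ? input.toUpperCase() : "NO DATA";

    // Mirrors the generated health bean: maps a probe name to a status string.
    static final Function<String, String> HEALTH = probe -> {
        if ("readiness".equals(probe)) {
            return "ready";
        } else if ("liveness".equals(probe)) {
            return "live";
        }
        return "OK";
    };

    public static void main(String[] args) {
        System.out.println(UPPERCASE.apply("hello"));
        System.out.println(HEALTH.apply("readiness"));
    }
}
```

Each function is just a value of type java.util.function.Function, which is why the application can hold several of them and why the property above has to list the ones to expose.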

Deploying serverless functions on OpenShift

We have two sample applications deployed on the OpenShift cluster. The first of them is written in Quarkus, while the second of them in Spring Boot. Those applications will communicate with each other through events. So in the first step, we need to create a Knative Eventing broker.

$ kn broker create default

Let’s check if the broker has been successfully created.

$ kn broker list
NAME      URL                                                                                  AGE   CONDITIONS   READY   REASON
default   http://broker-ingress.knative-eventing.svc.cluster.local/piomin-serverless/default   12m   5 OK / 5     True

Then, let’s display a list of running Knative services:

$ kn service list
NAME              URL                                                                                       LATEST                  AGE   CONDITIONS   READY   REASON
caller-function   https://caller-function-piomin-serverless.apps.cluster-8e1d.8e1d.sandbox114.opentlc.com   caller-function-00002   18m   3 OK / 3     True    
callme-function   https://callme-function-piomin-serverless.apps.cluster-8e1d.8e1d.sandbox114.opentlc.com   callme-function-00006   11h   3 OK / 3     True 

Send and receive CloudEvent with Quarkus

The architecture of our solution is visible in the picture below. The caller-function receives events sent directly by us using the kn func plugin. Then it processes the input event, creates a new CloudEvent and sends it to the Knative Broker. The broker just receives events. To provide more advanced event routing we need to define a Knative Trigger. The trigger is able to filter events and then send them directly to the target sink. In our case, that sink is the callme-function, which receives the filtered events.

openshift-serverless-functions-arch

Ok, so now we need to rewrite caller-function to add a step that creates a CloudEvent and sends it to the Knative Broker. To do that we need to declare and invoke the Quarkus REST client. Firstly, let’s add the required dependencies.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-rest-client</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-rest-client-jackson</artifactId>
</dependency>

In the next step, we will create a client interface with a declaration of a calling method. The CloudEvent specification requires four HTTP headers to be set on the POST request. The context path of a Knative Broker is /piomin-serverless/default, and we have already checked it out using the kn broker list command.

@Path("/piomin-serverless/default")
@RegisterRestClient
public interface BrokerClient {

   @POST
   @Produces(MediaType.APPLICATION_JSON)
   String sendEvent(Output event,
                    @HeaderParam("Ce-Id") String id,
                    @HeaderParam("Ce-Source") String source,
                    @HeaderParam("Ce-Type") String type,
                    @HeaderParam("Ce-Specversion") String version);
}

We also need to set the broker address in the application.properties. We use a standard property mp-rest/url handled by the MicroProfile REST client.

functions.BrokerClient/mp-rest/url = http://broker-ingress.knative-eventing.svc.cluster.local 
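
For readers who want to see what such a request looks like on the wire, here is a rough sketch that builds (but does not send) an equivalent request with the JDK HTTP client instead of the Quarkus REST client. The class BrokerRequestSketch, its method, and the JSON body passed in the usage note are assumptions made for illustration; only the broker address, the path and the four Ce- headers come from the article.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.UUID;

// Sketch only: builds a binary-mode CloudEvent request without sending it.
final class BrokerRequestSketch {

    // Base address taken from the article; the path is <namespace>/<broker>.
    static final String BROKER_BASE =
        "http://broker-ingress.knative-eventing.svc.cluster.local";

    static HttpRequest buildEvent(String namespace, String broker,
                                  String type, String source, String jsonBody) {
        return HttpRequest.newBuilder(
                URI.create(BROKER_BASE + "/" + namespace + "/" + broker))
            // The four required CloudEvents context attributes as HTTP headers.
            .header("Ce-Id", UUID.randomUUID().toString())
            .header("Ce-Source", source)
            .header("Ce-Type", type)
            .header("Ce-Specversion", "1.0")
            // The event payload travels in the body as plain JSON.
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
            .build();
    }
}
```

A call such as buildEvent("piomin-serverless", "default", "caller.output", "http://caller-function", "{\"message\":\"Hello\"}") yields a POST to the same path as the BrokerClient interface; the JSON field name is a guess at the shape of the Output class.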

Here’s the final implementation of our function in the caller-function module. The type of event is caller.output. In fact, we can set any name as an event type.

public class Function {

   @Inject
   Logger logger;
   @Inject
   @RestClient
   BrokerClient client;

   @Funq
   public CloudEvent<Output> function(CloudEvent<Input> input) {
      logger.infof("New event: %s", input);
      Output output = new Output(input.data().getMessage());
      CloudEvent<Output> outputCloudEvent = CloudEventBuilder.create().build(output);
      client.sendEvent(output,
                input.id(),
                "http://caller-function",
                "caller.output",
                input.specVersion());
      return outputCloudEvent;
    }

}

Finally, we will create a Knative Trigger. It takes the events arriving at the broker and filters them by type. Only events with the type caller.output are forwarded to the callme-function.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: callme-trigger
spec:
  broker: default
  filter:
    attributes:
      type: caller.output
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: callme-function
    uri: /uppercase
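
The filtering rule can be pictured as an exact match on event attributes. The sketch below is a simplified model written for this explanation, not Knative's actual implementation; the class and method names are hypothetical.

```java
import java.util.Map;

// Simplified model of a Trigger attribute filter; not Knative's real code.
final class TriggerFilterSketch {

    // An event passes when every filter attribute equals the event's attribute.
    static boolean matches(Map<String, String> filter, Map<String, String> event) {
        return filter.entrySet().stream()
            .allMatch(e -> e.getValue().equals(event.get(e.getKey())));
    }

    public static void main(String[] args) {
        Map<String, String> filter = Map.of("type", "caller.output");
        Map<String, String> event = Map.of(
            "type", "caller.output",
            "source", "http://caller-function");
        System.out.println(matches(filter, event));
    }
}
```

With the trigger above, an event whose type is caller.output matches regardless of its other attributes, while an event of any other type is not delivered to callme-function.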

Now, we can send a test CloudEvent to the caller-function directly from the local machine with the following command (ensure you are calling it in the caller-function directory):

$ kn func emit -d "Hello"

The Output class in the caller-function and Input class in the callme-function should have the same fields. By default kn func generates different fields for Quarkus and Spring Boot example applications. So you also need to refactor one of these objects. I changed the field in the callme-function Input class.

Summary

Let’s summarize our exercise. We built and deployed two applications (Quarkus and Spring Boot) directly from the source code into the target cluster. Thanks to OpenShift Serverless Functions, we didn’t have to provide any additional configuration to deploy them as Knative services.

If there is no incoming traffic, Knative services are automatically scaled down to zero after 60 seconds (by default).

So, to generate some traffic we may use the kn func emit command. It sends a CloudEvent message directly to the target application. In our case, it is caller-function (Quarkus). After receiving an input event the pod with the caller-function starts. After startup, it sends a CloudEvent message to the Knative Broker. Finally, the event goes to the callme-function (Spring Boot), which also starts, as shown below.

openshift-serverless-functions-pods

As you see OpenShift provides several simplifications when working with Knative. And what’s important, now you can easily test them by yourself using the developer sandbox version of OpenShift available online.


Knative Eventing with Quarkus, Kafka and Camel
https://piotrminkowski.com/2021/06/14/knative-eventing-with-quarkus-kafka-and-camel/
Mon, 14 Jun 2021 07:47:54 +0000

In this article, you will learn how to use Quarkus with Camel to create applications that send messages to Kafka and receive CloudEvent from Knative Eventing. We will build a very similar system to the system described in my previous article Knative Eventing with Kafka and Quarkus. However, this time we will use Apache Camel instead of several Quarkus extensions including Kafka support.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions.

First, you should go to the saga directory. It contains two applications built on top of Quarkus and Apache Camel. Today we will implement an eventual consistency pattern (also known as a SAGA pattern). It will use the Knative Eventing model for exchanging events between our applications.

1. Prerequisites

Before we start, we need to configure some components like Kafka, Knative or the Kafka Eventing Broker. Let’s go through these steps.

1.1. Install Apache Kafka cluster

Firstly, let’s create our kafka namespace.

$ kubectl create namespace kafka

Then we apply the installation files, including ClusterRoles, ClusterRoleBindings and other required Kubernetes CustomResourceDefinitions (CRD).

$ kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

After that, we can create a single-node persistent Apache Kafka cluster. We use an example custom resource for the Strimzi operator.

$ kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka

Finally, we may verify our installation. You should see the same result as shown below.

1.2. Install Knative Serving and Eventing

You can install Knative on your Kubernetes cluster using YAML manifests or the operator. At the time of writing, the current version of Knative is 0.23. The minimal list of steps for a YAML-based installation is visible below. For more details, you may refer to the documentation. I list all the required commands to simplify the process for you. Let’s install Knative Serving.

$ kubectl apply -f https://github.com/knative/serving/releases/download/v0.23.0/serving-crds.yaml
$ kubectl apply -f https://github.com/knative/serving/releases/download/v0.23.0/serving-core.yaml
$ kubectl apply -f https://github.com/knative/net-kourier/releases/download/v0.23.0/kourier.yaml
$ kubectl patch configmap/config-network \
  --namespace knative-serving \
  --type merge \
  --patch '{"data":{"ingress.class":"kourier.ingress.networking.knative.dev"}}'

Then let’s install Knative Eventing.

$ kubectl apply -f https://github.com/knative/eventing/releases/download/v0.23.0/eventing-crds.yaml
$ kubectl apply -f https://github.com/knative/eventing/releases/download/v0.23.0/eventing-core.yaml

1.3. Install Knative Kafka Eventing

The following commands install the Apache Kafka broker, and run event routing in a system namespace, knative-eventing, by default.

$ kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.23.0/eventing-kafka-controller.yaml
$ kubectl apply -f https://github.com/knative-sandbox/eventing-kafka-broker/releases/download/v0.23.0/eventing-kafka-broker.yaml

Then, we should install the CRDs for KafkaBinding and KafkaSource.

$ kubectl apply -f https://storage.googleapis.com/knative-releases/eventing-contrib/latest/kafka-source.yaml

Finally, let’s just create a broker.

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: default
  namespace: default
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing

1.4. Install Apache Camel K operator (optional)

If you would like to use Apache Camel K to run the Quarkus application on Knative, you first need to install its operator. After downloading the Camel K CLI you just need to run the following command.

$ kamel install

2. Run the application with Apache Camel K

In order to deploy and run an application on Knative, we may execute the kamel run command. Some parameters might be set in the source file. In the command visible below, I’m setting the name of the Knative service, the source file location, and Quarkus properties.

$ kamel run --name order-saga --dev OrderRoute.java \
    -p quarkus.datasource.db-kind=h2 \
    -p quarkus.datasource.jdbc.url=jdbc:h2:mem:testdb \
    -p quarkus.hibernate-orm.packages=com.github.piomin.entity.model.order

For more details about deploying Quarkus with Camel K on Kubernetes you may refer to my article Apache Camel K and Quarkus on Kubernetes.

In the end, I was not able to deploy exactly this application on Knative with Camel K, because Camel K didn’t see the JPA entities that the application includes from an external library. However, the application is still prepared for deployment with Camel K: the whole source code is a single Java file, and it contains some Camel K modeline hooks.

3. Integrate Quarkus with Apache Camel

We can easily integrate Apache Camel routes with Quarkus. Camel Quarkus provides extensions for many of the Camel components. We need to include those components in our Maven pom.xml. What type of components do we need? The full list is visible below. However, first, let’s discuss them a little bit more.

Our application uses JPA to store entities in the H2 database. So we will include the Camel Quarkus JPA extension to provide the JPA implementation with Hibernate. For a single persistence unit, this extension automatically creates EntityManagerFactory and TransactionManager. In order to integrate with Apache Kafka, we need to include the Camel Quarkus Kafka extension. In order to receive events from Knative, we should expose the HTTP POST endpoint. That’s why we need several extensions like Platform HTTP or Jackson. Here’s my list of Maven dependencies.

<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-core</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-platform-http</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-bean</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-timer</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-jpa</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-rest</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.quarkus</groupId>
  <artifactId>camel-quarkus-jackson</artifactId>
</dependency>

Then, we just need to create a class that extends RouteBuilder. The routes have to be defined inside the configure() method. Before we get into the details let’s analyze our domain model classes.

public class CustomerRoute extends RouteBuilder {
   @Override
   public void configure() throws Exception { 
      ...
   }
}

4. Domain model for Quarkus JPA and Kafka

I created a separate project for entity model classes. The repository is available on GitHub at https://github.com/piomin/entity-model.git. Thanks to that, I have a typical serverless application which consists of a single class. It is also possible to easily deploy it on Knative with Camel K. Here’s a model entity for order-saga.

@Entity
@Table(name = "orders")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Order implements Serializable {
    @Id
    @GeneratedValue
    private Long id;
    private Integer customerId;
    private Integer productId;
    private int amount;
    private int productCount;
    @Enumerated
    private OrderStatus status = OrderStatus.NEW;
}

Just to simplify, I’m using the same class when sending events to Kafka. We can also take a look at a model entity for customer-saga.

@Entity
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Customer implements Serializable {
    @Id
    @GeneratedValue
    private Long id;
    private String name;
    private int amountAvailable;
    private int amountReserved;
}

5. Building Camel routes with Quarkus and Kafka extension

In the first step, we are going to generate orders and send them to the Kafka topic. Before that, we will store them in the H2 database using the Camel JPA extension. This part of logic is implemented inside order-saga.

from("timer:tick?period=10000")
   .setBody(exchange -> 
      new Order(null, r.nextInt(10) + 1, r.nextInt(10) + 1, 100, 1, OrderStatus.NEW))
   .to("jpa:" + Order.class.getName())
   .marshal().json(JsonLibrary.Jackson)
   .log("New Order: ${body}")
   .toD("kafka:order-events?brokers=${env.KAFKA_BOOTSTRAP_SERVERS}");

Some things need to be clarified here. Before sending a message to the Kafka topic we need to serialize it to the JSON format. The application does not know anything about the Kafka address. This address is injected into the container by the KafkaBinding object. It is available to the Camel route as the environment variable KAFKA_BOOTSTRAP_SERVERS.
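
To illustrate what the dynamic endpoint resolves to, here is a small plain-Java sketch that assembles the same kind of endpoint URI from an environment map. The class KafkaEndpointSketch and the fail-fast check are my own assumptions for illustration; in the route itself Camel performs the ${env.KAFKA_BOOTSTRAP_SERVERS} substitution.

```java
import java.util.Map;

// Illustration only: in the article Camel resolves the variable inside toD(...).
final class KafkaEndpointSketch {

    static String endpoint(String topic, Map<String, String> env) {
        String brokers = env.get("KAFKA_BOOTSTRAP_SERVERS");
        // Assumption: fail early when the binding did not inject the address.
        if (brokers == null || brokers.isBlank()) {
            throw new IllegalStateException("KAFKA_BOOTSTRAP_SERVERS is not set");
        }
        return "kafka:" + topic + "?brokers=" + brokers;
    }

    public static void main(String[] args) {
        Map<String, String> env =
            Map.of("KAFKA_BOOTSTRAP_SERVERS", "my-cluster-kafka-bootstrap.kafka:9092");
        System.out.println(endpoint("order-events", env));
    }
}
```

Passing System.getenv() instead of the sample map would read the value from the container's real environment.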

Now, let’s switch to the customer-saga application. In order to receive an event from the Knative broker, we should expose an HTTP POST endpoint. This endpoint takes Order as an input. Then, if the order’s status equals NEW it performs a reservation on the customer account. Before that, it sends back a response to the reserve-events topic.

Also, let’s take a look at the fragment responsible for searching the customer in the database and performing an update. We use Quarkus Camel JPA extension. First, we need to define a JPQL query to retrieve an entity. Then, we update the Customer entity depending on the order status.

rest("/customers")
   .post("/reserve").consumes("application/json")
   .route()
      .log("Order received: ${body}")
      .unmarshal().json(JsonLibrary.Jackson, Order.class)
      .choice()
         .when().simple("${body.status.toString()} == 'NEW'")
            .setBody(exchange -> {
               Order order = exchange.getIn().getBody(Order.class);
               order.setStatus(OrderStatus.IN_PROGRESS);
               return order;
            })
            .marshal().json(JsonLibrary.Jackson)
            .log("Reservation sent: ${body}")
            .toD("kafka:reserve-events?brokers=${env.KAFKA_BOOTSTRAP_SERVERS}")
      .end()
      .unmarshal().json(JsonLibrary.Jackson, Order.class)
      .setProperty("orderAmount", simple("${body.amount}", Integer.class))
      .setProperty("orderStatus", simple("${body.status}", OrderStatus.class))
      .toD("jpa:" + Customer.class.getName() + 
         "?query=select c from Customer c where c.id= ${body.customerId}")
      .choice()
         .when().simple("${exchangeProperty.orderStatus} == 'IN_PROGRESS'")
            .setBody(exchange -> {
               Customer customer = (Customer) exchange.getIn().getBody(List.class).get(0);
               customer.setAmountReserved(customer.getAmountReserved() + 
                  exchange.getProperty("orderAmount", Integer.class));
               customer.setAmountAvailable(customer.getAmountAvailable() - 
                  exchange.getProperty("orderAmount", Integer.class));
               return customer;
            })
            .otherwise()
               .setBody(exchange -> {
                  Customer customer = (Customer) exchange.getIn().getBody(List.class).get(0);
                  customer.setAmountReserved(customer.getAmountReserved() - 
                     exchange.getProperty("orderAmount", Integer.class));
                  return customer;
               })
      .end()
      .log("Current customer: ${body}")
      .to("jpa:" + Customer.class.getName() + "?useExecuteUpdate=true")
      .setHeader(Exchange.HTTP_RESPONSE_CODE, constant(201)).setBody(constant(null))
.endRest();
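
Stripped of the Camel DSL, the account update in the second choice() block boils down to simple arithmetic. The sketch below restates it in plain Java; the Status enum (in particular the CONFIRMED value) and the Account class are hypothetical stand-ins for the article's OrderStatus and Customer types.

```java
// Plain-Java restatement of the account update; types are illustrative stand-ins.
final class ReservationSketch {

    // Assumption: only NEW and IN_PROGRESS appear in the article; CONFIRMED is a guess.
    enum Status { NEW, IN_PROGRESS, CONFIRMED }

    static final class Account {
        int amountAvailable;
        int amountReserved;

        Account(int amountAvailable, int amountReserved) {
            this.amountAvailable = amountAvailable;
            this.amountReserved = amountReserved;
        }
    }

    static Account apply(Account account, Status status, int orderAmount) {
        if (status == Status.IN_PROGRESS) {
            // Reservation: move the order amount from available to reserved.
            account.amountReserved += orderAmount;
            account.amountAvailable -= orderAmount;
        } else {
            // Any other status: release the previously reserved amount.
            account.amountReserved -= orderAmount;
        }
        return account;
    }
}
```

For example, an account with 1000 available and an order amount of 100 ends up with 900 available and 100 reserved after the IN_PROGRESS step, and with 0 reserved after the follow-up step.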

We can also generate some test data in customer-saga using Camel route. It runs once just after the Quarkus application startup.

from("timer://runOnce?repeatCount=1&delay=100")
   .loop(10)
      .setBody(exchange -> new Customer(null, "Test"+(++i), r.nextInt(50000), 0))
      .to("jpa:" + Customer.class.getName())
      .log("Add: ${body}")
   .end();

6. Configure Knative Eventing with Kafka broker

6.1. Architecture with Quarkus, Camel and Kafka

We have already created two applications built on top of Quarkus and Apache Camel. Both of these applications expose HTTP POST endpoints and send events to the Kafka topics. Now, we need to create some Kubernetes objects to orchestrate the process. So far, we just send the events to topics; they are not yet routed to the target applications. Let’s take a look at the architecture of our system. There are two topics on Kafka that receive events: order-events and reserve-events. The messages from those topics are not automatically sent to the Knative broker. So, first, we need to create the KafkaSource object to get messages from these topics and send them to the broker.

quarkus-camel-kafka-arch

6.2. Configure Knative eventing

The KafkaSource object takes the list of input topics and a target application. In that case, the target application is the Knative broker. We may create a single KafkaSource for both topics or a separate source for each topic. Here’s the KafkaSource definition that takes messages from the reserve-events topic.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-reserve-order
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - reserve-events
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default

Let’s create both sources. After creating them, we may verify if everything went well by executing the command kubectl get sources.

quarkus-camel-kafka-sources

Before running our application on Knative we should create KafkaBinding objects. This object is responsible for injecting the address of the Kafka cluster into the application containers. The address of a broker will be available for the application under the KAFKA_BOOTSTRAP_SERVERS environment variable.

apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
  name: kafka-binding-customer-saga
spec:
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: customer-saga
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092

Let’s create both KafkaBinding objects. Here’s the list of available bindings after running the kubectl get bindings command.

Finally, we may proceed to the last step of our configuration. We will create triggers. A trigger represents a desire to subscribe to events from a specific broker. Moreover, we may apply a simple filtering mechanism using the Trigger object. For example, if we want to send only the events from the order-events topic and with the type dev.knative.kafka.event we should create a definition as shown below.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: customer-saga-trigger
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/kafka-source-orders-customer#order-events
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /customers/reserve

Similarly, we should create a trigger that sends messages to the order-saga POST endpoint. It gets messages from the reserve-events source and sends them to the /orders/confirm endpoint.

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: order-saga-trigger
spec:
  broker: default
  filter:
    attributes:
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/kafka-source-reserve-order#reserve-events
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: order-saga
    uri: /orders/confirm
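
Both triggers filter on a source attribute with the same shape. Judging only from the two values shown above, the pattern appears to be the KafkaSource resource path followed by # and the topic name; the helper below is a hypothetical sketch of that pattern, not a documented Knative API.

```java
// Hypothetical helper that reproduces the source values used in the triggers above.
final class KafkaSourceAttributeSketch {

    // Pattern inferred from the article: /apis/v1/namespaces/<ns>/kafkasources/<name>#<topic>
    static String sourceAttribute(String namespace, String kafkaSourceName, String topic) {
        return "/apis/v1/namespaces/" + namespace
            + "/kafkasources/" + kafkaSourceName
            + "#" + topic;
    }

    public static void main(String[] args) {
        System.out.println(
            sourceAttribute("default", "kafka-source-reserve-order", "reserve-events"));
    }
}
```

If you rename a KafkaSource or deploy it to another namespace, the source value in the corresponding trigger has to change accordingly.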

Finally, we can display a list of active triggers by executing the command kubectl get trigger.

quarkus-camel-kafka-triggers

7. Deploy Quarkus application on Knative

Once we have finished the development of our sample applications we may deploy them on Knative. One of the possible deployment options is with Apache Camel K. In case of any problems with this type of deployment we may also use the Quarkus Kubernetes module. Firstly, let’s include two required modules. We will also leverage the Jib Maven plugin.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-kubernetes</artifactId>
</dependency>
<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-container-image-jib</artifactId>
</dependency>

The rest of the configuration should be provided inside the application properties file. In the first step, we need to enable automatic deployment on Kubernetes by setting the property quarkus.kubernetes.deploy to true. By default, Quarkus creates a standard Kubernetes Deployment. Therefore, we should set the quarkus.kubernetes.deployment-target to knative. In that case, it will generate a Knative Service YAML. Finally, we have to change the name of the image group to dev.local. Of course, it is required just if we run our applications on the local Kubernetes cluster like me.

quarkus.kubernetes.deploy = true
quarkus.kubernetes.deployment-target = knative
quarkus.container-image.group = dev.local

Now, if we run the build with the mvn clean package command, our application will be automatically deployed on Knative. After that, let’s verify the list of Knative services.

Once the order-saga application has started, it generates one order every 10 seconds and then sends it to the Kafka topic order-events. We can easily verify that it works properly by checking the list of active topics, as shown below.

We can also verify a list of Knative events exchanged by the applications.

$ kubectl get eventtype

Final Thoughts

Quarkus and Apache Camel seem to be a perfect combination when creating serverless applications on Knative. We can easily implement the whole logic within a single source code file. We can also use Camel K to deploy our applications on Kubernetes or Knative. You can compare the approach described in this article with the one based on Quarkus and its extensions to Kafka or Knative available in this repository.

Knative Eventing with Kafka and Quarkus
https://piotrminkowski.com/2021/03/31/knative-eventing-with-kafka-and-quarkus/
Wed, 31 Mar 2021 12:55:26 +0000

In this article, you will learn how to run eventing applications on Knative using Kafka and Quarkus. Previously I described the same approach for Kafka and Spring Cloud. If you want to compare both of them read my article Knative Eventing with Kafka and Spring Cloud. We will deploy exactly the same architecture. However, instead of Spring Cloud Functions we will use Quarkus Funqy. Also, Spring Cloud Stream may be replaced with Quarkus Kafka. Before we start, let’s clarify some things.

Concept over Knative and Quarkus

Quarkus supports Knative in several ways. First of all, we may use the Quarkus Kubernetes module to simplify deployment on Knative. We can also use the Quarkus Funqy Knative Event extension to route and process cloud events within functions. That’s not all. Quarkus supports a serverless functional style. With the Quarkus Funqy module, we can write functions deployable to various FaaS (including Knative). These functions can be invoked through HTTP. Finally, we may integrate our application with Kafka topics using annotations from the Quarkus Kafka extension.

The Quarkus Funqy Knative Event module is based on the Knative broker and triggers. Since we will use Kafka Source instead of broker and trigger, we won’t include that module. However, we can still take advantage of Quarkus Funqy and HTTP binding.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions.

As I mentioned before, we will use the same architecture and scenario as in my previous article about Knative Eventing. Let’s briefly describe it.

Today we will implement an eventual consistency pattern (also known as a SAGA pattern). How does it work? The sample system consists of three services. The order-service creates a new order that is related to the customers and products. That order is sent to the Kafka topic. Then, our two other applications, customer-service and product-service, receive the order event. After that, they perform a reservation. The customer-service reserves the order’s amount on the customer’s account. Meanwhile, the product-service reserves the number of products specified in the order. Both these services send a response to the order-service through the Kafka topic. If the order-service receives positive reservations from both services, it confirms the order. Then, it sends an event with that information. Both customer-service and product-service receive the event and confirm reservations. You can verify it in the picture below.

quarkus-knative-eventing-arch
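
To make the confirmation rule concrete, here is a minimal plain-Java sketch of how the order side could track the two reservation responses. It is not the code from my repository: the class, enum, and method names are assumptions made for illustration, and a HashMap stands in for the database. The first positive response moves the order to IN_PROGRESS, and the second one confirms it.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: names are assumptions, and the map stands in for the order database.
class SagaFlowSketch {

    enum Status { NEW, IN_PROGRESS, CONFIRMED }

    // Order id -> current status.
    private final Map<Integer, Status> orders = new HashMap<>();

    void create(int orderId) {
        orders.put(orderId, Status.NEW);
    }

    // Called once for every positive reservation response.
    // The first response moves the order to IN_PROGRESS, the second one confirms it.
    Status onReservation(int orderId) {
        Status current = orders.get(orderId);
        if (current == null) {
            throw new IllegalArgumentException("Unknown order: " + orderId);
        }
        Status next = (current == Status.NEW) ? Status.IN_PROGRESS : Status.CONFIRMED;
        orders.put(orderId, next);
        return next;
    }

    public static void main(String[] args) {
        SagaFlowSketch saga = new SagaFlowSketch();
        saga.create(1);
        System.out.println(saga.onReservation(1)); // response from the first service
        System.out.println(saga.onReservation(1)); // response from the second service
    }
}
```

Note that this sketch only counts responses. It does not check which service sent them, so a duplicated event would also confirm the order.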

Prerequisites

There are several requirements we need to meet before we start. I described them all in my previous article about Knative Eventing. Here’s just a brief reminder:

  1. A Kubernetes cluster, version 1.17 or later. I’m using a local cluster. If you use a remote cluster, replace dev.local in the image name with your Docker account name.
  2. Install Knative Serving and Eventing on your cluster. You may find the detailed installation instructions here.
  3. Install Kafka Eventing Broker. Here’s the link to the releases site. You don’t need everything: we will use the KafkaSource and KafkaBinding CRDs.
  4. Install a Kafka cluster with the Strimzi operator. I installed it in the kafka namespace. The name of my cluster is my-cluster.

Step 1. Installing Kafka Knative components

Assuming you have already installed all the required elements to run Knative Eventing on your Kubernetes cluster, we may create some components dedicated to the applications. You may find YAML manifests with object declarations in the k8s directory inside every single application directory. First, let’s create a KafkaBinding. It is responsible for injecting the address of the Kafka cluster into the application container. Thanks to KafkaBinding, that address is visible inside the container as the KAFKA_BOOTSTRAP_SERVERS environment variable. Here’s an example of the YAML declaration for the customer-saga application. We should create similar objects for the two other applications.

apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
  name: kafka-binding-customer-saga
spec:
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: customer-saga
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
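
From the application’s point of view, the binding is just an environment variable. The sketch below shows one way to read it in plain Java with a fallback for running outside the cluster. The class name and the localhost:9092 fallback are my assumptions for illustration. In the Quarkus applications we don’t need such code, because the variable is referenced directly from application.properties.

```java
import java.util.Map;

// Illustrative helper; the class name and the fallback address are assumptions.
class BootstrapServersSketch {

    static final String ENV_NAME = "KAFKA_BOOTSTRAP_SERVERS";

    // Returns the injected address, or the fallback when the variable is missing or blank
    // (for example when running locally, without a KafkaBinding).
    static String resolve(Map<String, String> env, String fallback) {
        String value = env.get(ENV_NAME);
        return (value == null || value.isBlank()) ? fallback : value;
    }

    public static void main(String[] args) {
        System.out.println(resolve(System.getenv(), "localhost:9092"));
    }
}
```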

In the next step, we create the KafkaSource object. It reads events from a particular topic and passes them to the consumer by calling the HTTP POST endpoint exposed by the application. We can override the default context path of the HTTP endpoint. For customer-saga the target URL is /reserve. It should receive events from the order-events topic. Because both customer-saga and product-saga listen for events from the order-events topic, we also need to create a similar KafkaSource object for product-saga.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-orders-customer
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /reserve
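
To see what the sink side of a KafkaSource looks like without any framework, here is a small sketch built only on the JDK HTTP server and client. It is a stand-in for the application, not the Funqy code we will write later. The class and method names, the ce-* header values, and the 204 response are assumptions for illustration. As far as I know, the source delivers the Kafka record value as the request body, together with CloudEvents headers.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;

// Stand-in for the application side of a KafkaSource sink; not the article's code.
class ReserveEndpointSketch {

    // Starts a server on the given port (0 = any free port) and stores the last body received.
    static HttpServer start(int port, AtomicReference<String> lastBody) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/reserve", exchange -> {
            if (!"POST".equals(exchange.getRequestMethod())) {
                exchange.sendResponseHeaders(405, -1); // only POST is accepted
                exchange.close();
                return;
            }
            byte[] body = exchange.getRequestBody().readAllBytes();
            lastBody.set(new String(body, StandardCharsets.UTF_8));
            exchange.sendResponseHeaders(204, -1); // accepted, no response body
            exchange.close();
        });
        server.start();
        return server;
    }

    // Plays the role of the source: posts a JSON payload with example CloudEvents headers.
    static int post(int port, String json) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:" + port + "/reserve"))
                .header("Content-Type", "application/json")
                .header("ce-specversion", "1.0")
                .header("ce-type", "dev.knative.kafka.event")
                .header("ce-source", "example-source") // assumed value
                .header("ce-id", "1")                  // assumed value
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
        return HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.discarding())
                .statusCode();
    }

    public static void main(String[] args) throws Exception {
        AtomicReference<String> lastBody = new AtomicReference<>();
        HttpServer server = start(0, lastBody);
        int status = post(server.getAddress().getPort(), "{\"id\":1,\"status\":\"NEW\"}");
        System.out.println(status + " " + lastBody.get());
        server.stop(0);
    }
}
```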

On the other hand, the order-saga listens for events on the reserve-events topic. If you want to verify our scenario once again please refer to the diagram in the Source Code section. This time the target URL is /confirm.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source-orders-confirm
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - reserve-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: order-saga
    uri: /confirm

Let’s verify the list of Kafka sources. In our case, there is a single KafkaSource per application. Until we deploy our Quarkus applications on Knative, the Kafka sources won’t be ready.

quarkus-knative-eventing-kafkasource

Step 2. Integrating Quarkus with Kafka

In order to integrate Quarkus with Apache Kafka, we may use the SmallRye Reactive Messaging library. Thanks to that we may define an input and output topic for each method using annotations. The messages are serialized to JSON. We can also automatically expose Kafka connection status in health check. Here’s the list of dependencies we need to include in Maven pom.xml.

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-jackson</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-health</artifactId>
</dependency>

Before we start with the source code, we need to provide some configuration settings in the application.properties file. Of course, Kafka requires a connection URL to the cluster. We use the environment variable injected by the KafkaBinding object. Also, the output topic name should be configured. Here’s a list of required properties for the order-saga application.

kafka.bootstrap.servers = ${KAFKA_BOOTSTRAP_SERVERS}

mp.messaging.outgoing.order-events.connector = smallrye-kafka
mp.messaging.outgoing.order-events.topic = order-events
mp.messaging.outgoing.order-events.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Finally, we may switch to the code. Let’s start with order-saga. It will continuously send orders to the order-events topic. Those events are received by both the customer-saga and product-saga applications. The method responsible for generating and sending events returns a reactive stream using Mutiny Multi. It sends an event every second. We need to annotate the method with the @Outgoing annotation, passing the name of the output channel defined in the application properties. Also, the @Broadcast annotation indicates that the event is dispatched to all subscribers. Before sending, every order needs to be persisted in a database (we use an in-memory H2 database).

@ApplicationScoped
@Slf4j
public class OrderPublisher {

   private static int num = 0;

   @Inject
   private OrderRepository repository;
   @Inject
   private UserTransaction transaction;

   @Outgoing("order-events")
   @Broadcast
   public Multi<Order> orderEventsPublish() {
      return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
           .map(tick -> {
              Order o = new Order(++num, num%10+1, num%10+1, 100, 1, OrderStatus.NEW);
              try {
                 transaction.begin();
                 repository.persist(o);
                 transaction.commit();
              } catch (Exception e) {
                 log.error("Error in transaction", e);
              }

              log.info("Order published: {}", o);
              return o;
           });
   }

}
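
One detail of the test data is easy to misread: in new Order(++num, num%10+1, ...) Java evaluates the arguments from left to right, so the counter is incremented before the customer and product ids are computed. The sketch below reproduces only that arithmetic. The class and method names are illustrative and are not part of the application.

```java
// Reproduces only the argument arithmetic of new Order(++num, num%10+1, ...); names are illustrative.
class OrderIdsSketch {

    private int num = 0;

    // Returns {orderId, customerOrProductId}. Expressions are evaluated left to right,
    // so num is incremented before the second expression reads it.
    int[] next() {
        return new int[] { ++num, num % 10 + 1 };
    }

    public static void main(String[] args) {
        OrderIdsSketch generator = new OrderIdsSketch();
        for (int i = 0; i < 11; i++) {
            int[] ids = generator.next();
            System.out.println("order " + ids[0] + " -> customer/product " + ids[1]);
        }
    }
}
```

So the first order gets id 1 and refers to customer 2 and product 2. The referenced ids then cycle through 2..10 and 1, which means the test database needs customers and products with ids from 1 to 10.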

Step 3. Handling Knative events with Quarkus Funqy

OK, in the previous step we implemented the part of the code responsible for sending events to the Kafka topic. We also have a KafkaSource that is responsible for dispatching events from the Kafka topic to the application HTTP endpoint. Now, we just need to handle them. It is very simple with Quarkus Funqy. It allows us to create functions according to the serverless FaaS approach. But we can also easily bind each function to an HTTP endpoint with the Quarkus Funqy HTTP extension. Let’s include it in our dependencies.

 <dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-funqy-http</artifactId>
 </dependency>

In order to create a function with Quarkus Funqy, we just need to annotate the particular method with @Funq. The name of the method is reserve, so it is automatically bound to the HTTP endpoint POST /reserve. It takes a single input parameter, which represents the incoming order. It is automatically deserialized from JSON.

In the fragment of code visible below, we implement order handling in the customer-saga application. Once it receives an order, it performs a reservation on the customer account. Then it needs to send a response to order-saga. To do that, we may use Quarkus reactive messaging support once again. We define an Emitter object that allows us to send a single event to the topic. We may use it inside a method that does not return a value to be sent to a topic (unlike a method annotated with @Outgoing). The Emitter bean should be annotated with @Channel. It works similarly to @Outgoing. We also need to define an output topic related to the name of the channel.

@Slf4j
public class OrderReserveFunction {

   @Inject
   private CustomerRepository repository;
   @Inject
   @Channel("reserve-events")
   Emitter<Order> orderEmitter;

   @Funq
   public void reserve(Order order) {
      log.info("Received order: {}", order);
      doReserve(order);
   }

   private void doReserve(Order order) {
      Customer customer = repository.findById(order.getCustomerId());
      log.info("Customer: {}", customer);
      if (order.getStatus() == OrderStatus.NEW) {
         customer.setAmountReserved(customer.getAmountReserved() + order.getAmount());
         customer.setAmountAvailable(customer.getAmountAvailable() - order.getAmount());
         order.setStatus(OrderStatus.IN_PROGRESS);
         log.info("Order reserved: {}", order);
         orderEmitter.send(order);
      } else if (order.getStatus() == OrderStatus.CONFIRMED) {
         customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
      }
      repository.persist(customer);
   }
}
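
The bookkeeping in doReserve is easier to follow with numbers. Here is a minimal plain-Java sketch of the two branches. The field names follow the Customer entity used above, while the class itself and the starting balance of 1000 are assumptions for illustration.

```java
// Plain-Java stand-in for the Customer entity; field names follow the article, the class is illustrative.
class ReservationSketch {

    int amountAvailable;
    int amountReserved;

    ReservationSketch(int amountAvailable) {
        this.amountAvailable = amountAvailable;
    }

    // NEW order: move the amount from "available" to "reserved".
    void reserve(int amount) {
        amountReserved += amount;
        amountAvailable -= amount;
    }

    // CONFIRMED order: drop the reservation; "available" stays reduced.
    void confirm(int amount) {
        amountReserved -= amount;
    }

    public static void main(String[] args) {
        ReservationSketch customer = new ReservationSketch(1000); // assumed starting balance
        customer.reserve(100);
        System.out.println(customer.amountAvailable + " / " + customer.amountReserved); // 900 / 100
        customer.confirm(100);
        System.out.println(customer.amountAvailable + " / " + customer.amountReserved); // 900 / 0
    }
}
```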

Here are configuration properties for integration between Kafka and Emitter. The same configuration properties should be created for both customer-saga and product-saga.

kafka.bootstrap.servers = ${KAFKA_BOOTSTRAP_SERVERS}

mp.messaging.outgoing.reserve-events.connector = smallrye-kafka
mp.messaging.outgoing.reserve-events.topic = reserve-events
mp.messaging.outgoing.reserve-events.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

Finally, let’s take a look at the implementation of the Quarkus function inside the product-saga application. It also sends a response to the reserve-events topic using the Emitter object. It handles incoming orders and performs a reservation for the requested number of products.

@Slf4j
public class OrderReserveFunction {

   @Inject
   private ProductRepository repository;

   @Inject
   @Channel("reserve-events")
   Emitter<Order> orderEmitter;

   @Funq
   public void reserve(Order order) {
      log.info("Received order: {}", order);
      doReserve(order);
   }

   private void doReserve(Order order) {
      Product product = repository.findById(order.getProductId());
      log.info("Product: {}", product);
      if (order.getStatus() == OrderStatus.NEW) {
         product.setReservedItems(product.getReservedItems() + order.getProductsCount());
         product.setAvailableItems(product.getAvailableItems() - order.getProductsCount());
         order.setStatus(OrderStatus.IN_PROGRESS);
         orderEmitter.send(order);
      } else if (order.getStatus() == OrderStatus.CONFIRMED) {
         product.setReservedItems(product.getReservedItems() - order.getProductsCount());
      }
      repository.persist(product);
   }
}

Step 4. Deploy Quarkus application on Knative

Finally, we can deploy all our applications on Knative. To simplify that process we may use Quarkus Kubernetes support. It is able to automatically generate deployment manifests based on the source code and application properties. Quarkus also supports building images with Jib. So first, let’s add the following dependencies to Maven pom.xml.

 <dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-kubernetes</artifactId>
 </dependency>
 <dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-container-image-jib</artifactId>
 </dependency>

In the next step, we need to add some configuration settings to the application.properties file. To enable automatic deployment on Kubernetes, the property quarkus.kubernetes.deploy must be set to true. Then we should change the target platform to Knative. Thanks to that, Quarkus will generate a Knative Service instead of a standard Kubernetes Deployment. The last property, quarkus.container-image.group, is responsible for setting the name of the image owner group. For local development with Knative, we should set the dev.local value there.

quarkus.kubernetes.deploy = true
quarkus.kubernetes.deployment-target = knative
quarkus.container-image.group = dev.local

After setting all the values visible above, we just need to run the Maven build to deploy the application.

$ mvn clean package

After running Maven build for all the applications let’s verify a list of Knative Services.

Once the order-saga application starts it begins sending orders continuously. It also receives order events sent by customer-saga and product-saga. Those events are processed by the Quarkus function. Here are the logs printed by order-saga.

Final Thoughts

As you see, we can easily implement and deploy Quarkus applications on Knative. Quarkus provides several extensions that simplify integration with the Knative Eventing model and Kafka broker. We can use Quarkus Funqy to implement the serverless FaaS approach or SmallRye Reactive Messaging to integrate with Apache Kafka. You can compare that Quarkus support with Spring Boot in my previous article: Knative Eventing with Kafka and Spring Cloud.

Knative Eventing with Kafka and Spring Cloud https://piotrminkowski.com/2021/03/12/knative-eventing-with-kafka-and-spring-cloud/ Fri, 12 Mar 2021 11:55:12 +0000

The post Knative Eventing with Kafka and Spring Cloud appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to run eventing applications on Knative using Kafka and Spring Cloud. I’ll show you what Knative Eventing is, and how to integrate it with the Kafka broker. We will build our applications on top of Spring Cloud Function and Spring Cloud Stream. All these solutions seem to be a perfect match. Why? Let me invite you to read the article.

However, before we proceed, you need some knowledge of the basic Knative concepts. Therefore, I suggest you read more about them. You can start with these two articles: Spring Boot on Knative and Microservices on Knative with GraalVM and Spring Boot. Of course, you can also refer to the Knative documentation.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions.

Today we will build on a simple architecture that follows an eventual consistency pattern. It is also known as a SAGA pattern. What exactly is that? The sample system consists of three services. The order-service creates a new order that is related to the customers and products. That order is sent to the Kafka topic. Then, our two other applications, customer-service and product-service, receive the order event. After that, they perform a reservation. The customer-service reserves the order’s amount on the customer’s account. Meanwhile, the product-service reserves the number of products specified in the order. Both these services send a response to the order-service through the Kafka topic. If the order-service receives positive reservations from both services, it confirms the order. Then, it sends an event with that information. Both customer-service and product-service receive the event and confirm reservations. You can verify it in the picture below.

knative-eventing-kafka-arch

Prerequisites

Before we start, we first need to install Knative on the Kubernetes cluster. I’m using a local instance of Kubernetes, but you may also use any remote cluster, such as GKE. Note that the latest version of Knative requires a Kubernetes cluster v1.17 or later. Of course, we need to install both the Serving and Eventing components. You may find the detailed installation instructions here.

That’s not all. We also need to install Kafka Eventing Broker. Here’s the link to the releases site. It includes several deployments and CRDs. You should pay special attention to the KafkaSource and KafkaBinding CRDs, since we will use them later.

Finally, we need to install a Kafka cluster on Kubernetes. The recommended way to do that is with the Strimzi operator. Strimzi provides container images and operators for running Kafka on Kubernetes. It also comes with a set of CRDs for managing the Kafka cluster. Once you install it, you may proceed to the next steps. I installed it in the kafka namespace. Here’s the list of running pods.

Step 1: Create and configure Knative Kafka Broker

In the first step, we are going to create a Kafka cluster using the Strimzi CRD. To keep things simple, we won’t use any advanced configuration settings. For example, I used ephemeral storage, which is not recommended in production. I set three instances of ZooKeeper. I heard that Kafka is finally planning to drop ZooKeeper, but the current version still depends on it.

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    replicas: 1
    listeners:
      plain: {}
      tls: {}
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.4"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 3
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}

The Knative broker allows us to route events to different event sinks or consumers. We may use different broker providers. When an event is sent to the broker, all request metadata other than the CloudEvent data and context attributes is stripped away. The event delivery mechanism hides the details of event routing from the event producer and consumer. The default broker class is MTChannelBasedBroker. We will change it to Kafka.

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    eventing.knative.dev/broker.class: Kafka
  name: default
spec:
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing

In this article, we won’t directly use Kafka broker. Instead, we will use the KafkaSource object that takes events from a particular topic and sends them to the subscriber. If you want to use Broker you need to define Knative Trigger that refers to it.

The broker refers to the ConfigMap kafka-broker-config. The most important thing there is to set the address of the Kafka cluster. If you didn’t change anything in the default Kafka installation files it is ${KAFKA_CLUSTER_NAME}-kafka-bootstrap and port 9092.

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  default.topic.partitions: "10"
  default.topic.replication.factor: "1"
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
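
The bootstrap address follows a simple naming rule: the cluster name with the -kafka-bootstrap suffix, then the namespace and the port. Here is a short sketch of that rule in plain Java. The class and method names are illustrative. The values in the example are the ones used in this article (my-cluster, kafka, 9092).

```java
// Illustrative helper that composes the in-cluster address of the Kafka bootstrap service.
class BootstrapAddressSketch {

    // <cluster>-kafka-bootstrap.<namespace>:<port>
    static String bootstrapAddress(String clusterName, String namespace, int port) {
        return clusterName + "-kafka-bootstrap." + namespace + ":" + port;
    }

    public static void main(String[] args) {
        System.out.println(bootstrapAddress("my-cluster", "kafka", 9092));
    }
}
```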

Step 2: Create an application with Spring Cloud Stream

Let’s start with dependencies. Each of our applications uses an in-memory H2 database. They integrate with the database using the Spring Data JPA repository pattern. However, the most important thing is that they are all based on Spring Cloud Stream to interact with Kafka topics. Spring Cloud Stream requires adding a concrete binder implementation to the classpath. That’s why we add the spring-cloud-starter-stream-kafka starter. For some time now, the Spring Cloud Stream programming model has been built on top of Spring Cloud Function. Fortunately, we may easily export functions as HTTP endpoints. This feature will be useful for us later. For now, let’s just take a look at the list of included dependencies.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-function-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.16</version>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>

Here’s the model class for the order-service. Once the order is created and saved in the database, the order-service sends it to the output Kafka topic.

@Entity
@Table(name = "orders")
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Order {
    @Id
    private Integer id;
    private Integer customerId;
    private Integer productId;
    private int amount;
    private int productCount;
    @Enumerated
    private OrderStatus status = OrderStatus.NEW;
}

We have three functions inside the order-service application main class. Two of them send events to the output destination continuously. The third one, confirm(), waits for incoming events. We will discuss it later. The orderEventSupplier function represents the first step in our scenario. It creates a new order with test data and saves it in the database before sending it.

@SpringBootApplication
@Slf4j
public class OrderSagaApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderSagaApplication.class, args);
    }
    private static int num = 0;
    private BlockingQueue<Order> queue = new LinkedBlockingQueue<>();
    @Bean
    public Supplier<Order> orderEventSupplier() {
        return () -> repository.save(new Order(++num, num%10+1, num%10+1, 100, 1, OrderStatus.NEW));
    }
    @Bean
    public Supplier<Order> orderConfirmSupplier() {
        return () -> queue.poll();
    }
    @Bean
    public Consumer<Message<Order>> confirm() {
        return this::doConfirm;
    }
    @Autowired
    OrderRepository repository;
    private void doConfirm(Message<Order> msg) {
        Order o = msg.getPayload();
        log.info("Order received: {}", o);
        Order order = repository.findById(o.getId()).orElseThrow();
        if (order.getStatus() == OrderStatus.NEW) {
            order.setStatus(OrderStatus.IN_PROGRESS);
        } else if (order.getStatus() == OrderStatus.IN_PROGRESS) {
            order.setStatus(OrderStatus.CONFIRMED);
            log.info("Order confirmed : {}", order);
            queue.offer(order);
        }
        repository.save(order);
    }
}
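
The orderConfirmSupplier function relies on a small trick: queue.poll() does not block and returns null when the queue is empty. The sketch below isolates that hand-off between the consumer and the supplier in plain Java, with String standing in for Order and with illustrative class and method names. My understanding is that Spring Cloud Stream invokes a Supplier periodically (once per second by default) and sends nothing when it returns null, but treat that as an assumption and verify it for your version.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

// Isolates the consumer-to-supplier hand-off; String stands in for the Order class.
class QueueSupplierSketch {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Non-blocking: returns the next queued element, or null when there is nothing to send.
    Supplier<String> supplier() {
        return queue::poll;
    }

    // Called by the consumer side once an order has been confirmed.
    void confirmed(String order) {
        queue.offer(order);
    }

    public static void main(String[] args) {
        QueueSupplierSketch sketch = new QueueSupplierSketch();
        Supplier<String> supplier = sketch.supplier();
        System.out.println(supplier.get()); // null, nothing queued yet
        sketch.confirmed("order-1");
        System.out.println(supplier.get()); // order-1
    }
}
```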

The name of the output Kafka topic is order-events. We set it for both Supplier functions using the Spring Cloud Stream bindings pattern. On the other hand, the Consumer function will not receive events directly from the Kafka topic. Why? Because it is part of the Knative Eventing process, which I will explain in the next step. For now, the important thing is to declare, using the spring.cloud.function.definition property, that only the suppliers bind to an external destination.

spring.application.name: order-saga
spring.cloud.stream.bindings.orderEventSupplier-out-0.destination: order-events
spring.cloud.stream.bindings.orderConfirmSupplier-out-0.destination: order-events
spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
spring.cloud.function.definition: orderEventSupplier;orderConfirmSupplier

Finally, we need to create the KafkaBinding that will inject Kafka bootstrap information into the application container (through the Knative Service). Then, the application can access it as the KAFKA_BOOTSTRAP_SERVERS environment variable.

apiVersion: bindings.knative.dev/v1beta1
kind: KafkaBinding
metadata:
  name: kafka-binding-order-saga
spec:
  subject:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: order-saga
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092

Step 3: Create Kafka sources and Spring Cloud Function endpoints

OK, we have already created a function responsible for generating and sending orders to the Kafka topic inside the order-service. So, now our goal is to receive and handle them on the customer-service and product-service sides. Our applications won’t directly listen for incoming events on the Kafka topic. To clarify, the basic Knative Eventing assumption is that the application doesn’t care how the events are published. It just receives the events as an HTTP POST. And here comes the KafkaSource object. It takes a list of input topics and a destination sink as parameters. In our case, it gets messages from order-events and sends them as HTTP POST requests to the endpoint /customers/reserve of the customer-saga Knative Service.

apiVersion: sources.knative.dev/v1beta1
kind: KafkaSource
metadata:
  name: kafka-source
spec:
  bootstrapServers:
    - my-cluster-kafka-bootstrap.kafka:9092
  topics:
    - order-events
  sink:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: customer-saga
    uri: /customers/reserve
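
The sink has two parts: ref points to the Knative Service, and uri is resolved against the address of that service. The sketch below shows the resolution with java.net.URI. The cluster-local host name is an assumption for illustration (it uses the serverless namespace where I deploy the applications), and the class name is hypothetical.

```java
import java.net.URI;

// Illustrates how a sink "uri" combines with the address of the referenced service.
class SinkUriSketch {

    static URI target(String serviceAddress, String uriPath) {
        return URI.create(serviceAddress).resolve(uriPath);
    }

    public static void main(String[] args) {
        // Assumed cluster-local address of the customer-saga Knative Service.
        System.out.println(target("http://customer-saga.serverless.svc.cluster.local", "/customers/reserve"));
    }
}
```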

Here’s an implementation of the customer-saga application. Thanks to Spring Cloud Function Web, it automatically exports the reserve function as an HTTP endpoint with the path /reserve. Once the consumer receives the event, it performs the rest of the business logic. If the input order has the NEW status, customer-saga creates a reservation for the given amount on the customer account. Then it sends an event response to order-saga. More precisely, it first puts the event into a BlockingQueue. We also use a Supplier function for sending events to the Kafka topic. This time, the supplier function takes Order objects from the BlockingQueue. Finally, if our application receives a confirmed order from order-saga, it commits the whole transaction by removing the reserved amount.

@SpringBootApplication
@Slf4j
public class CustomerSagaApplication {
    public static void main(String[] args) {
        SpringApplication.run(CustomerSagaApplication.class, args);
    }
    private BlockingQueue<Order> queue = new LinkedBlockingQueue<>();
    @Autowired
    private CustomerRepository repository;
    
    @Bean
    public Supplier<Order> orderEventSupplier() {
        return () -> queue.poll();
    }
    @Bean
    public Consumer<Message<Order>> reserve() {
        return this::doReserve;
    }
    private void doReserve(Message<Order> msg) {
        Order order = msg.getPayload();
        log.info("Body: {}", order);
        Customer customer = repository.findById(order.getCustomerId()).orElseThrow();
        log.info("Customer: {}", customer);
        if (order.getStatus() == OrderStatus.NEW) {
            customer.setAmountReserved(customer.getAmountReserved() + order.getAmount());
            customer.setAmountAvailable(customer.getAmountAvailable() - order.getAmount());
            order.setStatus(OrderStatus.IN_PROGRESS);
            queue.offer(order);
        } else if (order.getStatus() == OrderStatus.CONFIRMED) {
            customer.setAmountReserved(customer.getAmountReserved() - order.getAmount());
        }
        repository.save(customer);
    }
}

We can also set the base context path for HTTP endpoints using the spring.cloud.function.web.path property. So, the final path of our target endpoint is /customers/reserve. It is the same as the address defined in the KafkaSource definition.

spring.cloud.function.web.path: /customers
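
The resulting endpoint path is simply the configured prefix followed by the function name. Here is a tiny sketch of that rule, useful for checking that the path matches the uri of the KafkaSource. The class and method names are illustrative and this is not Spring code.

```java
// Illustrative: joins the web path prefix and the function bean name into the endpoint path.
class FunctionPathSketch {

    static String endpointPath(String webPath, String functionName) {
        // Drop a trailing slash from the prefix so the result never contains "//".
        String prefix = webPath.endsWith("/") ? webPath.substring(0, webPath.length() - 1) : webPath;
        return prefix + "/" + functionName;
    }

    public static void main(String[] args) {
        System.out.println(endpointPath("/customers", "reserve")); // /customers/reserve
    }
}
```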

Here’s a configuration for the customer-saga inside the application.yml file.

spring.application.name: customer-saga
spring.cloud.function.web.path: /customers
spring.cloud.stream.bindings.orderEventSupplier-out-0.destination: reserve-events
spring.kafka.bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
spring.cloud.function.definition: orderEventSupplier

The implementation of the business logic inside product-saga is pretty similar to the customer-saga. There is a single Consumer function that receives orders, and a single Supplier responsible for sending a response to the order-saga.

@SpringBootApplication
@Slf4j
public class ProductSagaApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProductSagaApplication.class, args);
    }
    @Autowired
    private ProductRepository repository;
    private BlockingQueue<Order> queue = new LinkedBlockingQueue<>();
    @Bean
    public Supplier<Order> orderEventSupplier() {
        return () -> queue.poll();
    }
    @Bean
    public Consumer<Message<Order>> reserve() {
        return this::doReserve;
    }
    private void doReserve(Message<Order> msg) {
        Order order = msg.getPayload();
        log.info("Body: {}", order);
        Product product = repository.findById(order.getProductId()).orElseThrow();
        log.info("Product: {}", product);
        if (order.getStatus() == OrderStatus.NEW) {
            product.setReservedItems(product.getReservedItems() + order.getProductsCount());
            product.setAvailableItems(product.getAvailableItems() - order.getProductsCount());
            order.setStatus(OrderStatus.IN_PROGRESS);
            queue.offer(order);
        } else if (order.getStatus() == OrderStatus.CONFIRMED) {
            product.setReservedItems(product.getReservedItems() - order.getProductsCount());
        }
        repository.save(product);
    }
}

Step 4: Run applications on Knative Eventing and Kafka

Here’s a typical definition of the Knative Service for our applications. I’m using the dev.local option, but if you run a remote cluster you may replace it with your Docker username or any other repository account you have.

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: order-saga
spec:
  template:
    spec:
      containers:
        - image: dev.local/order-saga

I use Skaffold together with Jib Maven Plugin for building and deploying applications on Knative. My target namespace is serverless. With the tail option you may observe logs after deployment. Of course, you may as well use the skaffold dev command.

$ skaffold run --tail -n serverless

After running all our applications on Knative eventing with Kafka we may verify a list of services using kn CLI.

knative-eventing-kafka-services

Then, we may verify that all KafkaBindings have been created. To do that, let’s just execute the following kubectl command.

The next important component is KafkaSource. We have already created three sources, a single one per application.

knative-eventing-kafka-sources

After starting, the order-saga application continuously generates and sends a new order each second. Both product-saga and customer-saga receive events and send responses. Thanks to that, the traffic is exchanged without any interruption. Besides the application pods, we have three pods with Kafka sources.

Let’s just take a look at the application logs. Here are the logs from order-saga. As you can see, it receives the order reservations from both customer-saga and product-saga. After that, it confirms the order and sends a response back to the order-events topic on Kafka. Basically, that’s what we wanted to achieve.

Final Thoughts

I hope you enjoyed this article. Knative is still a relatively new solution. I think we may expect some new and interesting features in the near future. With Knative Eventing you may use some other event sources than Kafka. Personally, I’m waiting for integration with RabbitMQ, which is under development now. For a full list of available solutions, you may refer to that site.

It is my third article about Knative and Spring Boot. You may expect more articles about Knative soon! Next time, I’m going to show you an example with another popular Java framework – Quarkus.

The post Knative Eventing with Kafka and Spring Cloud appeared first on Piotr's TechBlog.

Microservices on Knative with Spring Boot and GraalVM https://piotrminkowski.com/2021/03/05/microservices-on-knative-with-spring-boot-and-graalvm/ Fri, 05 Mar 2021 16:00:30 +0000

The post Microservices on Knative with Spring Boot and GraalVM appeared first on Piotr's TechBlog.

In this article, you will learn how to run Spring Boot microservices that communicate with each other on Knative. I also show you how to prepare a native image of the Spring Boot application with GraalVM. Then we will run it on Kubernetes using Skaffold and the Jib Maven Plugin.

This article is the second in my series of articles about Knative. After I published the first one, Spring Boot on Knative, you asked me about the long application startup time after scaling to zero. That’s why I addressed this Spring Boot issue by compiling the application to a native image with GraalVM. Startup time is an important factor in a serverless approach.

On Knative you can run any type of application – not only a function. When I write “microservices” in this article, I mean service-to-service communication.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions 🙂

As the example of microservices in this article, I used two applications: callme-service and caller-service. Both of them expose a single endpoint, which prints the name of the application pod. The caller-service application also calls the endpoint exposed by the callme-service application.

On Kubernetes, both these applications will be deployed as Knative services in multiple revisions. We will also distribute traffic across those revisions using Knative routes. The picture visible below illustrates the architecture of our sample system.

knative-microservices-with-spring-boot

1. Prepare Spring Boot microservices

We have two simple Spring Boot applications that expose a single REST endpoint, health checks, and run an in-memory H2 database. We use Hibernate and Lombok. Therefore, we need to include the following list of dependencies in Maven pom.xml.

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.16</version>
</dependency>

Each time we call the ping endpoint it creates an event and stores it in the H2 database. The REST endpoint returns the name of a pod and namespace inside Kubernetes and the id of the event. That method will be useful in our manual tests on the cluster.

@RestController
@RequestMapping("/callme")
public class CallmeController {

    @Value("${spring.application.name}")
    private String appName;
    @Value("${POD_NAME}")
    private String podName;
    @Value("${POD_NAMESPACE}")
    private String podNamespace;
    @Autowired
    private CallmeRepository repository;

    @GetMapping("/ping")
    public String ping() {
        Callme c = repository.save(new Callme(new Date(), podName));
        return appName + "(id=" + c.getId() + "): " + podName + " in " + podNamespace;
    }

}
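The controller above reads POD_NAME and POD_NAMESPACE from the environment, and those variables only exist once the application runs in a pod. Here is a minimal plain-Java sketch of resolving them with a fallback for local runs. The PodInfo class and the "local" default are assumptions made for illustration, not part of the sample repository.

```java
import java.util.Map;

// Hypothetical helper (not in the sample repository): resolves the pod
// metadata that the Knative manifest injects as environment variables.
final class PodInfo {

    final String name;
    final String namespace;

    PodInfo(String name, String namespace) {
        this.name = name;
        this.namespace = namespace;
    }

    // Reads POD_NAME and POD_NAMESPACE from the given environment map and
    // falls back to "local" when a variable is missing or empty.
    static PodInfo fromEnv(Map<String, String> env) {
        return new PodInfo(valueOrDefault(env.get("POD_NAME")),
                           valueOrDefault(env.get("POD_NAMESPACE")));
    }

    private static String valueOrDefault(String value) {
        return (value == null || value.isEmpty()) ? "local" : value;
    }

    public static void main(String[] args) {
        PodInfo info = PodInfo.fromEnv(System.getenv());
        System.out.println(info.name + " in " + info.namespace);
    }
}
```

In a Spring application the same effect can be achieved with a placeholder default such as ${POD_NAME:local}.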

Here’s our model class – Callme. The model class inside the caller-service application is pretty similar.

@Entity
@Getter
@Setter
@NoArgsConstructor
@RequiredArgsConstructor
public class Callme {

    @Id
    @GeneratedValue
    private Integer id;
    @Temporal(TemporalType.TIMESTAMP)
    @NonNull
    private Date addDate;
    @NonNull
    private String podName;

}

Also, let’s take a look at the first version of the ping method in CallerController. We will modify it later when we discuss communication and tracing. For now, it is important to understand that this method also calls the ping method exposed by callme-service and returns the whole response.

@GetMapping("/ping")
public String ping() {
    Caller c = repository.save(new Caller(new Date(), podName));
    String callme = callme();
    return appName + "(id=" + c.getId() + "): " + podName + " in " + podNamespace
            + " is calling " + callme;
}

2. Prepare Spring Boot native image with GraalVM

Spring Native provides support for compiling Spring applications to native executables using the GraalVM native compiler. For more details about this project, you may refer to its documentation. Here’s the main class of our application.

@SpringBootApplication
public class CallmeApplication {

   public static void main(String[] args) {
      SpringApplication.run(CallmeApplication.class, args);
   }

}

Hibernate does a lot of dynamic things at runtime. So we need to get Hibernate to enhance the entities in our application at build time. We need to add the following Maven plugin to our build.

<plugin>
   <groupId>org.hibernate.orm.tooling</groupId>
   <artifactId>hibernate-enhance-maven-plugin</artifactId>
   <version>${hibernate.version}</version>
   <executions>
      <execution>
         <configuration>
            <failOnError>true</failOnError>
            <enableLazyInitialization>true</enableLazyInitialization>
            <enableDirtyTracking>true</enableDirtyTracking>
            <enableExtendedEnhancement>false</enableExtendedEnhancement>
         </configuration>
         <goals>
            <goal>enhance</goal>
         </goals>
      </execution>
   </executions>
</plugin>

In this article, I’m using the latest version of Spring Native – 0.9.0. Since Spring Native is actively developed, there are significant changes between subsequent versions. Compared to some other articles based on earlier versions, we don’t have to disable proxyBeanMethods, exclude SpringDataWebAutoConfiguration, add spring-context-indexer to the dependencies, or create hibernate.properties. Cool! I can also use Buildpacks for building a native image.

So, now we just need to add the following dependency.

<dependency>
   <groupId>org.springframework.experimental</groupId>
   <artifactId>spring-native</artifactId>
   <version>0.9.0</version>
</dependency>

The Spring AOT plugin performs ahead-of-time transformations required to improve native image compatibility and footprint.

<plugin>
    <groupId>org.springframework.experimental</groupId>
    <artifactId>spring-aot-maven-plugin</artifactId>
    <version>${spring.native.version}</version>
    <executions>
        <execution>
            <id>test-generate</id>
            <goals>
                <goal>test-generate</goal>
            </goals>
        </execution>
        <execution>
            <id>generate</id>
            <goals>
                <goal>generate</goal>
            </goals>
        </execution>
    </executions>
</plugin>

3. Run native image on Knative with Buildpacks

Using Buildpacks to create a native image is our primary option. Although it requires a Docker daemon, it works properly on every OS. However, we need to use the latest stable version of Spring Boot, which in this case is 2.4.3. You can also configure Buildpacks inside the Maven pom.xml with the spring-boot-maven-plugin. Since we need to build and deploy the application on Kubernetes in one step, I prefer to configure it in Skaffold. We use paketobuildpacks/builder:tiny as the builder image. We also need to enable the native build with the BP_BOOT_NATIVE_IMAGE environment variable.

apiVersion: skaffold/v2beta11
kind: Config
metadata:
  name: callme-service
build:
  artifacts:
  - image: piomin/callme-service
    buildpacks:
      builder: paketobuildpacks/builder:tiny
      env:
        - BP_BOOT_NATIVE_IMAGE=true
deploy:
  kubectl:
    manifests:
      - k8s/ksvc.yaml

The Skaffold configuration refers to our Knative Service manifest. The manifest is somewhat atypical, since we need to inject the pod name and namespace into the container. We also allow a maximum of 10 concurrent requests per pod. If that limit is exceeded, Knative scales up the number of running instances.

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: callme-service
spec:
  template:
    spec:
      containerConcurrency: 10
      containers:
      - name: callme
        image: piomin/callme-service
        ports:
          - containerPort: 8080
        env:
          - name: POD_NAME
            valueFrom:
              fieldRef:
                fieldPath: metadata.name
          - name: POD_NAMESPACE
            valueFrom:
              fieldRef:
                fieldPath: metadata.namespace

By default, Knative doesn’t allow using the Kubernetes fieldRef feature. In order to enable it, we need to update the config-features ConfigMap in the knative-serving namespace. The required property name is kubernetes.podspec-fieldref.

kind: ConfigMap
apiVersion: v1
metadata:
  name: config-features
  namespace: knative-serving
  labels:
    serving.knative.dev/release: v0.16.0
data:
  kubernetes.podspec-fieldref: enabled

Finally, we may build and deploy our Spring Boot microservices on Knative with the following command.

$ skaffold run

4. Run native image on Knative with Jib

As in my previous article about Knative, we will build and run our applications on Kubernetes with Skaffold and Jib. Fortunately, the Jib Maven Plugin has already introduced support for GraalVM “native images”. The Jib GraalVM Native Image Extension expects the native-image-maven-plugin to do the heavy lifting of generating a “native image” (with the native-image:native-image goal). Then the extension simply copies the binary into a container image and sets it as executable.

Of course, unlike Java bytecode, a native image is not portable but platform-specific. The Native Image Maven Plugin doesn’t support cross-compilation, so the native image should be built on the same OS and architecture it will run on. Since I build a GraalVM image of my applications on Ubuntu 20.10, I should use the same base Docker image for running containerized microservices. In that case, I chose the ubuntu:20.10 image as shown below.

<plugin>
   <groupId>com.google.cloud.tools</groupId>
   <artifactId>jib-maven-plugin</artifactId>
   <version>2.8.0</version>
   <dependencies>
      <dependency>
         <groupId>com.google.cloud.tools</groupId>
         <artifactId>jib-native-image-extension-maven</artifactId>
         <version>0.1.0</version>
      </dependency>
   </dependencies>
   <configuration>
      <from>
         <image>ubuntu:20.10</image>
      </from>
      <pluginExtensions>
         <pluginExtension>
            <implementation>com.google.cloud.tools.jib.maven.extension.nativeimage.JibNativeImageExtension</implementation>
         </pluginExtension>
      </pluginExtensions>
   </configuration>
</plugin>

If you use the Jib Maven Plugin, you first need to build a native image. In order to do that, we also need to include the native-image-maven-plugin. Of course, you need to build the application using the GraalVM JDK.

<plugin>
   <groupId>org.graalvm.nativeimage</groupId>
   <artifactId>native-image-maven-plugin</artifactId>
   <version>21.0.0.2</version>
   <executions>
      <execution>
         <goals>
            <goal>native-image</goal>
         </goals>
         <phase>package</phase>
      </execution>
   </executions>
</plugin>

So, the last step in this section is to run the Maven build. In my configuration, the native-image-maven-plugin is activated under the native-image profile.

$ mvn clean package -Pnative-image

After the build, the native image of callme-service is visible inside the target directory.

The configuration of Skaffold is typical. We just need to enable Jib as a build tool.

apiVersion: skaffold/v2beta11
kind: Config
metadata:
  name: callme-service
build:
  artifacts:
  - image: piomin/callme-service
    jib: {}
deploy:
  kubectl:
    manifests:
      - k8s/ksvc.yaml

Finally, we may build and deploy our Spring Boot microservices on Knative with the following command.

$ skaffold run

5. Communication between microservices on Knative

I deployed two revisions of each application on Knative. Just for comparison, the first version of the deployed applications is compiled with OpenJDK. Only the latest version is based on the GraalVM native image. Thanks to that, we may compare the startup time of both revisions.

Let’s take a look at the list of revisions after deploying both versions of our applications. The traffic is split 60% to the latest version and 40% to the previous version of each application.

Under the hood, Knative creates Kubernetes Services and multiple Deployments. There is always a single Deployment per Knative Revision. There are also multiple Services, but exactly one of them spans all revisions. That Service is of the ExternalName type. If you want to split traffic across multiple revisions, you should use exactly that Service in your communication. The name of the Service is callme-service. However, we should use its FQDN, which includes the namespace name and the svc.cluster.local suffix.
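To make the naming rule concrete, here is a small plain-Java sketch that assembles the cluster-internal URL from a service name and a namespace. The InternalServiceUrl class is hypothetical, and it assumes the default Kubernetes cluster domain cluster.local.

```java
// Hypothetical helper, not part of the sample repository.
final class InternalServiceUrl {

    // Assumes the default Kubernetes cluster domain "cluster.local".
    private static final String CLUSTER_SUFFIX = "svc.cluster.local";

    // Builds http://<service>.<namespace>.svc.cluster.local<path>,
    // adding a leading slash to the path when it is missing.
    static String of(String service, String namespace, String path) {
        String normalizedPath = path.startsWith("/") ? path : "/" + path;
        return "http://" + service + "." + namespace + "." + CLUSTER_SUFFIX + normalizedPath;
    }

    public static void main(String[] args) {
        System.out.println(of("callme-service", "serverless", "/callme/ping"));
    }
}
```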

We can use Spring RestTemplate to call the endpoint exposed by callme-service. In order to guarantee tracing for the whole request path, we need to propagate Zipkin headers between the subsequent calls. For communication, we will use a service with a fully qualified internal domain name (callme-service.serverless.svc.cluster.local) as mentioned before.

@RestController
@RequestMapping("/caller")
public class CallerController {

   private RestTemplate restTemplate;

   CallerController(RestTemplate restTemplate) {
      this.restTemplate = restTemplate;
   }

   @Value("${spring.application.name}")
   private String appName;
   @Value("${POD_NAME}")
   private String podName;
   @Value("${POD_NAMESPACE}")
   private String podNamespace;
   @Autowired
   private CallerRepository repository;

   @GetMapping("/ping")
   public String ping(@RequestHeader HttpHeaders headers) {
      Caller c = repository.save(new Caller(new Date(), podName));
      String callme = callme(headers);
      return appName + "(id=" + c.getId() + "): " + podName + " in " + podNamespace
                     + " is calling " + callme;
   }

   private String callme(HttpHeaders headers) {
      MultiValueMap<String, String> map = new LinkedMultiValueMap<>();
      Set<String> headerNames = headers.keySet();
      headerNames.forEach(it -> map.put(it, headers.get(it)));
      HttpEntity httpEntity = new HttpEntity(map);
      ResponseEntity<String> entity = restTemplate
         .exchange("http://callme-service.serverless.svc.cluster.local/callme/ping",
                  HttpMethod.GET, httpEntity, String.class);
      return entity.getBody();
   }

}
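The callme method above copies every incoming header to the outgoing request, including ones unrelated to tracing. A narrower variant, sketched here as an assumption rather than the code from the repository, forwards only the B3 headers that Zipkin-compatible tracers use. The TracingHeaders class is hypothetical. It works on a plain Map so that it stays independent of Spring types.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: keeps only the headers needed for trace propagation.
final class TracingHeaders {

    // B3 propagation header names, lower-cased for case-insensitive matching.
    private static final Set<String> B3_HEADERS = Set.of(
            "x-b3-traceid", "x-b3-spanid", "x-b3-parentspanid",
            "x-b3-sampled", "x-b3-flags", "b3");

    // Returns a new map with the B3 headers only; the original names and
    // values are preserved, and the input map is not modified.
    static Map<String, List<String>> extract(Map<String, List<String>> incoming) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        incoming.forEach((name, values) -> {
            if (B3_HEADERS.contains(name.toLowerCase(Locale.ROOT))) {
                result.put(name, values);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> incoming = new LinkedHashMap<>();
        incoming.put("X-B3-TraceId", List.of("463ac35c9f6413ad"));
        incoming.put("X-B3-SpanId", List.of("a2fb4a1d1a96d312"));
        incoming.put("Accept", List.of("*/*"));
        System.out.println(extract(incoming).keySet());
    }
}
```

Since Spring's HttpHeaders implements MultiValueMap<String, String>, which is a Map<String, List<String>>, the incoming headers could be passed to such a helper directly.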

In order to test the communication between our microservices we just need to invoke caller-service via Knative Route.

knative-microservices-routes

Let’s perform some test calls of the caller-service GET /caller/ping endpoint. We should use the URL http://caller-service-serverless.apps.cluster-d556.d556.sandbox262.opentlc.com/caller/ping.

knative-microservices-communication

In the first two requests, caller-service calls the latest version of callme-service (compiled with GraalVM). In the third request, it communicates with the older version of callme-service (compiled with OpenJDK). Let’s compare the startup time of those two versions of the same application.

With GraalVM we have 0.3s instead of 5.9s. We should also keep in mind that our applications start an in-memory, embedded H2 database.

6. Configure tracing with Jaeger

In order to enable tracing for Knative, we need to update the knative-tracing ConfigMap in the knative-serving namespace. Of course, we first need to install Jaeger in our cluster.

apiVersion: operator.knative.dev/v1alpha1
kind: KnativeServing
metadata:
  name: knative-tracing
  namespace: knative-serving
spec:
  sample-rate: "1" 
  backend: zipkin 
  zipkin-endpoint: http://jaeger-collector.knative-serving.svc.cluster.local:9411/api/v2/spans 
  debug: "false"

You can also use the Helm chart to install Jaeger. With this option, you need to execute the following Helm commands.

$ helm repo add jaegertracing https://jaegertracing.github.io/helm-charts
$ helm install jaeger jaegertracing/jaeger

Knative automatically creates Zipkin span headers. Our only task is to propagate HTTP headers between the caller-service and callme-service applications. In my configuration, Knative sends 100% of traces to Jaeger. Let’s take a look at some traces for the GET /caller/ping endpoint within the Knative microservices mesh.

knative-microservice-tracing-jaeger

We can also take a look at the detailed view for every single request.

Conclusion

There are several important things you need to consider when you are running microservices on Knative. I focused on the aspects related to communication and tracing. I also showed that Spring Boot doesn’t have to start in a few seconds. With GraalVM it can start in milliseconds, so you can definitely consider it as a serverless framework. You may expect more articles about Knative soon!

Spring Boot on Knative https://piotrminkowski.com/2021/03/01/spring-boot-on-knative/ Mon, 01 Mar 2021 11:28:54 +0000

The post Spring Boot on Knative appeared first on Piotr's TechBlog.

In this article, I’ll explain what Knative is and how to use it with Spring Boot. Although Knative is a serverless platform, we can run any type of application there (not just a function). Therefore, we are going to run a standard Spring Boot application that exposes a REST API and connects to a database.

Knative introduces a new way of managing your applications on Kubernetes. It extends Kubernetes to add some new key features. One of the most significant of them is “scale to zero”. If Knative detects that a service is not used, it scales down the number of running instances to zero. It also provides a built-in autoscaling feature based on concurrency or the number of requests per second. We may also take advantage of revision tracking, which is responsible for switching from one version of your application to another. With Knative you just have to focus on your core logic.

All the features I described above are provided by the component called “Knative Serving”. There are also two other components: “Eventing” and “Build”. The Build component is deprecated and has been replaced by Tekton. The Eventing component requires attention. However, I’ll discuss it in more detail in a separate article.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions 🙂

I used the same application as the example in some of my previous articles about Spring Boot and Kubernetes. I just wanted to emphasize that you don’t have to change anything in the source code to run it on Knative as well. The only required change will be in the YAML manifest.

Since Knative provides built-in autoscaling you may want to compare it with the horizontal pod autoscaler (HPA) on Kubernetes. To do that you may read the article Spring Boot Autoscaling on Kubernetes. If you are interested in how to easily deploy applications on Kubernetes read the following article about the Okteto platform.

Install Knative on Kubernetes

Of course, before we start Spring Boot development we need to install Knative on Kubernetes. We can do it using the kubectl CLI or an operator. You can find the detailed installation instructions here. I decided to try it on OpenShift. It is obviously the fastest way. I could do it with one click using the OpenShift Serverless Operator. No matter which type of installation you choose, the further steps will apply everywhere.

Using Knative CLI

This step is optional. You can deploy and manage applications on Knative with the kn CLI. To download the CLI, go to https://knative.dev/docs/install/install-kn/. Then you can deploy the application using the Docker image.

$ kn service create sample-spring-boot-on-kubernetes \
   --image piomin/sample-spring-boot-on-kubernetes:latest

We can also verify a list of running services with the following command.

$ kn service list

For more advanced deployments, it is more suitable to use a YAML manifest. We will build from the source code with Skaffold and Jib. Firstly, let’s take a brief look at our Spring Boot application.

Spring Boot application for Knative

As I mentioned before, we are going to create a typical Spring Boot REST-based application that connects to a Mongo database. The database is deployed on Kubernetes. Our model class uses the person collection in MongoDB. Let’s take a look at it.

@Document(collection = "person")
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Person {

   @Id
   private String id;
   private String firstName;
   private String lastName;
   private int age;
   private Gender gender;
}

We use Spring Data MongoDB to integrate our application with the database. In order to simplify this integration we take advantage of its “repositories” feature.

public interface PersonRepository extends CrudRepository<Person, String> {
   Set<Person> findByFirstNameAndLastName(String firstName, String lastName);
   Set<Person> findByAge(int age);
   Set<Person> findByAgeGreaterThan(int age);
}
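Spring Data derives the queries from the method names, so no implementation is written by hand. To make the expected semantics concrete, here is a plain-Java stand-in that filters an in-memory list the way two of those methods filter the collection. The InMemoryPersonQueries class and its reduced Person type are assumptions for illustration. The real repository runs the queries in MongoDB.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Plain-Java stand-in for the repository, for illustration only.
final class InMemoryPersonQueries {

    // Reduced version of the Person model with just the queried fields.
    static final class Person {
        final String firstName;
        final String lastName;
        final int age;

        Person(String firstName, String lastName, int age) {
            this.firstName = firstName;
            this.lastName = lastName;
            this.age = age;
        }
    }

    private final List<Person> persons;

    InMemoryPersonQueries(List<Person> persons) {
        this.persons = persons;
    }

    // Mirrors findByAgeGreaterThan: strictly greater, so age == limit is excluded.
    Set<Person> findByAgeGreaterThan(int age) {
        return persons.stream()
                .filter(p -> p.age > age)
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    // Mirrors findByFirstNameAndLastName: both conditions must match.
    Set<Person> findByFirstNameAndLastName(String firstName, String lastName) {
        return persons.stream()
                .filter(p -> p.firstName.equals(firstName) && p.lastName.equals(lastName))
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }
}
```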

Our application exposes several REST endpoints for adding, searching and updating data. Here’s the controller class implementation.

@RestController
@RequestMapping("/persons")
public class PersonController {

   private PersonRepository repository;
   private PersonService service;

   PersonController(PersonRepository repository, PersonService service) {
      this.repository = repository;
      this.service = service;
   }

   @PostMapping
   public Person add(@RequestBody Person person) {
      return repository.save(person);
   }

   @PutMapping
   public Person update(@RequestBody Person person) {
      return repository.save(person);
   }

   @DeleteMapping("/{id}")
   public void delete(@PathVariable("id") String id) {
      repository.deleteById(id);
   }

   @GetMapping
   public Iterable<Person> findAll() {
      return repository.findAll();
   }

   @GetMapping("/{id}")
   public Optional<Person> findById(@PathVariable("id") String id) {
      return repository.findById(id);
   }

   @GetMapping("/first-name/{firstName}/last-name/{lastName}")
   public Set<Person> findByFirstNameAndLastName(@PathVariable("firstName") String firstName,
                                                 @PathVariable("lastName") String lastName) {
      return repository.findByFirstNameAndLastName(firstName, lastName);
   }

   @GetMapping("/age-greater-than/{age}")
   public Set<Person> findByAgeGreaterThan(@PathVariable("age") int age) {
      return repository.findByAgeGreaterThan(age);
   }

   @GetMapping("/age/{age}")
   public Set<Person> findByAge(@PathVariable("age") int age) {
      return repository.findByAge(age);
   }

}

We inject database connection settings and credentials using environment variables. Our application exposes endpoints for liveness and readiness health checks. The readiness endpoint verifies a connection with the Mongo database. Of course, we use the built-in feature from Spring Boot Actuator for that.

spring:
  application:
    name: sample-spring-boot-on-kubernetes
  data:
    mongodb:
      uri: mongodb://${MONGO_USERNAME}:${MONGO_PASSWORD}@mongodb/${MONGO_DATABASE}

management:
  endpoints:
    web:
      exposure:
        include: "*"
  endpoint.health:
      show-details: always
      group:
        readiness:
          include: mongo
      probes:
        enabled: true
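The MongoDB URI in this configuration is assembled from three environment variables. The following plain-Java sketch shows the string that results from that substitution. It is only an illustration: the MongoUri class is hypothetical and Spring resolves the placeholders itself. One assumption worth stating is that the sketch percent-encodes the credentials, which plain placeholder substitution does not do.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical helper that mimics the placeholder substitution
// in the spring.data.mongodb.uri property.
final class MongoUri {

    // Builds mongodb://<user>:<password>@mongodb/<database> from the environment.
    static String build(Map<String, String> env) {
        String user = encode(require(env, "MONGO_USERNAME"));
        String password = encode(require(env, "MONGO_PASSWORD"));
        String database = require(env, "MONGO_DATABASE");
        return "mongodb://" + user + ":" + password + "@mongodb/" + database;
    }

    // Fails fast with a clear message when a variable is missing or empty.
    private static String require(Map<String, String> env, String key) {
        String value = env.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing environment variable " + key);
        }
        return value;
    }

    // Percent-encodes reserved characters such as '@' and ':' in credentials.
    // URLEncoder writes spaces as '+', so they are converted to %20 afterwards.
    private static String encode(String raw) {
        return URLEncoder.encode(raw, StandardCharsets.UTF_8).replace("+", "%20");
    }

    public static void main(String[] args) {
        System.out.println(build(System.getenv()));
    }
}
```

If a password in the Secret contains characters such as @ or :, it would need to be stored in encoded form for the URI in the configuration above to remain valid.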

Defining Knative Service in YAML

Firstly, we need to define a YAML manifest with a Knative Service definition. It sets an autoscaling strategy using the Knative Pod Autoscaler (KPA). In order to do that, we have to add the autoscaling.knative.dev/target annotation with the number of simultaneous requests that can be processed by each instance of the application. By default, it is 100. We decrease that limit to 20 requests. Of course, we need to set liveness and readiness probes for the container. Also, we refer to the Secret and ConfigMap to inject MongoDB settings.

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: sample-spring-boot-on-kubernetes
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/target: "20"
    spec:
      containers:
        - image: piomin/sample-spring-boot-on-kubernetes
          livenessProbe:
            httpGet:
              path: /actuator/health/liveness
          readinessProbe:
            httpGet:
              path: /actuator/health/readiness
          env:
            - name: MONGO_DATABASE
              valueFrom:
                secretKeyRef:
                  name: mongodb
                  key: database-name
            - name: MONGO_USERNAME
              valueFrom:
                secretKeyRef:
                  name: mongodb
                  key: database-user
            - name: MONGO_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: mongodb
                  key: database-password

Configure Skaffold and Jib for Knative deployment

We will use Skaffold to automate the deployment of our application on Knative. Skaffold is a command-line tool that allows running the application on Kubernetes using a single command. You may read more about it in the article Local Java Development on Kubernetes. It may be easily integrated with the Jib Maven plugin. We just need to set jib as a default option in the build section of the Skaffold configuration. We can also define a list of YAML scripts executed during the deploy phase. The skaffold.yaml file should be placed in the project root directory. Here’s a current Skaffold configuration. As you see, it runs the script with the Knative Service definition.

apiVersion: skaffold/v2beta5
kind: Config
metadata:
  name: sample-spring-boot-on-kubernetes
build:
  artifacts:
    - image: piomin/sample-spring-boot-on-kubernetes
      jib:
        args:
          - -Pjib
  tagPolicy:
    gitCommit: {}
deploy:
  kubectl:
    manifests:
      - k8s/mongodb-deployment.yaml
      - k8s/knative-service.yaml

Skaffold activates the jib profile during the build. Within this profile, we will place a jib-maven-plugin. Jib is useful for building images in dockerless mode.

<profile>
   <id>jib</id>
   <activation>
      <activeByDefault>false</activeByDefault>
   </activation>
   <build>
      <plugins>
         <plugin>
            <groupId>com.google.cloud.tools</groupId>
            <artifactId>jib-maven-plugin</artifactId>
            <version>2.8.0</version>
         </plugin>
      </plugins>
   </build>
</profile>

Finally, all we need to do is run the following command. It builds our application, creates and pushes a Docker image, and runs it on Knative using knative-service.yaml.

$ skaffold run

Verify Spring Boot deployment on Knative

Now, we can verify our deployment on Knative. To do that let’s execute the command kn service list as shown below. We have a single Knative Service with the name sample-spring-boot-on-kubernetes.

spring-boot-knative-services

Then, let’s imagine we deploy three versions (revisions) of our application. To do that let’s just provide some changes in the source and redeploy our service using skaffold run. It creates new revisions of our Knative Service. However, the whole traffic is forwarded to the latest revision (with -vlskg suffix).

spring-boot-knative-revisions

With Knative we can easily split traffic between multiple revisions of a single service. To do that, we need to add a traffic section to the Knative Service YAML configuration. We define the percentage of the whole traffic for each revision as shown below.

apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: sample-spring-boot-on-kubernetes
spec:
  template:
    ...
  traffic:
    - latestRevision: true
      percent: 60
      revisionName: sample-spring-boot-on-kubernetes-vlskg
    - latestRevision: false
      percent: 20
      revisionName: sample-spring-boot-on-kubernetes-t9zrd
    - latestRevision: false
      percent: 20
      revisionName: sample-spring-boot-on-kubernetes-9xhbw

Let’s take a look at the graphical representation of our current architecture. 60% of traffic is forwarded to the latest revision, while both previous revisions receive 20% of traffic.
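The effect of such a percentage split can be illustrated with a small plain-Java simulation. This is only a sketch of percent-based routing and not how Knative's networking layer implements it. The TrafficSplit class is hypothetical, and the revision names are the ones from the manifest above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical illustration of percent-based routing across revisions.
final class TrafficSplit {

    private final Map<String, Integer> percentByRevision = new LinkedHashMap<>();

    TrafficSplit add(String revision, int percent) {
        percentByRevision.put(revision, percent);
        return this;
    }

    // The percentages of all traffic targets have to add up to 100.
    void validate() {
        int sum = percentByRevision.values().stream().mapToInt(Integer::intValue).sum();
        if (sum != 100) {
            throw new IllegalStateException("Traffic percents must sum to 100, got " + sum);
        }
    }

    // Maps a roll in [0, 100) to a revision by walking the cumulative ranges,
    // e.g. 0-59 -> first revision, 60-79 -> second, 80-99 -> third.
    String pick(int roll) {
        if (roll < 0 || roll >= 100) {
            throw new IllegalArgumentException("roll must be in [0, 100)");
        }
        validate();
        int upper = 0;
        for (Map.Entry<String, Integer> entry : percentByRevision.entrySet()) {
            upper += entry.getValue();
            if (roll < upper) {
                return entry.getKey();
            }
        }
        throw new IllegalStateException("Unreachable when percents sum to 100");
    }

    public static void main(String[] args) {
        TrafficSplit split = new TrafficSplit()
                .add("sample-spring-boot-on-kubernetes-vlskg", 60)
                .add("sample-spring-boot-on-kubernetes-t9zrd", 20)
                .add("sample-spring-boot-on-kubernetes-9xhbw", 20);
        Map<String, Integer> hits = new LinkedHashMap<>();
        for (int i = 0; i < 10_000; i++) {
            String revision = split.pick(ThreadLocalRandom.current().nextInt(100));
            hits.merge(revision, 1, Integer::sum);
        }
        System.out.println(hits);
    }
}
```

Running it many times should show the hit counts per revision converging towards the 60/20/20 ratio.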

spring-boot-knative-openshift

Autoscaling and scale to zero

By default, Knative supports autoscaling. We may choose between two types of targets: concurrency and requests-per-second (RPS). The default target is concurrency. As you probably remember, I have overridden this default value to 20 with the autoscaling.knative.dev/target annotation. So, our goal now is to verify autoscaling. To do that we need to send many simultaneous requests to the application. Of course, the incoming traffic is distributed across three different revisions of Knative Service.

Fortunately, we may easily simulate heavy traffic with the siege tool. We will call the GET /persons endpoint that returns all available persons. We are sending 150 concurrent requests with the command visible below.

$ siege http://sample-spring-boot-on-kubernetes-pminkows-serverless.apps.cluster-7260.7260.sandbox1734.opentlc.com/persons \
   -i -v -r 1000  -c 150 --no-parser

Under the hood, Knative still creates a Deployment and scales down or scales up the number of running pods. So, if you have three revisions of a single Service, there are three different deployments created. Finally, I have 10 running pods for the latest deployment that receives 60% of traffic. There are also 3 and 2 running pods for the previous revisions.
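As a rough sanity check of those numbers, the concurrency-based scaling rule can be approximated as the in-flight requests divided by the per-pod target, rounded up. The sketch below is a deliberate simplification under that assumption. The ScaleEstimate class is hypothetical and ignores the autoscaler's utilization margin and its reaction to bursts.

```java
// Hypothetical back-of-the-envelope estimate, not the Knative autoscaler itself.
final class ScaleEstimate {

    // Pods needed so that each pod handles at most targetPerPod requests at once.
    static int desiredPods(double inFlightRequests, double targetPerPod) {
        if (targetPerPod <= 0) {
            throw new IllegalArgumentException("targetPerPod must be positive");
        }
        return (int) Math.ceil(inFlightRequests / targetPerPod);
    }

    public static void main(String[] args) {
        int totalConcurrent = 150;    // siege -c 150
        double target = 20;           // autoscaling.knative.dev/target
        int[] percents = {60, 20, 20};
        for (int percent : percents) {
            double share = totalConcurrent * percent / 100.0;
            System.out.println(percent + "% -> " + desiredPods(share, target) + " pod(s)");
        }
    }
}
```

With these inputs the estimate is 5, 2 and 2 pods, which is lower than the 10, 3 and 2 pods observed above. The real autoscaler starts scaling before pods are fully utilized and reacts to bursts, so this figure is better read as a lower bound than as a prediction.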

What will happen if there is no traffic coming to the service? Knative will scale down the number of running pods for all the deployments to zero.

Conclusion

In this article, you learned how to deploy a Spring Boot application as a Knative Service using Skaffold and Jib. I explained with examples how to create a new revision of the Service and distribute traffic across those revisions. We also tested the scenario with autoscaling based on concurrent requests and scale to zero in case of no incoming traffic. You may expect more articles about Knative soon! Not only with Spring Boot 🙂
