spring cloud stream kafka Archives - Piotr's TechBlog
https://piotrminkowski.com/tag/spring-cloud-stream-kafka/
Java, Spring, Kotlin, microservices, Kubernetes, containers

Running Redpanda on Kubernetes
https://piotrminkowski.com/2022/09/06/running-redpanda-on-kubernetes/
Tue, 06 Sep 2022 15:29:14 +0000

The post Running Redpanda on Kubernetes appeared first on Piotr's TechBlog.

In this article, you will learn how to install and manage Redpanda on Kubernetes. It is not the first article related to Redpanda on my blog. You can read more about Redpanda in my earlier post here. There, I describe local development of Java apps with Redpanda, Quarkus, and Testcontainers.

You can use Redpanda in local development as an alternative to the standard Apache Kafka. It is compatible with the Kafka API but does not depend on ZooKeeper or the JVM.

In this article, I’m going to show that you can also easily run and use Redpanda on Kubernetes. Along the way, we will look at the operator, Prometheus metrics, and the Redpanda Console. Let’s begin.

Source Code

If you would like to try this exercise yourself, you may always take a look at my source code. In order to do that, you need to clone my GitHub repository. Then switch to the redpanda branch. You will find sample applications for sending and receiving messages to Kafka in the event-driven directory. After that, just follow my instructions.

Install Cert Manager on Kubernetes

The recommended way to install Redpanda on Kubernetes is through the operator. The Redpanda operator requires the cert-manager to create certificates for TLS communication. So in the first step, we need to install cert-manager. We will use Helm for that. Let’s add the following Helm repository:

$ helm repo add jetstack https://charts.jetstack.io && \
  helm repo update

After that, we can install cert-manager in the cert-manager namespace. To create the namespace automatically, we pass the --create-namespace option. By default, the cert-manager chart does not install CRDs on Kubernetes. Let’s enable that with the installCRDs Helm parameter:

$ helm install cert-manager \
   --namespace cert-manager --create-namespace \
   --set installCRDs=true \
   --version v1.9.1 jetstack/cert-manager

Here’s the list of the cert-manager pods. Assuming you have a similar result, you may proceed to the next section.

$ kubectl get pod -n cert-manager                                                                
NAME                                      READY   STATUS    RESTARTS   AGE
cert-manager-877fd747c-lhrgd              1/1     Running   0          1m
cert-manager-cainjector-bbdb88874-tmlg9   1/1     Running   0          1m
cert-manager-webhook-5774d5d8f7-flv7s     1/1     Running   0          1m

Install Redpanda Operator using Helm

Before we install the Redpanda operator we need to apply a single Cluster object CRD:

$ kubectl apply -k 'https://github.com/redpanda-data/redpanda/src/go/k8s/config/crd?ref=v22.2.2'

The same as before, we will use Helm in the installation process. Firstly, let’s add the official Redpanda Helm repository:

$ helm repo add redpanda https://charts.vectorized.io && \
  helm repo update

We will install the latest version of the Redpanda operator (22.2) in the redpanda-system namespace.

$ helm install redpanda-operator redpanda/redpanda-operator \
    --namespace redpanda-system \
    --create-namespace \
    --set monitoring.enabled=true \
    --version v22.2.2

After that, we may use the Cluster CRD object to create a single-node Redpanda cluster on Kubernetes. Also, let’s enable developer mode. Redpanda provides a built-in HTTP proxy and schema registry. The name of our cluster is one-node-cluster.

apiVersion: redpanda.vectorized.io/v1alpha1
kind: Cluster
metadata:
  name: one-node-cluster
spec:
  image: vectorized/redpanda
  version: latest
  replicas: 1
  resources:
    requests:
      cpu: 1
      memory: 2Gi
    limits:
      cpu: 1
      memory: 2Gi
  configuration:
    rpcServer:
      port: 33145
    kafkaApi:
    - port: 9092
    pandaproxyApi:
    - port: 8082
    schemaRegistry:
      port: 8081
    adminApi:
    - port: 9644
    developerMode: true

Redpanda will run in the redpanda namespace. Let’s create that namespace first.

$ kubectl create ns redpanda

Then, let’s create the Cluster object in the redpanda namespace.

$ kubectl apply -f redpanda-cluster.yaml -n redpanda

The Redpanda operator creates two Kubernetes Services for the cluster. The first of them, one-node-cluster, is a headless Service used for internal communication. We could have enabled external communication, but it is not required in our scenario. Applications use port 9092, which exposes the Kafka-compatible API. The second Service, one-node-cluster-cluster, exposes the schema registry on port 8081.

$ kubectl get svc -n redpanda
NAME                       TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                    AGE
one-node-cluster           ClusterIP   None            <none>        9644/TCP,9092/TCP,8082/TCP   3m
one-node-cluster-cluster   ClusterIP   10.98.26.202    <none>        8081/TCP                     3m
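
Applications running in the same namespace can use the short Service names shown above. From another namespace, they would need the fully qualified DNS name. Here is a minimal sketch of how such an address is built, assuming the default cluster domain cluster.local (your cluster may use a different one). The ServiceAddress class is a hypothetical helper, not part of Redpanda or Kubernetes.

```java
// Sketch: build the in-cluster DNS address of a Kubernetes Service.
// Assumption: the cluster uses the default domain "cluster.local".
final class ServiceAddress {

    // Returns "<service>.<namespace>.svc.cluster.local:<port>"
    static String of(String service, String namespace, int port) {
        return service + "." + namespace + ".svc.cluster.local:" + port;
    }

    public static void main(String[] args) {
        // Kafka API endpoint of the cluster created above
        System.out.println(of("one-node-cluster", "redpanda", 9092));
        // Schema registry endpoint
        System.out.println(of("one-node-cluster-cluster", "redpanda", 8081));
    }
}
```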

Install via Helm without operator

We can also use the Redpanda Helm chart, which does not require the installation of CRDs or an operator, but instead creates a cluster according to the configuration in a values.yaml file. This is the recommended way to install Redpanda on Kubernetes. In order to use it, clone the Redpanda repository with the Helm chart:

$ git clone https://github.com/redpanda-data/helm-charts.git
$ cd helm-charts/redpanda

Then, you need to install Redpanda using the following Helm command. Since we use a single-node Kubernetes cluster we override the default number of brokers using the statefulset.replicas parameter.

$ helm install redpanda . \
    -n redpanda \
    --create-namespace \
    --set statefulset.replicas=1

In comparison to the previous installation method, you would have to install the kube-prometheus-stack separately.

$ helm repo add prometheus-community https://prometheus-community.github.io/helm-charts && \
  helm repo update

Enable Prometheus Metrics

Earlier, we installed the Prometheus stack together with the Redpanda operator (the monitoring.enabled=true parameter). By default, it monitors Kubernetes core components and the Redpanda operator. Our goal is to enable monitoring of the Redpanda cluster we created in the redpanda namespace. To do that, we need to create a PodMonitor object, a resource provided by the Prometheus operator. It requires us to set at least the target pod namespace, the label selector, and the metrics endpoints. Redpanda brokers expose metrics on the admin port (9644) under the /metrics and /public_metrics paths.

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  labels:
    app.kubernetes.io/instance: redpanda-cluster
    app.kubernetes.io/managed-by: Helm
    app.kubernetes.io/name: redpanda-cluster
    app.kubernetes.io/version: v22.2.2
    helm.sh/chart: redpanda-operator-v22.2.2
    release: redpanda-operator
  name: redpanda-cluster-monitor
  namespace: redpanda-system
spec:
  namespaceSelector:
    matchNames:
      - redpanda
  podMetricsEndpoints:
    - path: /metrics
      port: admin
    - path: /public_metrics
      port: admin
  selector:
    matchLabels:
      app.kubernetes.io/name: redpanda

Once you create the PodMonitor, you will be able to query Redpanda metrics. There are a lot of exported metrics. You can verify them in the Prometheus dashboard. Their names start with the vectorized_ prefix.

[Figure: Redpanda metrics in the Prometheus dashboard]
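
If you prefer to check the metric names outside the dashboard, you can fetch the endpoint output and filter it by prefix. Here is a minimal sketch of such a filter for the Prometheus text format. The MetricNameFilter class and the sample metric names are hypothetical and used only for illustration; they are not real Redpanda metric names.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch: extract distinct metric names with a given prefix
// from text in the Prometheus exposition format.
final class MetricNameFilter {

    static List<String> namesWithPrefix(String exposition, String prefix) {
        return exposition.lines()
                .map(String::trim)
                // skip blank lines and "# HELP" / "# TYPE" comments
                .filter(line -> !line.isEmpty() && !line.startsWith("#"))
                // the metric name ends at the first '{' or whitespace
                .map(line -> line.split("[{\\s]", 2)[0])
                .filter(name -> name.startsWith(prefix))
                .distinct()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical sample, not copied from a real broker
        String sample = String.join("\n",
                "# HELP vectorized_sample_requests_total hypothetical counter",
                "vectorized_sample_requests_total{shard=\"0\"} 42",
                "vectorized_sample_requests_total{shard=\"1\"} 7",
                "redpanda_sample_uptime_seconds 12");
        System.out.println(namesWithPrefix(sample, "vectorized_"));
    }
}
```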

The simplest way to view important metrics is through the Grafana dashboard. Therefore, we are going to create a dashboard there. Fortunately, Redpanda provides an automatic mechanism for generating a Grafana dashboard. There is a dedicated Redpanda CLI (rpk) command to do it. We just need to set the name of the data source (Prometheus) and metrics endpoint URL (public_metrics).

$ kubectl exec pod/one-node-cluster-0 -n redpanda -c redpanda \
    -- rpk generate grafana-dashboard --datasource Prometheus \
    --metrics-endpoint http://localhost:9644/public_metrics \
    > redpanda-dashboard.json

Once we export the dashboard as a JSON file, we may import it into the Grafana dashboard.

Enable Redpanda Console

Redpanda provides a UI dashboard for managing cluster instances called Redpanda Console. However, it is not installed by the operator. In order to run it on Kubernetes, we will use the Helm chart. Firstly, let’s add the required Helm repository:

$ helm repo add redpanda-console https://packages.vectorized.io/public/console/helm/charts/ && \
  helm repo update

We need to override some configuration settings. By default, the Redpanda Console tries to detect both broker and schema registry on a localhost address. Since we are running Redpanda on Kubernetes we need to set the name of the service one-node-cluster for the broker and one-node-cluster-cluster for the schema registry. Here’s our values.yaml file:

console:
  config:
    kafka:
      brokers:
        - one-node-cluster:9092
      clientId: redpanda-console
    schemaRegistry:
      enabled: true
      urls: ["http://one-node-cluster-cluster:8081"]
      username: console
      password: redacted

Finally, let’s install the console in the same namespace as the broker.

$ helm install redpanda-console redpanda-console/console \
    --values values.yaml \
    -n redpanda

The Redpanda Console is available under the 8080 port. We can enable port-forward to access it locally.

Integrate Spring Boot with Redpanda

Our instance of Redpanda is ready. Let’s run sample applications on Kubernetes. They are sending events to Redpanda and receiving them from Redpanda. Our applications are written in Kotlin and built on top of Spring Boot and use Spring Cloud Stream to integrate with the Kafka-compatible API. They are using the Avro format for serializing and deserializing messages. Thanks to the Spring Cloud Schema Registry client, they may also integrate with the schema registry provided by Redpanda.

Here’s the list of required modules for both producer and consumer apps:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-schema-registry-client</artifactId>
</dependency>

If you would like to read more about Spring Cloud support for Kafka and schema registry you can refer to this article on my blog. It describes how to build event-driven architectures with Spring Cloud Stream Kafka and use Avro format in communication between apps.

Our producer app continuously generates and sends messages to the Redpanda topic. It is integrated with the schema registry available under the address provided in the spring.cloud.schemaRegistryClient.endpoint property. To enable that integration, we need to annotate the main class with the @EnableSchemaRegistryClient.

@SpringBootApplication
@EnableSchemaRegistryClient
class ProductionApplication {

   var id: Int = 0

   @Value("\${callme.supplier.enabled}")
   val supplierEnabled: Boolean = false

   @Bean
   fun callmeEventSupplier(): Supplier<Message<CallmeEvent>?> = Supplier { createEvent() }

   @Primary
   @Bean
   fun schemaRegistryClient(@Value("\${spring.cloud.schemaRegistryClient.endpoint}") endpoint: String?): SchemaRegistryClient {
      val client = ConfluentSchemaRegistryClient()
      client.setEndpoint(endpoint)
      return client
   }

   private fun createEvent(): Message<CallmeEvent>? {
      return if (supplierEnabled)
          MessageBuilder.withPayload(CallmeEvent(++id, "I'm callme event!", "ping"))
               .setHeader("to_process", true)
               .build()
      else
         null
   }
}

Our app does not contain a lot of code; however, we need to provide some configuration settings. In order to enable serialization with Avro, we need to set the default content type to application/*+avro (1). The target topic on Redpanda is callme-events (2). It consists of 2 partitions (3). We also need to set the name of the bean responsible for generating messages (4). With the property spring.cloud.schema.avro.dynamicSchemaGenerationEnabled, we may enable automatic generation of the Avro schema based on the source code (5). Of course, we also need to provide the schema registry address (6) and the Redpanda broker address (7).

spring.application.name=producer-service
spring.cloud.stream.default.contentType=application/*+avro # (1)
spring.cloud.stream.bindings.callmeEventSupplier-out-0.contentType=application/*+avro
spring.cloud.stream.bindings.callmeEventSupplier-out-0.destination=callme-events # (2)
spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionKeyExpression=payload.id
spring.cloud.stream.bindings.callmeEventSupplier-out-0.producer.partitionCount=2 # (3)
spring.cloud.stream.source=callmeEventSupplier # (4)

spring.cloud.schema.avro.dynamicSchemaGenerationEnabled=true # (5)
spring.cloud.schemaRegistryClient.endpoint=http://one-node-cluster-cluster:8081/ # (6)
spring.kafka.bootstrap-servers=one-node-cluster:9092 # (7)
spring.main.allow-bean-definition-overriding=true

callme.supplier.enabled=true
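
The partitionKeyExpression and partitionCount settings above decide which partition each event goes to. As far as I understand the Spring Cloud Stream documentation, the default selection is based on key.hashCode() % partitionCount. Here is a minimal sketch of that calculation in plain Java. The PartitionSketch class is a hypothetical illustration, not the framework's own code.

```java
// Sketch of the default partition selection, assuming the documented
// formula key.hashCode() % partitionCount. Not the framework's own code.
final class PartitionSketch {

    static int partitionFor(Object key, int partitionCount) {
        int hash = key.hashCode();
        // Math.abs(Integer.MIN_VALUE) stays negative, so guard against it
        if (hash == Integer.MIN_VALUE) {
            hash = 0;
        }
        return Math.abs(hash) % partitionCount;
    }

    public static void main(String[] args) {
        // payload.id is an Int, and Integer.hashCode() is the value itself,
        // so with 2 partitions odd ids land on partition 1, even ids on 0
        for (int id = 1; id <= 4; id++) {
            System.out.println("id=" + id + " -> partition " + partitionFor(id, 2));
        }
    }
}
```

Since the producer increments the id for every event, consecutive events alternate between the two partitions.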

Finally, let’s build and deploy our producer app on Kubernetes. We may use Skaffold for that. The app source code is configured to support it. Here’s our Deployment manifest:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
      - name: producer
        image: piomin/producer-service
        ports:
        - containerPort: 8080

Let’s verify a list of running pods in the redpanda namespace:

$ kubectl get pod -n redpanda
NAME                               READY   STATUS    RESTARTS   AGE
one-node-cluster-0                 1/1     Running   0          112m
producer-5b7f5cfcc6-586z2          1/1     Running   0          65m
redpanda-console-dcf446dc8-fzc2t   1/1     Running   0          104m

Monitor Redpanda on Kubernetes with Console and Prometheus

Our producer app is running on Kubernetes. It generates and sends messages. Let’s switch to the Redpanda Console. Here’s the view with a list of topics. As you see, the topic callme-events has been created:

[Figure: list of topics in the Redpanda Console]

If you click on the topic, you will see the details and a list of messages:

Also, let’s verify the message schema available under the Schema Registry menu. You can compare it with the CallmeEvent object in the source code.

[Figure: message schema in the Schema Registry view of the Redpanda Console]

Then, let’s run our consumer app. It also integrates with the schema registry and receives messages from the callme-events topic.

Thanks to Prometheus and Grafana, we can monitor several parameters related to the Redpanda broker. Here’s the screen from the Grafana dashboard:

Final Thoughts

Redpanda simplifies deployment on Kubernetes in comparison to the standard Kafka. Within the single pod, we have a broker, a schema registry, and an HTTP proxy. We can also easily install a UI console to manage Redpanda graphically. We can easily customize the Redpanda cluster using the CRD object provided by the operator.

Introduction to ksqlDB on Kubernetes with Spring Boot
https://piotrminkowski.com/2022/06/22/introduction-to-ksqldb-on-kubernetes-with-spring-boot/
Wed, 22 Jun 2022 14:01:47 +0000

The post Introduction to ksqlDB on Kubernetes with Spring Boot appeared first on Piotr's TechBlog.

In this article, you will learn how to run ksqlDB on Kubernetes and use it with Spring Boot. You will also see how to run Kafka on Kubernetes based on the Strimzi operator. In order to integrate Spring Boot with the ksqlDB server, we are going to leverage a lightweight Java client provided by ksqlDB. This client supports pull and push queries. It also provides an API for inserting rows and creating tables or streams. You can read more about it in the ksqlDB documentation here.

Our sample Spring Boot application is very simple. We will use a Spring Cloud Stream Supplier bean to generate events and send them to the Kafka topic. For more information about Kafka with Spring Cloud Stream, please refer to the following article. The same application also reads data from the Kafka topic using ksqlDB queries, and it creates a KTable on startup.

Let’s take a look at our architecture.

[Figure: architecture of the ksqlDB on Kubernetes exercise]

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then go to the transactions-service directory. After that, you should just follow my instructions. Let’s begin.

Prerequisites

We will use several tools. You need to have:

  1. Kubernetes cluster – it may be a single-node, local cluster like Minikube or Kind. Personally, I’m using Kubernetes on the Docker Desktop
  2. kubectl CLI – to interact with the cluster
  3. Helm – we will use it to install the ksqlDB server on Kubernetes. If you don’t have Helm, you will have to install it

Run Kafka on Kubernetes with Strimzi

Of course, we need an instance of Kafka to perform our exercise. There are several ways to run Kafka on Kubernetes. I’ll show you how to do it with the operator-based approach. In the first step, you need to install OLM (Operator Lifecycle Manager) on your cluster. In order to do that, you can just execute the following command on your Kubernetes context:

$ curl -L https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.21.2/install.sh -o install.sh
$ chmod +x install.sh
$ ./install.sh v0.21.2

Then, you can proceed to the Strimzi operator installation. That’s just a single command.

$ kubectl create -f https://operatorhub.io/install/stable/strimzi-kafka-operator.yaml

Now, we can create a Kafka cluster on Kubernetes. Let’s begin with a dedicated namespace for our exercise:

$ kubectl create ns kafka

I assume you have a single-node Kubernetes cluster, so we also create a single-node Kafka. Here’s the YAML manifest with Kafka CRD. You can find it in the repository under the path k8s/cluster.yaml.

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  entityOperator:
    topicOperator: {}
    userOperator: {}
  kafka:
    config:
      default.replication.factor: 1
      inter.broker.protocol.version: "3.2"
      min.insync.replicas: 1
      offsets.topic.replication.factor: 1
      transaction.state.log.min.isr: 1
      transaction.state.log.replication.factor: 1
    listeners:
      - name: plain
        port: 9092
        tls: false
        type: internal
      - name: tls
        port: 9093
        tls: true
        type: internal
    replicas: 1
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 30Gi
          deleteClaim: true
    version: 3.2.0
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: true

Let’s apply it to Kubernetes in the kafka namespace:

$ kubectl apply -f k8s/cluster.yaml -n kafka

You should see a single instance of Kafka and also a single instance of Zookeeper. If the pods are running, it means you have Kafka on Kubernetes.

$ kubectl get pod -n kafka
NAME                                          READY   STATUS    RESTARTS  AGE
my-cluster-entity-operator-68cc6bc4d9-qs88p   3/3     Running   0         46m
my-cluster-kafka-0                            1/1     Running   0         48m
my-cluster-zookeeper-0                        1/1     Running   0         48m

Kafka is available inside the cluster under the name my-cluster-kafka-bootstrap and port 9092.

$ kubectl get svc -n kafka
NAME                          TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                               AGE
my-cluster-kafka-bootstrap    ClusterIP   10.108.109.255   <none>        9091/TCP,9092/TCP,9093/TCP            47m
my-cluster-kafka-brokers      ClusterIP   None             <none>        9090/TCP,9091/TCP,9092/TCP,9093/TCP   47m
my-cluster-zookeeper-client   ClusterIP   10.102.10.251    <none>        2181/TCP                              47m
my-cluster-zookeeper-nodes    ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP            47m

Run ksqlDB Server on Kubernetes

The ksqlDB Server is a part of the Confluent Platform. Since we are not installing the whole Confluent Platform on Kubernetes, but just an open-source Kafka cluster, we need to install the ksqlDB Server separately. Let’s do it with Helm. There is no “official” Helm chart for the KSQL server. Therefore, we should go directly to the Confluent Helm repository on GitHub:

$ git clone https://github.com/confluentinc/cp-helm-charts.git
$ cd cp-helm-charts

In this repository, you can find separate Helm charts for every single Confluent component including e.g. control center or KSQL Server. The location of our chart inside the repository is charts/cp-ksql-server. We need to override some default settings during installation. First of all, we have to disable the headless mode. In the headless mode, KSQL Server does not expose the HTTP endpoint and loads queries from the input script. Our Spring Boot app will connect to the server through HTTP. In the next step, we should override the default address of the Kafka cluster and the default version of the KSQL Server which is still 6.1.0 there. We will use the latest version 7.1.1. Here’s the helm command you should run on your Kubernetes cluster:

$ helm install cp-ksql-server \
    --set ksql.headless=false \
    --set kafka.bootstrapServers=my-cluster-kafka-bootstrap:9092 \
    --set imageTag=7.1.1 \
  charts/cp-ksql-server -n kafka

Here’s the result:

Let’s verify if KSQL is running on the cluster:

$ kubectl get pod -n kafka | grep ksql
cp-ksql-server-679fc98889-hldfv               2/2     Running   0               2m11s

The HTTP endpoint is available for other applications under the name cp-ksql-server and port 8088:

$ kubectl get svc -n kafka | grep ksql
cp-ksql-server                ClusterIP   10.109.189.36    <none>        8088/TCP,5556/TCP                     3m25s

Now, we have everything we need running on our Kubernetes cluster. Therefore, we can proceed to the Spring Boot app implementation.

Integrate Spring Boot with ksqlDB

I didn’t find any out-of-the-box integration between Spring Boot and ksqlDB. Therefore, we will use the ksqldb-api-client directly. First, we need to include the ksqlDB Maven repository and some dependencies:

<dependencies>
        ...

  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-api-client</artifactId>
    <version>0.26.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-udf</artifactId>
    <version>0.26.0</version>
  </dependency>
  <dependency>
    <groupId>io.confluent.ksql</groupId>
    <artifactId>ksqldb-common</artifactId>
    <version>0.26.0</version>
  </dependency>
</dependencies>

<repositories>
  <repository>
    <id>ksqlDB</id>
    <name>ksqlDB</name>
    <url>https://ksqldb-maven.s3.amazonaws.com/maven/</url>
  </repository>
</repositories>

After that, we can define a Spring @Bean returning the ksqlDB Client implementation. Since we will run our application in the same namespace as the KSQL Server, we can use the name of the Kubernetes Service as the host name.

@Configuration
public class KSQLClientProducer {

    @Bean
    Client ksqlClient() {
        ClientOptions options = ClientOptions.create()
                .setHost("cp-ksql-server")
                .setPort(8088);
        return Client.create(options);
    }
}

Our application is interacting with KSQL Server through an HTTP endpoint. It creates a single KTable on startup. To do that, we need to invoke the executeStatement method on the instance of the KSQL Client bean. We are creating the SOURCE table to enable running pull queries on it. The table gets data from the transactions topic. It expects JSON format in the incoming events.

@Component
public class KTableCreateListener implements ApplicationListener<ContextRefreshedEvent> {

   private static final Logger LOG = LoggerFactory.getLogger(KTableCreateListener.class);
   private Client ksqlClient;

   public KTableCreateListener(Client ksqlClient) {
      this.ksqlClient = ksqlClient;
   }

   @Override
   public void onApplicationEvent(ContextRefreshedEvent event) {
      try {
         String sql = """
                 CREATE SOURCE TABLE IF NOT EXISTS transactions_view (
                   id BIGINT PRIMARY KEY,
                   sourceAccountId BIGINT,
                   targetAccountId BIGINT,
                   amount INT
                 ) WITH (
                   kafka_topic='transactions',
                   value_format='JSON'
                 );
                 """;
         ExecuteStatementResult result = ksqlClient.executeStatement(sql).get();
         LOG.info("Result: {}", result.queryId().orElse(null));
      } catch (ExecutionException | InterruptedException e) {
         LOG.error("Error: ", e);
      }
   }
}

After creating the table, we can run some queries on it. These are pretty simple queries. We are trying to find all transactions and all transactions related to a particular target account.

@RestController
@RequestMapping("/transactions")
public class TransactionResource {

   private static final Logger LOG = LoggerFactory.getLogger(TransactionResource.class);
   Client ksqlClient;

   public TransactionResource(Client ksqlClient) {
      this.ksqlClient = ksqlClient;
   }

   @GetMapping
   public List<Transaction> getTransactions() throws ExecutionException, InterruptedException {
      StreamedQueryResult sqr = ksqlClient
            .streamQuery("SELECT * FROM transactions_view;")
            .get();
      Row row;
      List<Transaction> l = new ArrayList<>();
      while ((row = sqr.poll()) != null) {
         l.add(mapRowToTransaction(row));
      }
      return l;
   }

   @GetMapping("/target/{accountId}")
   public List<Transaction> getTransactionsByTargetAccountId(@PathVariable("accountId") Long accountId)
            throws ExecutionException, InterruptedException {
      StreamedQueryResult sqr = ksqlClient
            .streamQuery("SELECT * FROM transactions_view WHERE targetAccountId=" + accountId + ";")
            .get();
      Row row;
      List<Transaction> l = new ArrayList<>();
      while ((row = sqr.poll()) != null) {
         l.add(mapRowToTransaction(row));
      }
      return l;
   }

   private Transaction mapRowToTransaction(Row row) {
      Transaction t = new Transaction();
      t.setId(row.getLong("ID"));
      t.setSourceAccountId(row.getLong("SOURCEACCOUNTID"));
      t.setTargetAccountId(row.getLong("TARGETACCOUNTID"));
      t.setAmount(row.getInteger("AMOUNT"));
      return t;
   }

}

Sending events to the topic with Spring Cloud Stream

Finally, we can proceed to the last part of our exercise. We need to generate test data and send it to the Kafka transactions topic. The simplest way to achieve it is with the Spring Cloud Stream Kafka module. Firstly, let’s add the following Maven dependency:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

Then, we may create a producer based on the Spring Supplier bean. The Supplier bean continuously generates and sends new events to the target channel. By default, it repeats the action once per second.

@Configuration
public class KafkaEventProducer {

   private static long transactionId = 0;
   private static final Random r = new Random();

   @Bean
   public Supplier<Message<Transaction>> transactionsSupplier() {
      return () -> {
          Transaction t = new Transaction();
          t.setId(++transactionId);
          t.setSourceAccountId(r.nextLong(1, 100));
          t.setTargetAccountId(r.nextLong(1, 100));
          t.setAmount(r.nextInt(1, 10000));
          Message<Transaction> o = MessageBuilder
                .withPayload(t)
                .setHeader(KafkaHeaders.MESSAGE_KEY, new TransactionKey(t.getId()))
                .build();
          return o;
      };
   }
}
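
To make the polling behavior more concrete, here is a minimal sketch in plain Java that calls a Supplier at a fixed delay, similar in spirit to what the framework's default poller does. It is only an illustration under my own assumptions: the SupplierPollingSketch class and its poll method are hypothetical, and the real poller is configured by Spring, not by this code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Sketch: invoke a Supplier repeatedly with a fixed delay between calls.
final class SupplierPollingSketch {

    static <T> List<T> poll(Supplier<T> supplier, int times, long delayMillis) {
        List<T> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(times);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            // the guard prevents an extra call before shutdown takes effect
            if (done.getCount() > 0) {
                received.add(supplier.get());
                done.countDown();
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return received;
    }

    public static void main(String[] args) {
        AtomicLong transactionId = new AtomicLong();
        // one call per second, like the default described above
        System.out.println(poll(transactionId::incrementAndGet, 3, 1000)); // prints [1, 2, 3]
    }
}
```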

Of course, we also need to provide the address of our Kafka cluster and the name of a target topic for the channel. The address of Kafka is injected at the deployment phase.

spring.kafka.bootstrap-servers = ${KAFKA_URL}
spring.cloud.stream.bindings.transactionsSupplier-out-0.destination = transactions

Finally, let’s deploy our Spring Boot app on Kubernetes. Here’s the YAML manifest containing Kubernetes Deployment and Service definitions:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: transactions
spec:
  selector:
    matchLabels:
      app: transactions
  template:
    metadata:
      labels:
        app: transactions
    spec:
      containers:
      - name: transactions
        image: piomin/transactions-service
        env:
          - name: KAFKA_URL
            value: my-cluster-kafka-bootstrap:9092
        ports:
        - containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
  name: transactions
spec:
  type: ClusterIP
  selector:
    app: transactions
  ports:
    - port: 8080

Let’s deploy the app in the kafka namespace:

$ kubectl apply -f k8s/deployment.yaml -n kafka

Testing ksqlDB on Kubernetes

Once the app is deployed on Kubernetes, let’s enable port-forward to test it on the local port:

$ kubectl port-forward service/transactions 8080:8080 -n kafka

Now, we can test our two HTTP endpoints. Let’s start with the endpoint for searching all transactions:

$ curl http://localhost:8080/transactions

Then, you can call the endpoint for searching all transactions related to the targetAccountId, e.g.:

$ curl http://localhost:8080/transactions/target/10

Final Thoughts

In this article, I wanted to show how you can start with ksqlDB on Kubernetes. We used such frameworks as Spring Boot and Spring Cloud Stream to interact with Kafka and ksqlDB. You could see how to run the Kafka cluster on Kubernetes using the Strimzi operator or how to deploy KSQL Server directly from the Helm repository.

Autoscaling on Kubernetes with KEDA and Kafka
https://piotrminkowski.com/2022/01/18/autoscaling-on-kubernetes-with-keda-and-kafka/
Tue, 18 Jan 2022 14:58:54 +0000

The post Autoscaling on Kubernetes with KEDA and Kafka appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to use KEDA to autoscale an application that consumes messages from a Kafka topic. KEDA stands for Kubernetes Event-driven Autoscaling. In order to explain the idea behind it, I will create two simple services. The first of them sends events to the Kafka topic, while the second receives them. We will run both applications on Kubernetes. To simplify the exercise, we will use Spring Cloud Stream, which offers a smart integration with Kafka.

Architecture

Before we start, let’s take a moment to understand our scenario for today. We have a single Kafka topic used by both our applications to exchange events. This topic consists of 10 partitions. There is also a single instance of the producer that sends events at regular intervals. We are going to scale the number of pods for the consumer service up and down. All the instances of the consumer service are assigned to the same Kafka consumer group. It means that only a single instance within the group receives a particular event.

Each consumer instance has only a single receiving thread. Therefore, we can easily simulate an event processing time. We will put that thread to sleep for 1 second per event. On the other hand, the producer will send events at a variable speed. Also, it will split the messages across all available partitions. Such behavior may result in consumer lag on partitions because Spring Cloud Stream commits the offset only after handling a message. In our case, the value of the lag depends on the producer speed and the number of running consumer instances. To clarify, let’s take a look at the diagram below.

keda-kafka-arch1

Our goal is very simple. We need to adjust the number of consumer instances to the traffic rate generated by the producer service. The value of offset lag can’t exceed the desired threshold. If we increase the traffic rate on the producer side KEDA should scale up the number of consumer instances. Consequently, if we decrease the producer traffic rate it should scale down the number of consumer instances. Here’s the diagram with our scenario.

keda-kafka-arch2
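To get a feeling for how the lag depends on the producer speed and the number of consumers, here is a rough back-of-the-envelope model in plain Java. It is not a Kafka client. The LagModel class is a hypothetical helper of mine, and it assumes that every consumer handles exactly one event per second and that events are spread evenly across the consumers.

```java
// A simplified model of consumer lag, not a Kafka client.
// Assumption: each consumer processes exactly one event per second.
final class LagModel {

    /** Total lag after the given number of seconds, starting with no lag. */
    static long lagAfter(int producedPerSecond, int consumers, int seconds) {
        long lag = 0;
        for (int s = 0; s < seconds; s++) {
            lag += producedPerSecond;        // new events arrive on the topic
            lag -= Math.min(lag, consumers); // each consumer handles one event
        }
        return lag;
    }

    public static void main(String[] args) {
        // One consumer falls behind a producer sending 5 events per second
        System.out.println("1 consumer:  " + lagAfter(5, 1, 10));
        // Five consumers keep up with the same traffic
        System.out.println("5 consumers: " + lagAfter(5, 5, 10));
    }
}
```

With one consumer the lag keeps growing, while with enough consumers it stays at zero. That growing lag is exactly the signal we want KEDA to react to.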

Use Kafka with Spring Cloud Stream

In order to use Spring Cloud Stream for Kafka, we just need to include a single dependency in Maven pom.xml:

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

After that, we can use the standard Spring Cloud Stream programming model. In the background, it integrates with Kafka through a particular binder implementation. I will not explain the details, but if you are interested please read the following article. It explains the basics using RabbitMQ as an example.

Both our applications are very simple. The producer just generates and sends events (by default in JSON format). The only thing we need to do in the code is to declare the Supplier bean. In the background, there is a single thread that generates and sends CallmeEvent every second. Each time it only increases the id field inside the message:

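import java.util.function.Supplier;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class ProducerApp {

   // Incremented on every poll by the single polling thread
   private static int id = 0;

   public static void main(String[] args) {
      SpringApplication.run(ProducerApp.class, args);
   }

   // Spring Cloud Stream polls this Supplier and sends each returned event
   @Bean
   public Supplier<CallmeEvent> eventSupplier() {
      return () -> new CallmeEvent(++id, "Hello" + id, "PING");
   }

}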

We can change the default fixed delay between the Supplier ticks with the following property. Let’s say we want to send an event every 100 ms:

spring.cloud.stream.poller.fixedDelay = 100

We should also provide basic configuration like the Kafka address, topic name (if different than the name of the Supplier function), number of partitions, and a partition key. Spring Cloud Stream automatically creates topics on application startup.

spring.cloud.stream.bindings.eventSupplier-out-0.destination = test-topic
spring.cloud.stream.bindings.eventSupplier-out-0.producer.partitionKeyExpression = payload.id
spring.cloud.stream.bindings.eventSupplier-out-0.producer.partitionCount = 10
spring.kafka.bootstrap-servers = one-node-cluster.redpanda:9092
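To show what the partition key does for us, here is a small sketch of how a key could be mapped to one of the 10 partitions. As far as I know, the default selection in Spring Cloud Stream is based on the hash code of the key and the partition count, but treat the exact formula below as my assumption. PartitionSketch is a hypothetical class, not the binder's code.

```java
// Sketch of a key-to-partition mapping.
// Assumption: the partition is the key's hashCode modulo the partition count.
// The binder's real selector may differ in details.
final class PartitionSketch {

    static int partitionFor(Object key, int partitionCount) {
        // floorMod keeps the result in [0, partitionCount) even for negative hash codes
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        // The hash code of an Integer is its value, so sequential ids
        // walk through all partitions one by one
        for (int id = 1; id <= 12; id++) {
            System.out.println("id=" + id + " -> partition " + partitionFor(id, 10));
        }
    }
}
```

Since our producer only increments the id field, such a mapping spreads the events evenly across all the partitions, which is what we need for the test.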

Now, the consumer application. It is also not very complicated. As I mentioned before, we will put the receiving thread to sleep inside the consumer method in order to simulate processing time.

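import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;

@SpringBootApplication
public class ConsumerApp {

   private static final Logger LOG = LoggerFactory.getLogger(ConsumerApp.class);

   public static void main(String[] args) {
      SpringApplication.run(ConsumerApp.class, args);
   }

   @Bean
   public Consumer<Message<CallmeEvent>> eventConsumer() {
      return event -> {
         LOG.info("Received: {}", event.getPayload());
         try {
            Thread.sleep(1000); // simulate 1 second of processing
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
         }
      };
   }

}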

Finally, the configuration on the consumer side. It is important to set the consumer group and enable partitioning.

spring.cloud.stream.bindings.eventConsumer-in-0.destination = test-topic
spring.cloud.stream.bindings.eventConsumer-in-0.group = a
spring.cloud.stream.bindings.eventConsumer-in-0.consumer.partitioned = true
spring.kafka.bootstrap-servers = one-node-cluster.redpanda:9092

Now, we should deploy both applications on Kubernetes. But before we do that, let’s install Kafka and KEDA on Kubernetes.

Install Kafka on Kubernetes

To perform this part you need to install Helm. Instead of installing Kafka directly, we can install Redpanda. It is a Kafka API-compatible platform. However, the Redpanda operator requires cert-manager to create certificates for TLS communication. So, let’s install it first. We use the latest version of cert-manager (v1.6.1 at the time of writing). It requires adding CRDs:

$ kubectl apply -f https://github.com/jetstack/cert-manager/releases/download/v1.6.1/cert-manager.crds.yaml

Then you need to add a new Helm repository:

$ helm repo add jetstack https://charts.jetstack.io

And finally, install cert-manager:

$ helm install cert-manager \
   --namespace cert-manager \
   --create-namespace \
   --version v1.6.1 \
   jetstack/cert-manager

Now, we can proceed to the Redpanda installation. The same as before, let’s add the Helm repository:

$ helm repo add redpanda https://charts.vectorized.io/
$ helm repo update

We can obtain the latest version of Redpanda:

$ export VERSION=$(curl -s https://api.github.com/repos/vectorizedio/redpanda/releases/latest | jq -r .tag_name)

Then, let’s add the CRDs:

$ kubectl apply -k "https://github.com/vectorizedio/redpanda/src/go/k8s/config/crd?ref=$VERSION"

After that, we can finally install the Redpanda operator:

$ helm install \
   redpanda-operator \
   redpanda/redpanda-operator \
   --namespace redpanda-system \
   --create-namespace \
   --version $VERSION

We will install a single-node cluster in the redpanda namespace. To do that we need to apply the following manifest:

apiVersion: redpanda.vectorized.io/v1alpha1
kind: Cluster
metadata:
  name: one-node-cluster
spec:
  image: "vectorized/redpanda"
  version: "latest"
  replicas: 1
  resources:
    requests:
      cpu: 1
      memory: 1.2Gi
    limits:
      cpu: 1
      memory: 1.2Gi
  configuration:
    rpcServer:
      port: 33145
    kafkaApi:
    - port: 9092
    pandaproxyApi:
    - port: 8082
    adminApi:
    - port: 9644
    developerMode: true

Once you have done that, you can verify the list of pods in the redpanda namespace:

$ kubectl get pod -n redpanda                      
NAME                 READY   STATUS    RESTARTS   AGE
one-node-cluster-0   1/1     Running   0          4s

As you may have noticed, I have already set the Kafka bootstrap server address in the application.properties. For me, it is one-node-cluster.redpanda:9092. You can verify it using the following command:

$ kubectl get svc -n redpanda
NAME               TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)                      AGE
one-node-cluster   ClusterIP   None         <none>        9644/TCP,9092/TCP,8082/TCP   23h

Install KEDA and integrate it with Kafka

As before, we will install KEDA on Kubernetes with Helm. Let’s add the following Helm repo:

$ helm repo add kedacore https://kedacore.github.io/charts

Don’t forget to update the repository. We will install the operator in the keda namespace. Let’s create the namespace first:

$ kubectl create namespace keda

Finally, we can install the operator:

$ helm install keda kedacore/keda --namespace keda

I will run both example applications in the default namespace, so I will also create the KEDA object in this namespace. The main object responsible for configuring autoscaling with KEDA is ScaledObject. Here’s the definition:

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: consumer-scaled
spec:
  scaleTargetRef:
    name: consumer-deployment # (1)
  cooldownPeriod: 30 # (2)
  maxReplicaCount:  10 # (3)
  advanced:
    horizontalPodAutoscalerConfig: # (4)
      behavior:
        scaleDown:
          stabilizationWindowSeconds: 30
          policies:
            - type: Percent
              value: 50
              periodSeconds: 30
  triggers: # (5)
    - type: kafka
      metadata:
        bootstrapServers: one-node-cluster.redpanda:9092
        consumerGroup: a
        topic: test-topic
        lagThreshold: '5'

Let’s analyze the configuration in detail:

(1) We are setting autoscaler for the consumer application, which is deployed under the consumer-deployment name (see the next section for the Deployment manifest)

(2) We decrease the default value of the cooldownPeriod parameter from 300 seconds to 30 in order to test the scale-to-zero mechanism

(3) The maximum number of running pods is 10 (the same as the number of partitions in the topic) instead of the default 100

(4) We can customize the behavior of the Kubernetes HPA. Let’s do that for the scale-down operation. We could configure the scale-up operation in the same way. We allow scaling down by at most 50% of the currently running replicas every 30 seconds.

(5) The last and most important part is the trigger configuration. We should set the address of the Kafka cluster, the name of the topic, and the consumer group used by our application. The lag threshold is 5. It sets the target value for the average offset lag that triggers scaling operations.
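To make the role of lagThreshold and maxReplicaCount more concrete, here is a simplified sketch of how the number of replicas could be derived from the total lag. It follows the general idea of an average-value target (total metric divided by the target value, rounded up). The ReplicaEstimate class is a hypothetical helper of mine, and it deliberately ignores the HPA tolerance, the stabilization window, and the scale-down policies, so the real autoscaler may choose different numbers.

```java
// Simplified estimate of the replica count for a lag-based trigger.
// Assumption: replicas = ceil(totalLag / lagThreshold), capped at maxReplicaCount.
final class ReplicaEstimate {

    static int desiredReplicas(long totalLag, int lagThreshold, int maxReplicaCount) {
        if (totalLag <= 0) {
            return 0; // nothing to process, so the deployment may be scaled to zero
        }
        long needed = (totalLag + lagThreshold - 1) / lagThreshold; // ceiling division
        return (int) Math.min(needed, maxReplicaCount);
    }

    public static void main(String[] args) {
        // lagThreshold 5 and maxReplicaCount 10, as in the ScaledObject manifest
        for (long lag : new long[] {0, 4, 23, 500}) {
            System.out.println("lag=" + lag + " -> replicas=" + desiredReplicas(lag, 5, 10));
        }
    }
}
```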

Before applying the manifest containing ScaledObject we need to deploy the consumer application. Let’s proceed to the next section.

Test Autoscaling with KEDA and Kafka

Let’s deploy the consumer application first. It is prepared to be deployed with Skaffold, so you can just run the command skaffold dev from the consumer directory. Anyway, here’s the Deployment manifest:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-deployment
spec:
  selector:
    matchLabels:
      app: consumer
  template:
    metadata:
      labels:
        app: consumer
    spec:
      containers:
      - name: consumer
        image: piomin/consumer
        ports:
        - containerPort: 8080

Once the Deployment is created, we can apply the KEDA ScaledObject. After that, let’s display its status with the kubectl get so command.

keda-kafka-scaledobject

Ok, but… it is inactive. If you think about it, that’s logical since there are no incoming events in the Kafka topic. Right? So, KEDA has performed a scale-to-zero operation.

Now, let’s deploy the producer application. For now, DO NOT override the default value of the spring.cloud.stream.poller.maxMessagesPerPoll parameter. The producer will send one event per second.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer
spec:
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      containers:
        - name: producer
          image: piomin/producer
          ports:
            - containerPort: 8080

After some time, you can run kubectl get so once again. Now the value in the ACTIVE column should be true. And there is a single instance of the consumer application (1 event per second sent by the producer and received by the consumer).

We can also verify offsets and lags for consumer groups on the topic partitions. Just run the command rpk group describe a inside the Redpanda container.

Now, we will change the traffic rate on the producer side. It will send 5 events per second instead of 1. To do that, we have to define the following property in the application.properties file.

spring.cloud.stream.poller.fixedDelay = 200
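The arithmetic behind that property is simple, but it is worth spelling out. The sketch below converts the fixed delay into an event rate and estimates how many consumers are needed to keep up when each event takes 1 second to process. RateMath is a hypothetical helper, and it assumes that producing an event takes no time, so the real rate will be slightly lower.

```java
// Rate arithmetic for the poller delay.
// Assumption: producing an event is instantaneous, so the rate is 1000 / delay.
final class RateMath {

    static double eventsPerSecond(long fixedDelayMillis) {
        return 1000.0 / fixedDelayMillis;
    }

    // Consumers needed so that total processing capacity matches the producer rate
    static int consumersNeeded(double eventsPerSecond, long processingMillis) {
        return (int) Math.ceil(eventsPerSecond * processingMillis / 1000.0);
    }

    public static void main(String[] args) {
        double rate = eventsPerSecond(200);
        System.out.println("rate=" + rate + " events/s");
        System.out.println("consumers=" + consumersNeeded(rate, 1000));
    }
}
```

For a delay of 200 ms and 1 second of processing time, this gives 5 events per second and 5 consumers.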

Before KEDA performs autoscaling we still have a single instance of the consumer application. Therefore, after some time the lag on partitions will exceed the desired threshold as shown below.

Once autoscaling occurs, we can display the list of deployments. As you can see, there are now 5 running pods of the consumer service.

keda-kafka-scaling

Once again, let’s verify the status of our Kafka consumer group. There are 5 consumers on the topic partitions.

Finally, just to check it out – let’s undeploy the producer application. What happened? The consumer-deployment has been scaled down to zero.

Final Thoughts

You can use KEDA not only with Kafka. There are a lot of other options available, including databases, different message brokers, or even cron. Here’s a full list of the supported tools. In this article, I showed how to use Kafka consumer offset and lag as a criterion for autoscaling with KEDA. I tried to explain this process in detail. Hope it helps you to understand exactly how KEDA works 🙂


]]>
https://piotrminkowski.com/2022/01/18/autoscaling-on-kubernetes-with-keda-and-kafka/feed/ 20 10475