Distributed Tracing with Istio, Quarkus and Jaeger
Piotr's TechBlog, Mon, 31 Jan 2022 10:26:05 +0000
https://piotrminkowski.com/2022/01/31/distributed-tracing-with-istio-quarkus-and-jaeger/

In this article, you will learn how to configure distributed tracing for your service mesh with Istio and Quarkus. For test purposes, we will build and run Quarkus microservices on Kubernetes. The communication between them is going to be managed by Istio. Istio service mesh uses Jaeger as a distributed tracing system.

This time I won’t cover Istio basics. Although our configuration is not complicated, you may want to read an introduction to Istio before we start.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should go to the mesh-with-db directory. After that, you should just follow my instructions 🙂

Service Mesh Architecture

Let’s start with our microservices architecture. There are two applications: person-app and insurance-app. As you probably guessed, the person-app stores and returns information about insured people. The insurance-app, in turn, stores insurance data. Each service has a separate database. We deploy the person-app in two versions. The v2 version contains one additional field, externalId.

The following picture illustrates our scenario. Istio splits traffic between two versions of the person-app. By default, it splits the traffic 50% to 50%. If it receives the X-Version header in the request it calls the particular version of the person-app. Of course, the possible values of the header are v1 or v2.

quarkus-istio-tracing-arch

Distributed Tracing with Istio

Istio generates distributed trace spans for each managed service. It means that every request sent inside the Istio mesh carries a set of tracing HTTP headers.

So, every single request incoming from the Istio gateway contains X-B3-SpanId, X-B3-TraceId, and some other B3 headers. The X-B3-SpanId indicates the position of the current operation in the trace tree. On the other hand, every span in a trace should share the same X-B3-TraceId header. At first glance, you may be surprised that Istio does not propagate B3 headers in client calls. To clarify, if one service communicates with another service using e.g. a REST client, you will see two different traces. The first of them is related to the API endpoint call, while the second is related to the client call of another API endpoint. That’s not exactly what we would like to achieve, right?

Let’s visualize our problem. If you call the insurance-app through the Istio gateway you will have the first trace in Jaeger. During that call the insurance-app calls an endpoint from the person-app using the Quarkus REST client. That’s another separate trace in Jaeger. Our goal here is to propagate all required B3 headers to the person-app as well. You can find a list of required headers in the Istio documentation.

quarkus-istio-tracing-details
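
The propagation idea can be sketched without any framework. The following minimal example, assuming plain java.util maps in place of the JAX-RS header maps, copies the tracing headers named in the Istio documentation from an incoming request to an outgoing one. The class and method names (TracingHeaders, propagate) are hypothetical and exist only for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration; not part of Istio or Quarkus.
final class TracingHeaders {

    // Headers that the Istio documentation asks applications to forward.
    static final List<String> NAMES = List.of(
            "x-request-id", "x-b3-traceid", "x-b3-spanid",
            "x-b3-parentspanid", "x-b3-sampled", "x-b3-flags",
            "x-ot-span-context");

    // Returns the tracing headers found in the incoming request.
    // Names are matched case-insensitively; absent headers are skipped.
    static Map<String, String> propagate(Map<String, String> incoming) {
        Map<String, String> lookup = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        lookup.putAll(incoming);
        Map<String, String> outgoing = new LinkedHashMap<>();
        for (String name : NAMES) {
            String value = lookup.get(name);
            if (value != null) {
                outgoing.put(name, value);
            }
        }
        return outgoing;
    }
}
```

Calling TracingHeaders.propagate with a map that contains X-B3-TraceId and an unrelated Accept header returns only the trace header, so unrelated request headers do not leak into the downstream call.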

Of course, that’s not the only thing we will do today. We will also prepare Istio rules in order to simulate latency in our communication. It is a good scenario to use the tracing tool. Also, I’m going to show you how to easily deploy microservice on Kubernetes using Quarkus features for that.

I’m running Istio and Jaeger on OpenShift. More precisely, I’m using OpenShift Service Mesh, which is Red Hat’s service mesh implementation based on Istio. It doesn’t have any impact on the exercise, so you can just as well repeat all the steps on Kubernetes.

Create Microservices with Quarkus

Let’s begin with the insurance-app. Here’s the class responsible for the REST endpoints implementation. There are several methods there. However, the most important for us is the getInsuranceDetailsById method that calls the GET /persons/{id} endpoint in the person-app. In order to use the Quarkus REST client extension, we need to inject client bean with @RestClient annotation.

@Path("/insurances")
public class InsuranceResource {

   @Inject
   Logger log;
   @Inject
   InsuranceRepository insuranceRepository;
   @Inject @RestClient
   PersonService personService;

   @POST
   @Transactional
   public Insurance addInsurance(Insurance insurance) {
      insuranceRepository.persist(insurance);
      return insurance;
   }

   @GET
   public List<Insurance> getInsurances() {
      return insuranceRepository.listAll();
   }

   @GET
   @Path("/{id}")
   public Insurance getInsuranceById(@PathParam("id") Long id) {
      return insuranceRepository.findById(id);
   }

   @GET
   @Path("/{id}/details")
   public InsuranceDetails getInsuranceDetailsById(@PathParam("id") Long id, @HeaderParam("X-Version") String version) {
      log.infof("getInsuranceDetailsById: id=%d, version=%s", id, version);
      Insurance insurance = insuranceRepository.findById(id);
      InsuranceDetails insuranceDetails = new InsuranceDetails();
      insuranceDetails.setPersonId(insurance.getPersonId());
      insuranceDetails.setAmount(insurance.getAmount());
      insuranceDetails.setType(insurance.getType());
      insuranceDetails.setExpiry(insurance.getExpiry());
      insuranceDetails.setPerson(personService.getPersonById(insurance.getPersonId()));
      return insuranceDetails;
   }

}

As you probably remember from the previous section, we need to propagate several headers responsible for Istio tracing to the downstream Quarkus service. Let’s take a look at the implementation of the REST client in the PersonService interface. In order to send some additional headers to the request, we need to annotate the interface with @RegisterClientHeaders. Then we have two options. We can provide our custom headers factory as shown below. Otherwise, we may use the property org.eclipse.microprofile.rest.client.propagateHeaders with a list of headers.

@Path("/persons")
@RegisterRestClient
@RegisterClientHeaders(RequestHeaderFactory.class)
public interface PersonService {

   @GET
   @Path("/{id}")
   Person getPersonById(@PathParam("id") Long id);
}

Let’s take a look at the implementation of the REST endpoint in the person-app. The method getPersonById at the bottom is responsible for finding a person by the id field. It is the target method called by the PersonService client.

@Path("/persons")
public class PersonResource {

   @Inject
   Logger log;
   @Inject
   PersonRepository personRepository;

   @POST
   @Transactional
   public Person addPerson(Person person) {
      personRepository.persist(person);
      return person;
   }

   @GET
   public List<Person> getPersons() {
      return personRepository.listAll();
   }

   @GET
   @Path("/{id}")
   public Person getPersonById(@PathParam("id") Long id) {
      log.infof("getPersonById: id=%d", id);
      Person p = personRepository.findById(id);
      log.infof("getPersonById: %s", p);
      return p;
   }
}

Finally, here is the implementation of our custom client headers factory. It needs to implement the ClientHeadersFactory interface and its update() method. We are not doing anything complicated here. We are forwarding B3 tracing headers and the X-Version header used by Istio to route between two versions of the person-app. I also added some logs. Here I didn’t use the already mentioned option of header propagation based on the property org.eclipse.microprofile.rest.client.propagateHeaders.

@ApplicationScoped
public class RequestHeaderFactory implements ClientHeadersFactory {

   @Inject
   Logger log;

   @Override
   public MultivaluedMap<String, String> update(MultivaluedMap<String, String> inHeaders,
                                                 MultivaluedMap<String, String> outHeaders) {
      String version = inHeaders.getFirst("x-version");
      log.infof("Version Header: %s", version);
      String traceId = inHeaders.getFirst("x-b3-traceid");
      log.infof("Trace Header: %s", traceId);
      MultivaluedMap<String, String> result = new MultivaluedHashMap<>();
      result.add("X-Version", version);
      result.add("X-B3-TraceId", traceId);
      result.add("X-B3-SpanId", inHeaders.getFirst("x-b3-spanid"));
      result.add("X-B3-ParentSpanId", inHeaders.getFirst("x-b3-parentspanid"));
      return result;
   }
}

Run Quarkus Applications on Kubernetes

Before we test Istio tracing we need to deploy our Quarkus microservices on Kubernetes. We may do it in several different ways. One of the methods is provided directly by Quarkus. Thanks to that we can generate the Deployment manifests automatically during the build. It turns out that we can also apply Istio manifests to the Kubernetes cluster in the same way. Firstly, we need to include the following two modules.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-openshift</artifactId>
</dependency>
<dependency>
  <groupId>me.snowdrop</groupId>
  <artifactId>istio-client</artifactId>
  <version>1.7.7.1</version>
</dependency>

If you deploy on Kubernetes just replace the quarkus-openshift module with the quarkus-kubernetes module. In the next step, we need to provide some configuration settings in the application.properties file. In order to enable deployment during the build, we need to set the property quarkus.kubernetes.deploy to true. We can configure several aspects of the Kubernetes Deployment. For example, we may enable the Istio proxy by setting the annotation sidecar.istio.io/inject to true or add some labels required for routing: app and version (in that case). Finally, our application connects to the database, so we need to inject values from the Kubernetes Secret.

quarkus.container-image.group = demo-mesh
quarkus.container-image.build = true
quarkus.kubernetes.deploy = true
quarkus.kubernetes.deployment-target = openshift
quarkus.kubernetes-client.trust-certs = true

quarkus.openshift.deployment-kind = Deployment
quarkus.openshift.labels.app = quarkus-insurance-app
quarkus.openshift.labels.version = v1
quarkus.openshift.annotations."sidecar.istio.io/inject" = true
quarkus.openshift.env.mapping.postgres_user.from-secret = insurance-db
quarkus.openshift.env.mapping.postgres_user.with-key = database-user
quarkus.openshift.env.mapping.postgres_password.from-secret = insurance-db
quarkus.openshift.env.mapping.postgres_password.with-key = database-password
quarkus.openshift.env.mapping.postgres_db.from-secret = insurance-db
quarkus.openshift.env.mapping.postgres_db.with-key = database-name

Here’s the part of the configuration responsible for database connection settings. The name of the key after quarkus.openshift.env.mapping maps to the name of the environment variable, for example, the postgres_password key maps to the POSTGRES_PASSWORD env.

quarkus.datasource.db-kind = postgresql
quarkus.datasource.username = ${POSTGRES_USER}
quarkus.datasource.password = ${POSTGRES_PASSWORD}
quarkus.datasource.jdbc.url = jdbc:postgresql://person-db:5432/${POSTGRES_DB}
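
To make the two naming steps concrete, here is a simplified model of them in plain Java: deriving the environment variable name from the mapping key, and expanding ${NAME} expressions from the environment. This is only a sketch. It is not how Quarkus implements configuration expressions, the upper-casing rule is taken from the single example in the text, and the names Placeholders, resolve and envName are hypothetical.

```java
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified model of ${NAME} expansion; not the Quarkus implementation.
final class Placeholders {

    private static final Pattern EXPR = Pattern.compile("\\$\\{([A-Za-z0-9_]+)\\}");

    // Replaces every ${NAME} in the template with the matching value from env.
    // Throws if a referenced variable is missing.
    static String resolve(String template, Map<String, String> env) {
        Matcher m = EXPR.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            String value = env.get(m.group(1));
            if (value == null) {
                throw new IllegalArgumentException("Missing variable: " + m.group(1));
            }
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    // Maps a mapping key such as postgres_password to POSTGRES_PASSWORD.
    static String envName(String key) {
        return key.toUpperCase(Locale.ROOT);
    }
}
```

With an environment that contains POSTGRES_DB=persons (a made-up value), resolving the JDBC URL above yields jdbc:postgresql://person-db:5432/persons.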

If there are any additional manifests to apply we should place them inside the src/main/kubernetes directory. This applies to, for example, the Istio configuration. So, now the only thing we need to do is to build the application. First, go to the quarkus-person-app directory and run the following command. Then, go to the quarkus-insurance-app directory, and do the same.

$ mvn clean package

Traffic Management with Istio

There are two versions of the person-app application. So, let’s create the DestinationRule object containing two subsets v1 and v2 based on the version label.

apiVersion: networking.istio.io/v1beta1
kind: DestinationRule
metadata:
  name: quarkus-person-app-dr
spec:
  host: quarkus-person-app
  subsets:
    - name: v1
      labels:
        version: v1
    - name: v2
      labels:
        version: v2

In the next step, we need to create the VirtualService object for the quarkus-person-app service. The routing between versions can be based on the X-Version header. If it is not set, Istio sends 50% of the traffic to the v1 version, and 50% to the v2 version. Also, we will inject a delay into the v2 route using the Istio HTTPFaultInjection object. It adds a 3-second delay for 50% of incoming requests.

apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: quarkus-person-app-vs
spec:
  hosts:
    - quarkus-person-app
  http:
    - match:
        - headers:
            X-Version:
              exact: v1
      route:
        - destination:
            host: quarkus-person-app
            subset: v1
    - match:
        - headers:
            X-Version:
              exact: v2
      route:
        - destination:
            host: quarkus-person-app
            subset: v2
      fault:
        delay:
          fixedDelay: 3s
          percentage:
            value: 50
    - route:
        - destination:
            host: quarkus-person-app
            subset: v1
          weight: 50
        - destination:
            host: quarkus-person-app
            subset: v2
          weight: 50
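
The rules in this VirtualService can be read as a small decision function. The sketch below models them in plain Java purely for illustration: in reality the Istio proxy makes these decisions, not application code. The names PersonRouting, chooseSubset and injectedDelayMillis are hypothetical, and the draw parameter stands in for the proxy's random choice.

```java
import java.util.Optional;

// Hypothetical model of the routing rules above; the Istio proxy does this, not the application.
final class PersonRouting {

    // Picks a subset: an X-Version header of exactly v1 or v2 wins,
    // otherwise the draw (0.0 inclusive to 1.0 exclusive) splits traffic 50/50.
    static String chooseSubset(Optional<String> xVersion, double draw) {
        if (xVersion.isPresent()) {
            String v = xVersion.get();
            if (v.equals("v1") || v.equals("v2")) {
                return v;
            }
        }
        return draw < 0.5 ? "v1" : "v2";
    }

    // Injected delay in milliseconds: only the header-matched v2 route carries the fault,
    // and it applies to 50% of the requests on that route.
    static long injectedDelayMillis(Optional<String> xVersion, double draw) {
        boolean v2ByHeader = xVersion.filter("v2"::equals).isPresent();
        return v2ByHeader && draw < 0.5 ? 3000L : 0L;
    }
}
```

Note that in the manifest the fault block sits on the route matched by X-Version: v2, so requests that reach v2 through the weighted default route are not delayed.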

Now, let’s create the Istio Gateway object. Replace the CLUSTER_DOMAIN variable with your cluster’s domain name:

apiVersion: networking.istio.io/v1beta1
kind: Gateway
metadata:
  name: microservices-gateway
spec:
  selector:
    istio: ingressgateway
  servers:
    - port:
        number: 80
        name: http
        protocol: HTTP
      hosts:
        - quarkus-insurance-app.apps.$CLUSTER_DOMAIN
        - quarkus-person-app.apps.$CLUSTER_DOMAIN

In order to forward traffic from the gateway, the VirtualService needs to refer to that gateway.

apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: quarkus-insurance-app-vs
spec:
  hosts:
    - quarkus-insurance-app.apps.$CLUSTER_DOMAIN
  gateways:
    - microservices-gateway
  http:
    - match:
        - uri:
            prefix: "/insurance"
      rewrite:
        uri: " "
      route:
        - destination:
            host: quarkus-insurance-app
          weight: 100

Now you can call the insurance-app service:

$ curl http://quarkus-insurance-app.apps.${CLUSTER_DOMAIN}/insurance/insurances/1/details

We can verify all the existing Istio objects using Kiali.

Testing Istio Tracing with Quarkus

First of all, you can generate many requests using the siege tool. There are multiple ways to run it. We can prepare the file with example requests as shown below:

http://quarkus-person-app.apps.${CLUSTER_DOMAIN}/person/persons/1
http://quarkus-person-app.apps.${CLUSTER_DOMAIN}/person/persons/2
http://quarkus-person-app.apps.${CLUSTER_DOMAIN}/person/persons/3
http://quarkus-person-app.apps.${CLUSTER_DOMAIN}/person/persons/4
http://quarkus-person-app.apps.${CLUSTER_DOMAIN}/person/persons/5
http://quarkus-person-app.apps.${CLUSTER_DOMAIN}/person/persons/6
http://quarkus-person-app.apps.${CLUSTER_DOMAIN}/person/persons/7
http://quarkus-person-app.apps.${CLUSTER_DOMAIN}/person/persons/8
http://quarkus-person-app.apps.${CLUSTER_DOMAIN}/person/persons/9
http://quarkus-person-app.apps.${CLUSTER_DOMAIN}/person/persons
http://quarkus-insurance-app.apps.${CLUSTER_DOMAIN}/insurance/insurances/1/details
http://quarkus-insurance-app.apps.${CLUSTER_DOMAIN}/insurance/insurances/2/details
http://quarkus-insurance-app.apps.${CLUSTER_DOMAIN}/insurance/insurances/3/details
http://quarkus-insurance-app.apps.${CLUSTER_DOMAIN}/insurance/insurances/4/details
http://quarkus-insurance-app.apps.${CLUSTER_DOMAIN}/insurance/insurances/5/details
http://quarkus-insurance-app.apps.${CLUSTER_DOMAIN}/insurance/insurances/6/details
http://quarkus-insurance-app.apps.${CLUSTER_DOMAIN}/insurance/insurances/1
http://quarkus-insurance-app.apps.${CLUSTER_DOMAIN}/insurance/insurances/2
http://quarkus-insurance-app.apps.${CLUSTER_DOMAIN}/insurance/insurances/3
http://quarkus-insurance-app.apps.${CLUSTER_DOMAIN}/insurance/insurances/4
http://quarkus-insurance-app.apps.${CLUSTER_DOMAIN}/insurance/insurances/5
http://quarkus-insurance-app.apps.${CLUSTER_DOMAIN}/insurance/insurances/6

Now we need to set that file as an input for the siege command. We can also set the number of repeats (-r) and concurrent threads (-c).

$ siege -f k8s/traffic/urls.txt -i -v -r 500 -c 10 --no-parser

It takes some time before the command finishes. In the meantime, let’s try to send a single request to the insurance-app with the X-Version header set to v2. 50% of such requests are delayed by the Istio quarkus-person-app-vs VirtualService. Repeat the request until you get a response with a 3s delay.

Here’s the access log from the Quarkus application:

Then, let’s switch to the Jaeger console. We can find the request by the guid:x-request-id tag.

Here’s the result of our search:

quarkus-istio-tracing-jaeger

We have a full trace of the request including communication between the Istio gateway and the insurance-app, and also between the insurance-app and the person-app. In order to see the details of the trace just click the record. You will see a trace timeline and the requests/responses structure. You can easily verify that the latency occurs somewhere between the insurance-app and the person-app, because the person-app itself processed the request in only 4.46 ms.

quarkus-istio-tracing-jaeger-timeline

Kafka In Microservices With Micronaut
Piotr's TechBlog, Tue, 06 Aug 2019 07:14:19 +0000
https://piotrminkowski.com/2019/08/06/kafka-in-microservices-with-micronaut/

Today we are going to build an example of microservices that communicate with each other asynchronously through Apache Kafka topics. We use the Micronaut Framework, which provides a dedicated library for integration with Kafka. Let’s take a brief look at the architecture of our sample system. We have 4 microservices: order-service, trip-service, driver-service, and passenger-service. The implementation of these applications is very simple. All of them have in-memory storage and connect to the same Kafka instance.

The primary goal of our system is to arrange a trip for customers. The order-service application also acts as a gateway. It receives requests from customers, saves their history, and sends events to the orders topic. All the other microservices listen on this topic and process orders sent by order-service. Each microservice has its own dedicated topic, where it sends events with information about changes. Such events are received by some of the other microservices. The architecture is presented in the picture below.

micronaut-kafka-1.png

Before reading this article it is worth familiarizing yourself with Micronaut Framework. You may read one of my previous articles describing a process of building microservices communicating via REST API: Quick Guide to Microservices with Micronaut Framework

1. Running Kafka

To run Apache Kafka on the local machine we may use its Docker image. It seems that the most up-to-date image is shared by https://hub.docker.com/u/wurstmeister. Before starting the Kafka container we have to start the ZooKeeper server, which Kafka uses. If you run Docker on Windows the default address of its virtual machine is 192.168.99.100. It also has to be set as an environment variable (KAFKA_ADVERTISED_HOST_NAME) for the Kafka container.
Both ZooKeeper and Kafka containers will be started in the same network, kafka. ZooKeeper is available under the name zookeeper and is exposed on port 2181. The Kafka container requires that address in the env variable KAFKA_ZOOKEEPER_CONNECT.

$ docker network create kafka
$ docker run -d --name zookeeper --network kafka -p 2181:2181 wurstmeister/zookeeper
$ docker run -d --name kafka -p 9092:9092 --network kafka --env KAFKA_ADVERTISED_HOST_NAME=192.168.99.100 --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 wurstmeister/kafka

2. Including Micronaut Kafka

Micronaut example applications built with Kafka can be started with or without the presence of an HTTP server. To enable Micronaut Kafka you need to include the micronaut-kafka library in your dependencies. In case you would like to expose an HTTP API you should also include micronaut-http-server-netty:

<dependency>
   <groupId>io.micronaut.configuration</groupId>
   <artifactId>micronaut-kafka</artifactId>
</dependency>
<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-http-server-netty</artifactId>
</dependency>

3. Building microservice order-service

The order-service application is the only one that starts an embedded HTTP server and exposes a REST API. That’s why we may enable built-in Micronaut health checks for Kafka. To do that we should first include the micronaut-management dependency:

<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-management</artifactId>
</dependency>

For convenience, we will enable all management endpoints and disable HTTP authentication for them by defining the following configuration inside application.yml:

endpoints:
  all:
    enabled: true
    sensitive: false

Now, a health check is available under address http://localhost:8080/health. Our sample application will also expose a simple REST API for adding new orders and listing all previously created orders. Here’s the Micronaut controller implementation responsible for exposing those endpoints:

@Controller("orders")
public class OrderController {

    @Inject
    OrderInMemoryRepository repository;
    @Inject
    OrderClient client;

    @Post
    public Order add(@Body Order order) {
        order = repository.add(order);
        client.send(order);
        return order;
    }

    @Get
    public Set<Order> findAll() {
        return repository.findAll();
    }

}

Each microservice uses an in-memory repository implementation. Here’s repository implementation inside order-service:

@Singleton
public class OrderInMemoryRepository {

    private Set<Order> orders = new HashSet<>();

    public Order add(Order order) {
        order.setId((long) (orders.size() + 1));
        orders.add(order);
        return order;
    }

    public void update(Order order) {
        orders.remove(order);
        orders.add(order);
    }

    public Optional<Order> findByTripIdAndType(Long tripId, OrderType type) {
        return orders.stream().filter(order -> order.getTripId().equals(tripId) && order.getType() == type).findAny();
    }

    public Optional<Order> findNewestByUserIdAndType(Long userId, OrderType type) {
        return orders.stream().filter(order -> order.getUserId().equals(userId) && order.getType() == type)
                .max(Comparator.comparing(Order::getId));
    }

    public Set<Order> findAll() {
        return orders;
    }

}

The in-memory repository stores Order object instances. The Order object is also sent to the Kafka topic named orders. Here’s an implementation of the Order class:

public class Order {

    private Long id;
    private LocalDateTime createdAt;
    private OrderType type;
    private Long userId;
    private Long tripId;
    private float currentLocationX;
    private float currentLocationY;
    private OrderStatus status;
   
    // ... GETTERS AND SETTERS
}
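
One detail worth spelling out: the update method of the repository removes the order and adds it again, and on a HashSet this replaces the stored instance only when equality is based on the order id. The getters and setters are elided above, so whether the original class overrides equals and hashCode is not shown. The sketch below is an assumption that illustrates the idea with a trimmed-down, hypothetical OrderSketch class and OrderStore repository; the status strings are made up.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical, trimmed-down entity: only the id and a status string.
final class OrderSketch {
    private final Long id;
    private final String status;

    OrderSketch(Long id, String status) {
        this.id = id;
        this.status = status;
    }

    Long getId() { return id; }
    String getStatus() { return status; }

    // Two orders are the same entity when their ids match.
    @Override
    public boolean equals(Object o) {
        return o instanceof OrderSketch && Objects.equals(id, ((OrderSketch) o).id);
    }

    @Override
    public int hashCode() { return Objects.hashCode(id); }
}

// Hypothetical repository using the same remove-then-add idiom as above.
final class OrderStore {
    private final Set<OrderSketch> orders = new HashSet<>();

    void add(OrderSketch order) { orders.add(order); }

    void update(OrderSketch order) {
        orders.remove(order); // removes the stored instance with the same id
        orders.add(order);    // stores the new state
    }

    int size() { return orders.size(); }

    String statusOf(Long id) {
        return orders.stream().filter(o -> o.getId().equals(id))
                .map(OrderSketch::getStatus).findAny().orElse(null);
    }
}
```

Without the id-based equals and hashCode, remove would not find the previously stored instance and the set would end up holding both the old and the new state of the same order.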

4. Example of asynchronous communication with Kafka and Micronaut

Now, let’s consider one of the use cases our sample system supports: adding a new trip. In the first step (1) we add a new order of type OrderType.NEW_TRIP. After that, order-service creates the order and sends it to the orders topic. The order is received by three microservices: driver-service, passenger-service, and trip-service (2). All of these applications process the new order. The passenger-service application checks whether there are sufficient funds on the passenger account. If not, it cancels the trip; otherwise it does nothing. The driver-service looks for the nearest available driver, while trip-service creates and stores new trips. Both driver-service and trip-service send events to their topics (drivers, trips) with information about changes (3). Every event can be accessed by other microservices; for example, trip-service listens for events from driver-service in order to assign a new driver to the trip (4). The following picture illustrates the communication between our microservices when adding a new trip.

micronaut-kafka-3.png

Now, let’s proceed to the implementation details.

Step 1: Sending order

First we need to create a Kafka client responsible for sending messages to a topic. To achieve that we should create an interface annotated with @KafkaClient and declare one or more methods for sending messages. Every method should have a target topic name set through @Topic annotation. For method parameters we may use three annotations @KafkaKey, @Body or @Header. @KafkaKey is used for partitioning, which is required by our sample applications. In the client implementation visible below we just use @Body annotation.

@KafkaClient
public interface OrderClient {

    @Topic("orders")
    void send(@Body Order order);

}
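
Since @KafkaKey is about partitioning, a tiny illustration of what a message key does may help. The sketch below is a simplification and an assumption on my side: Kafka's default partitioner hashes the serialized key with murmur2, while this example uses String.hashCode only to show the principle that equal keys always land in the same partition. The names KeyPartitioning and partitionFor are hypothetical.

```java
// Simplified illustration; Kafka's default partitioner hashes the serialized key with murmur2.
final class KeyPartitioning {

    // Maps a key to one of numPartitions partitions.
    // Equal keys always map to the same partition, which preserves per-key ordering.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

Math.floorMod is used instead of the % operator because hashCode may be negative, and a partition index has to stay within 0 and numPartitions - 1.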

Step 2: Receiving order

Once an order has been sent by the client it is received by all other microservices listening on the orders topic. Here’s a listener implementation in the driver-service. A listener class should be annotated with @KafkaListener. We may declare groupId as an annotation field to prevent more than one instance of a single application from receiving the same message. Then we declare a method for processing incoming messages. Like the client method, it should be annotated with @Topic to set the name of the target topic. Because we are listening for Order objects, the method parameter should be annotated with @Body, the same as in the corresponding client method.

@KafkaListener(groupId = "driver")
public class OrderListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderListener.class);

    private DriverService service;

    public OrderListener(DriverService service) {
        this.service = service;
    }

    @Topic("orders")
    public void receive(@Body Order order) {
        LOGGER.info("Received: {}", order);
        switch (order.getType()) {
            case NEW_TRIP -> service.processNewTripOrder(order);
        }
    }

}
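
The effect of groupId can be shown with a toy in-memory model: every group sees each message, but inside a group only one instance receives it. This is just a sketch under simplifying assumptions. Real Kafka achieves this by assigning partitions to the members of a consumer group, not by round-robin delivery, and the names ToyTopic, subscribe and send are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of one topic; real Kafka assigns partitions to group members instead.
final class ToyTopic {

    // groupId -> mailboxes of the application instances in that group
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    private int sent = 0;

    // Registers one application instance under groupId and returns its mailbox.
    List<String> subscribe(String groupId) {
        List<String> mailbox = new ArrayList<>();
        groups.computeIfAbsent(groupId, g -> new ArrayList<>()).add(mailbox);
        return mailbox;
    }

    // Delivers the message to exactly one instance per group (round-robin).
    void send(String message) {
        for (List<List<String>> instances : groups.values()) {
            instances.get(sent % instances.size()).add(message);
        }
        sent++;
    }
}
```

With two instances subscribed under the driver group and one under the trip group, two sent orders end up split between the driver instances, while the single trip instance receives both.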

Step 3: Sending to other Kafka topic

Now, let’s take a look at the processNewTripOrder method inside driver-service. DriverService injects two different Kafka client beans: OrderClient and DriverClient. When processing a new order it tries to find the available driver who is closest to the customer who sent the order. After finding one, it changes the driver’s status to UNAVAILABLE and sends a message with the Driver object to the drivers topic.

@Singleton
public class DriverService {

    private static final Logger LOGGER = LoggerFactory.getLogger(DriverService.class);

    private DriverClient client;
    private OrderClient orderClient;
    private DriverInMemoryRepository repository;

    public DriverService(DriverClient client, OrderClient orderClient, DriverInMemoryRepository repository) {
        this.client = client;
        this.orderClient = orderClient;
        this.repository = repository;
    }

    public void processNewTripOrder(Order order) {
        LOGGER.info("Processing: {}", order);
        Optional<Driver> driver = repository.findNearestDriver(order.getCurrentLocationX(), order.getCurrentLocationY());
        driver.ifPresent(driverLocal -> {
            driverLocal.setStatus(DriverStatus.UNAVAILABLE);
            repository.updateDriver(driverLocal);
            client.send(driverLocal, String.valueOf(order.getId()));
            LOGGER.info("Message sent: {}", driverLocal);
        });
    }
   
    // ...
}
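
The repository method findNearestDriver is not shown in this article. As a rough idea of what such a lookup could do, here is a minimal sketch that picks the available driver with the smallest Euclidean distance to the customer. It is an assumption, not the code from the repository, and the DriverSketch type with its fields is hypothetical.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

final class NearestDriver {

    // Hypothetical, minimal driver record: id, position and availability.
    static final class DriverSketch {
        final long id;
        final float x;
        final float y;
        final boolean available;

        DriverSketch(long id, float x, float y, boolean available) {
            this.id = id;
            this.x = x;
            this.y = y;
            this.available = available;
        }
    }

    // Returns the available driver with the smallest Euclidean distance to (x, y),
    // or an empty Optional when no driver is available.
    static Optional<DriverSketch> findNearest(List<DriverSketch> drivers, float x, float y) {
        return drivers.stream()
                .filter(d -> d.available)
                .min(Comparator.comparingDouble(d -> Math.hypot(d.x - x, d.y - y)));
    }
}
```

Returning an Optional matches how processNewTripOrder consumes the result with ifPresent, so an order simply produces no driver event when nobody is available.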

Here’s an implementation of the Kafka client inside driver-service used for sending messages to the drivers topic. Because we need to link the instance of Driver with an order, we annotate the orderId parameter with @Header. There is no point in adding it to the Driver class just to assign the driver to the right trip on the listener side.

@KafkaClient
public interface DriverClient {

    @Topic("drivers")
    void send(@Body Driver driver, @Header("Order-Id") String orderId);

}

Step 4: Inter-service communication example with Micronaut Kafka

The message sent by DriverClient is received by a @KafkaListener declared inside trip-service. It listens for messages incoming to the drivers topic. The signature of the receiving method is very similar to that of the client sending method, as shown below:

@KafkaListener(groupId = "trip")
public class DriverListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(DriverListener.class);

    private TripService service;

    public DriverListener(TripService service) {
        this.service = service;
    }

    @Topic("drivers")
    public void receive(@Body Driver driver, @Header("Order-Id") String orderId) {
        LOGGER.info("Received: driver->{}, header->{}", driver, orderId);
        service.processNewDriver(driver, orderId);
    }

}

The driver with the given id is assigned to the trip found by orderId. That’s the final step of our communication process when adding a new trip.

@Singleton
public class TripService {

    private static final Logger LOGGER = LoggerFactory.getLogger(TripService.class);

    private TripInMemoryRepository repository;
    private TripClient client;

    public TripService(TripInMemoryRepository repository, TripClient client) {
        this.repository = repository;
        this.client = client;
    }


    public void processNewDriver(Driver driver, String orderId) {
        LOGGER.info("Processing: {}", driver);
        Optional<Trip> trip = repository.findByOrderId(Long.valueOf(orderId));
        trip.ifPresent(tripLocal -> {
            tripLocal.setDriverId(driver.getId());
            repository.update(tripLocal);
        });
    }
   
   // ... OTHER METHODS

}

5. Tracing

We may easily enable distributed tracing with Micronaut Kafka. First, we need to enable and configure Micronaut Tracing. To do that you should first add some dependencies:

<dependency>
    <groupId>io.micronaut</groupId>
    <artifactId>micronaut-tracing</artifactId>
</dependency>
<dependency>
    <groupId>io.zipkin.brave</groupId>
    <artifactId>brave-instrumentation-http</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.zipkin.reporter2</groupId>
    <artifactId>zipkin-reporter</artifactId>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.opentracing.brave</groupId>
    <artifactId>brave-opentracing</artifactId>
</dependency>
<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-kafka-client</artifactId>
    <version>0.0.16</version>
    <scope>runtime</scope>
</dependency>

We also need to configure some application settings inside application.yml including an address of our tracing tool. In that case, it is Zipkin.

tracing:
  zipkin:
    enabled: true
    http:
      url: http://192.168.99.100:9411
    sampler:
      probability: 1

Before starting our application we have to run the Zipkin container:

$ docker run -d --name zipkin -p 9411:9411 openzipkin/zipkin

Conclusion

In this article you were guided through the process of building a microservice architecture using asynchronous communication via Apache Kafka. I have shown you an example with the most important features of the Micronaut Kafka library, which allows you to easily declare producers and consumers of Kafka topics, and to enable health checks and distributed tracing for your microservices. I have described an implementation of a single scenario for our system that covers adding a new trip at the customer’s request. In order to see the full implementation of the sample system described in this article please check out the source code available on GitHub: https://github.com/piomin/sample-kafka-micronaut-microservices.git.
