Reactor Archives - Piotr's TechBlog
https://piotrminkowski.com/tag/reactor/
Java, Spring, Kotlin, microservices, Kubernetes, containers

A Deep Dive Into Spring WebFlux Threading Model
https://piotrminkowski.com/2020/03/30/a-deep-dive-into-spring-webflux-threading-model/
Mon, 30 Mar 2020 11:35:58 +0000

The post A Deep Dive Into Spring WebFlux Threading Model appeared first on Piotr's TechBlog.

If you are building reactive applications with Spring WebFlux, you will typically use Reactor Netty as the default embedded server. Reactor Netty is currently one of the most popular asynchronous event-driven frameworks. It provides non-blocking and backpressure-ready TCP, HTTP, and UDP clients and servers. In fact, the most important difference between synchronous and reactive frameworks lies in their threading and concurrency model. Without understanding how a reactive framework handles threads, you won't fully understand reactivity. Let's take a closer look at the threading model realized by Spring WebFlux and Project Reactor.
We should begin with the following fragment of the Spring Framework Reference.

On a “vanilla” Spring WebFlux server (for example, no data access nor other optional dependencies), you can expect one thread for the server and several others for request processing (typically as many as the number of CPU cores).

So, the number of processing threads that handle incoming requests is related to the number of CPU cores. If you have 4 cores and hyper-threading disabled, you would have 4 worker threads in the pool. You can determine the number of cores available to the JVM with the static method availableProcessors of class Runtime: Runtime.getRuntime().availableProcessors().

Example

At this point it is worth referring to the example application. It is a simple Spring Boot application that exposes a reactive API built with Spring WebFlux. The source code is available on GitHub: https://github.com/piomin/sample-spring-webflux.git. You can run it with the java -jar command after building, or simply run it from your IDE. It also contains a Dockerfile in the root directory, so you can build and run it inside a Docker container.

Limit CPU

With Docker you can easily limit the number of CPU cores "visible" to your application. When running a Docker container you just need to set the cpus parameter as shown below.

$ docker run -d --name webflux --cpus="1" -p 8080:8080 piomin/sample-spring-webflux

The sample application prints the number of available CPU cores during startup. Here's the fragment of code responsible for it.

@SpringBootApplication
public class SampleSpringWebFluxApp {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleSpringWebFluxApp.class);

    public static void main(String[] args) {
        SpringApplication.run(SampleSpringWebFluxApp.class, args);
    }

    @PostConstruct
    public void init() {
        LOGGER.info("CPU: {}", Runtime.getRuntime().availableProcessors());
    }

}

Now, we can send some test requests to one of the available endpoints.

$ curl http://192.168.99.100:8080/persons/json

After that we may take a look at the logs generated by the application using the docker logs -f webflux command. The result is pretty surprising. We still have four worker threads in the pool, although the application uses only a single CPU core. Here's the screen with the application logs.

[Image: application logs showing the available CPU count and four reactor-http-nio worker threads]

The explanation is pretty simple. Here's the fragment of Reactor Netty source code that illustrates the algorithm used: the minimum number of worker threads in the pool is 4.

[Image: Reactor Netty source fragment with the default worker-count calculation]

That exactly explains why my application uses the same number of threads when running on my laptop with 4 CPUs and in a Docker container limited to a single CPU core. You should also remember that Java has been able to see the number of CPU cores inside a Docker container since version 10. If you use Java 8 on Docker, your application sees the CPUs of the host machine, not of the container, which may have a huge impact on consumed resources.
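The rule described above can be sketched as a one-liner. This is a simplified illustration, assuming it mirrors the default worker-count logic in Reactor Netty's LoopResources (the real code can also be overridden via a system property); the class and method names here are made up:

```java
public class WorkerCount {

    // Simplified sketch of the default rule discussed above: never fewer
    // than 4 event-loop worker threads, otherwise one per CPU core.
    // (Assumption: this mirrors Reactor Netty's default in
    // reactor.netty.resources.LoopResources.)
    static int defaultIoWorkerCount(int availableProcessors) {
        return Math.max(availableProcessors, 4);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores=" + cores
                + ", workers=" + defaultIoWorkerCount(cores));
    }
}
```

With 1 CPU visible to the container this still yields 4 workers, which matches the logs above.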

WebClient Thread Pool

Let's consider the following implementation of a reactive endpoint. First, we emit a stream of three Person objects, and then we merge it with the stream returned by the WebClient.

@Autowired
WebClient client;

private Stream<Person> prepareStreamPart1() {
   return Stream.of(
      new Person(1, "Name01", "Surname01", 11),
      new Person(2, "Name02", "Surname02", 22),
      new Person(3, "Name03", "Surname03", 33)
   );
}

@GetMapping("/integration/{param}")
public Flux<Person> findPersonsIntegration(@PathVariable("param") String param) {
   return Flux.fromStream(this::prepareStreamPart1).log()
      .mergeWith(
         client.get().uri("/slow/" + param)
            .retrieve()
            .bodyToFlux(Person.class)
            .log()
      );
}

In order to properly understand what exactly happens with the thread pool, let's recall the following fragment from the Spring Framework Reference.

The reactive WebClient operates in event loop style. So you can see a small, fixed number of processing threads related to that (for example, reactor-http-nio- with the Reactor Netty connector). However, if Reactor Netty is used for both client and server, the two share event loop resources by default.

Typically, you have 4 worker threads, which are shared between server-side processing and client-side communication with other resources. Since we have enabled detailed logging with the log() method, we can easily analyze, step by step, the thread usage for a single request sent to the test endpoint. The incoming request is processed by the reactor-http-nio-2 thread. Then WebClient is called and subscribes to the result stream on the same thread. But the result is published on a different thread, reactor-http-nio-3, the first one available in the pool.

[Image: logs showing request processing on reactor-http-nio-2 and publishing on reactor-http-nio-3]

We can also analyze the subsequent logs for a single selected thread. In the following fragment of logs for thread reactor-http-nio-1 you may see that WebClient does not block the thread while waiting for a result from the called resource. The onComplete method is called here four times in a row without any onSubscribe call. Non-blocking threads are the main concept behind reactive programming, which allows us to handle many requests using a relatively small number of threads (4 in this case).


15:02:25.875 --- [ctor-http-nio-1] : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
15:02:25.875 --- [ctor-http-nio-1] : request(32)
15:02:25.891 --- [ctor-http-nio-1] : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
15:02:25.891 --- [ctor-http-nio-1] : | request(32)
15:02:25.891 --- [ctor-http-nio-1] : | onNext(Person(id=1, firstName=Name01, lastName=Surname01, age=11))
15:02:25.892 --- [ctor-http-nio-1] : | onNext(Person(id=2, firstName=Name02, lastName=Surname02, age=22))
15:02:25.892 --- [ctor-http-nio-1] : | onNext(Person(id=3, firstName=Name03, lastName=Surname03, age=33))
15:02:25.892 --- [ctor-http-nio-1] : | onComplete()
15:02:25.894 --- [ctor-http-nio-1] : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
15:02:25.895 --- [ctor-http-nio-1] : request(32)
15:02:25.924 --- [ctor-http-nio-1] : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
15:02:25.925 --- [ctor-http-nio-1] : | request(32)
15:02:25.925 --- [ctor-http-nio-1] : | onNext(Person(id=1, firstName=Name01, lastName=Surname01, age=11))
15:02:25.925 --- [ctor-http-nio-1] : | onNext(Person(id=2, firstName=Name02, lastName=Surname02, age=22))
15:02:25.925 --- [ctor-http-nio-1] : | onNext(Person(id=3, firstName=Name03, lastName=Surname03, age=33))
15:02:25.926 --- [ctor-http-nio-1] : | onComplete()
15:02:25.927 --- [ctor-http-nio-1] : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
15:02:25.927 --- [ctor-http-nio-1] : request(32)
15:02:26.239 --- [ctor-http-nio-1] : onNext(Person(id=38483, firstName=Name6, lastName=Surname6, age=50))
15:02:26.241 --- [ctor-http-nio-1] : onNext(Person(id=59103, firstName=Name6, lastName=Surname6, age=6))
15:02:26.241 --- [ctor-http-nio-1] : onNext(Person(id=67587, firstName=Name6, lastName=Surname6, age=36))
15:02:26.242 --- [ctor-http-nio-1] : onComplete()
15:02:26.283 --- [ctor-http-nio-1] : onNext(Person(id=40006, firstName=Name2, lastName=Surname2, age=41))
15:02:26.284 --- [ctor-http-nio-1] : onNext(Person(id=94727, firstName=Name2, lastName=Surname2, age=79))
15:02:26.287 --- [ctor-http-nio-1] : onNext(Person(id=19891, firstName=Name2, lastName=Surname2, age=84))
15:02:26.287 --- [ctor-http-nio-1] : onComplete()
15:02:26.292 --- [ctor-http-nio-1] : onNext(Person(id=67550, firstName=Name14, lastName=Surname14, age=92))
15:02:26.293 --- [ctor-http-nio-1] : onNext(Person(id=85063, firstName=Name14, lastName=Surname14, age=76))
15:02:26.293 --- [ctor-http-nio-1] : onNext(Person(id=54572, firstName=Name14, lastName=Surname14, age=18))
15:02:26.293 --- [ctor-http-nio-1] : onComplete()
15:02:26.295 --- [ctor-http-nio-1] : onNext(Person(id=49896, firstName=Name16, lastName=Surname16, age=20))
15:02:26.295 --- [ctor-http-nio-1] : onNext(Person(id=85684, firstName=Name16, lastName=Surname16, age=39))
15:02:26.296 --- [ctor-http-nio-1] : onNext(Person(id=2605, firstName=Name16, lastName=Surname16, age=75))
15:02:26.296 --- [ctor-http-nio-1] : onComplete()
15:02:26.337 --- [ctor-http-nio-1] : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
15:02:26.338 --- [ctor-http-nio-1] : | request(32)
15:02:26.339 --- [ctor-http-nio-1] : | onNext(Person(id=1, firstName=Name01, lastName=Surname01, age=11))
15:02:26.339 --- [ctor-http-nio-1] : | onNext(Person(id=2, firstName=Name02, lastName=Surname02, age=22))
15:02:26.339 --- [ctor-http-nio-1] : | onNext(Person(id=3, firstName=Name03, lastName=Surname03, age=33))
15:02:26.340 --- [ctor-http-nio-1] : | onComplete()

Long response time

If you are using WebClient for communication with other resources, you should be prepared to handle long response times. Of course WebClient does not block the thread, but sometimes it is desirable to use a thread pool other than the main worker pool shared with the server. To do that, you should use the publishOn method. Project Reactor provides thread-pool abstractions called Schedulers, which you can use to create different concurrency strategies. If you prefer to have full control over the minimum and maximum number of threads in the pool, you should define your own task executor as shown below. The minimum size of the pool is 5, while the maximum size is 10.

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
   ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
   executor.setCorePoolSize(5);
   executor.setMaxPoolSize(10);
   executor.setQueueCapacity(100);
   executor.setThreadNamePrefix("slow-");
   executor.initialize();
   return executor;
}

Then you just need to wrap it with Schedulers.fromExecutor and pass it to the publishOn method. Here's our second test endpoint, which sets a dedicated thread pool for the WebClient part of the stream.

@GetMapping("/integration-in-different-pool/{param}")
public Flux<Person> findPersonsIntegrationInDifferentPool(@PathVariable("param") String param) {
   return Flux.fromStream(this::prepareStreamPart1).log()
      .mergeWith(
         client.get().uri("/slow/" + param)
            .retrieve()
            .bodyToFlux(Person.class)
            .log()
            .publishOn(Schedulers.fromExecutor(taskExecutor))
         );
}

Performance Testing

Now, our main goal is to compare the performance of both implemented endpoints. The first of them uses the same thread pool for server request processing and for WebClient, while the second creates a separate thread pool for WebClient with publishOn. The first step in preparing such a comparison is to create a mock with a delayed response. We will use MockWebServer provided by OkHttp. It catches requests sent to the /slow/{param} endpoint and emits a stream of Person objects delayed by 200 ms.

public static MockWebServer mockBackEnd;

@BeforeClass
public static void setUp() throws IOException {
   final Dispatcher dispatcher = new Dispatcher() {

      @NotNull
      @Override
      public MockResponse dispatch(@NotNull RecordedRequest recordedRequest) throws InterruptedException {
         String pathParam = recordedRequest.getPath().replaceAll("/slow/", "");
         List<Person> personsPart2 = List.of(new Person(r.nextInt(100000), "Name" + pathParam, "Surname" + pathParam, r.nextInt(100)),
            new Person(r.nextInt(100000), "Name" + pathParam, "Surname" + pathParam, r.nextInt(100)),
            new Person(r.nextInt(100000), "Name" + pathParam, "Surname" + pathParam, r.nextInt(100)));
         try {
            return new MockResponse()
               .setResponseCode(200)
               .setBody(mapper.writeValueAsString(personsPart2))
               .setHeader("Content-Type", "application/json")
               .setBodyDelay(200, TimeUnit.MILLISECONDS);
         }
         catch (JsonProcessingException e) {
            e.printStackTrace();
         }
         return null;
      }
   };
   mockBackEnd = new MockWebServer();
   mockBackEnd.setDispatcher(dispatcher);
   mockBackEnd.start();
   System.setProperty("target.uri", "http://localhost:" + mockBackEnd.getPort());
}

@AfterClass
public static void tearDown() throws IOException {
   mockBackEnd.shutdown();
}

The last step in our implementation is to create the test methods. For calling both sample endpoints we can use the synchronous TestRestTemplate, since it doesn't affect the results. For benchmarking I'm using the junit-benchmarks library, which allows me to define the number of concurrent threads and the maximum number of attempts. To enable the library for a JUnit test we just need to declare a @Rule. Here's the implementation of our performance test. We are running Spring Boot in test mode and then starting 50 concurrent threads with 300 attempts.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
@RunWith(SpringRunner.class)
public class PerformanceSpringWebFluxTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(PerformanceSpringWebFluxTest.class);
    private static ObjectMapper mapper = new ObjectMapper();
    private static Random r = new Random();
    private static int i = 0;

    @Rule
    public TestRule benchmarkRun = new BenchmarkRule();
    @Autowired
    TestRestTemplate template;
   
    @Test
    @BenchmarkOptions(warmupRounds = 10, concurrency = 50, benchmarkRounds = 300)
    public void testPerformance() throws InterruptedException {
        ResponseEntity<Person[]> r = template.exchange("/persons/integration/{param}", HttpMethod.GET, null, Person[].class, ++i);
        Assert.assertEquals(200, r.getStatusCodeValue());
        Assert.assertNotNull(r.getBody());
        Assert.assertEquals(6, r.getBody().length);
    }

    @Test
    @BenchmarkOptions(warmupRounds = 10, concurrency = 50, benchmarkRounds = 300)
    public void testPerformanceInDifferentPool() throws InterruptedException {
        ResponseEntity<Person[]> r = template.exchange("/persons/integration-in-different-pool/{param}", HttpMethod.GET, null, Person[].class, ++i);
        Assert.assertEquals(200, r.getStatusCodeValue());
        Assert.assertNotNull(r.getBody());
        Assert.assertEquals(6, r.getBody().length);
    }
}

Here are the results. For the endpoint with the default Spring WebFlux threading model, where the thread pool is shared between server processing and client requests, processing all 300 requests takes ~5.5 s. For the method with a separate thread pool for WebClient it takes ~2.9 s for all 300 requests. In general, the more threads you use, the more the ratio of average processing times shifts in favor of the second model (with the separate thread pool for the client).

[Image: benchmark results for the shared thread pool endpoint]

And here is the result for the separate WebClient thread pool.

[Image: benchmark results for the separate WebClient thread pool endpoint]

Using Spring Boot Actuator

Sometimes you need additional libraries in your WebFlux application. One of them is Spring Boot Actuator, which may be required especially if you have to expose health-check or metrics endpoints. To find out how Spring WebFlux handles threading for Spring Boot Actuator, we will profile our application with YourKit. After running the application, generate some traffic to the /actuator/health endpoint. Here's the thread view from YourKit. As you can see, a worker thread (reactor-http-nio-*) delegates long-running jobs to threads available in the bounded elastic pool (boundedElastic-*).

[Image: YourKit thread view with reactor-http-nio and boundedElastic threads]

Threads from the bounded elastic pool check free disk space, which is by default part of the Spring Boot Actuator /health endpoint.

[Image: YourKit view of the disk-space health check activity]


Using Reactive WebClient with Spring WebFlux
https://piotrminkowski.com/2019/11/04/using-reactive-webclient-with-spring-webflux/
Mon, 04 Nov 2019 10:12:52 +0000

The post Using Reactive WebClient with Spring WebFlux appeared first on Piotr's TechBlog.

Reactive APIs, and reactive programming in general, have become increasingly popular lately. More and more new frameworks and toolkits support reactive programming, or are even dedicated to it. Today, in the era of microservices architecture, network communication through APIs is critical for applications. Therefore, reactive APIs seem to be an attractive alternative to the traditional, synchronous approach. They should definitely be considered as a primary approach if you are working with large streams of data exchanged over the network.

Spring supports reactive programming and reactive APIs too. You could read about it in some of my previous articles, where I focused on introducing that support. In the article Reactive Microservices with Spring WebFlux and Spring Cloud you can read more about building a microservices architecture using Spring WebFlux and Spring Cloud. In turn, in the articles Introduction to Reactive APIs with Postgres, R2DBC, Spring Data JDBC and Spring WebFlux and Reactive Elasticsearch with Spring Boot I introduced reactive Spring Data repositories using PostgreSQL and Elasticsearch as examples. Those articles should be treated as an introduction to reactive programming with Spring. Today, I would like to go deeper into that topic and discuss some aspects of the network communication between a service that exposes a reactive stream via an API and a service that consumes this API using Spring WebClient.

1. Access reactive stream using Spring WebClient

First, let's consider the typical scenario of consuming a reactive API. We have the following implementation of a reactive stream containing Person objects on the producer side:

@RestController
@RequestMapping("/persons")
public class PersonController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);

    @GetMapping("/json")
    public Flux<Person> findPersonsJson() {
        return Flux.fromStream(this::prepareStream)
                .doOnNext(person -> LOGGER.info("Server produces: {}", person));
    }
}

We can easily access it with the non-blocking Spring WebFlux WebClient. The following test starts a sample Spring WebFlux application, defines a WebClient instance, and subscribes to the response stream.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class SampleSpringWebFluxTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleSpringWebFluxTest.class);
    final WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();

    @Test
    public void testFindPersonsJson() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Flux<Person> persons = client.get().uri("/persons/json").retrieve().bodyToFlux(Person.class);
        persons.subscribe(person -> {
            waiter.assertNotNull(person);
            LOGGER.info("Client subscribes: {}", person);
            waiter.resume();
        });
        waiter.await(3000, 9);
    }
}

In reactive programming with Reactor and Spring WebFlux, you first need to subscribe to the stream in order to access the emitted objects. Assuming that our test stream has 9 Person elements, you will receive the following log output:

[Image: logs of the client subscribing to the JSON response]
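The subscribe-to-receive contract described above can be demonstrated without any Spring dependency, using the JDK's own Reactive Streams API (java.util.concurrent.Flow), which follows the same onSubscribe/onNext/onComplete protocol as Reactor. A minimal sketch; the class and method names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class SubscribeDemo {

    // Publishes the given items and collects what the subscriber receives.
    // Nothing is delivered until subscribe() happens and demand is requested.
    static List<String> collect(String... items) throws InterruptedException {
        List<String> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // demand drives delivery, one element at a time
                }

                public void onNext(String item) {
                    received.add(item);
                    subscription.request(1);
                }

                public void onError(Throwable t) { done.countDown(); }

                public void onComplete() { done.countDown(); }
            });
            for (String item : items) {
                publisher.submit(item);
            }
            publisher.close();       // triggers onComplete once the queue drains
            done.await();
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(collect("one", "two", "three"));
    }
}
```

The same pattern, with Reactor's Flux instead of SubmissionPublisher, is what the test above does with persons.subscribe(...).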

Let's think about what happened here. Our reactive stream on the server side has been returned as a single JSON array response to the consumer. It means that our reactive client can start reading the stream only after the emission of elements has completed on the server side. That's not exactly what we wanted to achieve, because we would like to take advantage of reactive streams on the client side too, and read every element of the stream just after it has been emitted by the producer. But with the application/json content type it is just not possible, which is clearly visible in the fragment of the response below.

[Image: fragment of the JSON array response]

2. Expose event stream using Spring WebFlux

The problem defined in the previous section obviously lies on the server side. Spring WebFlux WebClient is able to read reactive streams continuously, but the producer needs to emit them "properly". To achieve this we should set the content type to application/stream+json or text/event-stream.

@RestController
@RequestMapping("/persons")
public class PersonController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);

    @GetMapping(value = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Person> findPersonsStream() {
        return Flux.fromStream(this::prepareStream).delaySequence(Duration.ofMillis(100))
                .doOnNext(person -> LOGGER.info("Server produces: {}", person));
    }
}

Now, our response looks slightly different than before as shown below.

[Image: fragment of the application/stream+json response]
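For comparison, an application/stream+json response body is a sequence of newline-delimited JSON documents rather than a single array, roughly like this (illustrative values, not captured from the sample application):

```
{"id":1,"firstName":"Name01","lastName":"Surname01","age":11}
{"id":2,"firstName":"Name02","lastName":"Surname02","age":22}
{"id":3,"firstName":"Name03","lastName":"Surname03","age":33}
```

Each line can be decoded and delivered to the subscriber as soon as it arrives, which is what makes continuous client-side consumption possible.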

To see the effect during the test I delayed the whole stream slightly (by around 100 milliseconds), because it takes some time before the subscriber starts to receive elements after subscribing. The current test is the same as the previous one: we subscribe to the response stream and print all the elements.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class SampleSpringWebFluxTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleSpringWebFluxTest.class);
    final WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();

    @Test
    public void testFindPersonsStream() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Flux<Person> persons = client.get().uri("/persons/stream").retrieve().bodyToFlux(Person.class);
        persons.subscribe(person -> {
            LOGGER.info("Client subscribes: {}", person);
            waiter.assertNotNull(person);
            waiter.resume();
        });
        waiter.await(3000, 9);
    }
}

Here are the results. As you can see, the elements are processed on the client side just after being emitted by the producer. It is worth noting that all the elements are sent within the same thread.

[Image: logs showing client-side processing of streamed elements on a single thread]

3. Implementing backpressure

Backpressure is one of the most important reasons you would decide to use reactive programming. According to the Spring WebFlux documentation, it supports backpressure, since Project Reactor is a Reactive Streams library and, therefore, all of its operators support non-blocking back pressure. That sentence is of course true, but only on the server side. Maybe the next fragment of the documentation sheds some light on things: "Reactor has a strong focus on server-side Java." We should remember that Spring WebClient and WebFlux use TCP transport for communication between a client and a server, and therefore a client is not able to regulate the frequency of element emission on the server side. I don't want to go into the details right now; for an explanation of that situation you may refer to the following post: https://stackoverflow.com/questions/52244808/backpressure-mechanism-in-spring-web-flux.
Ok, before proceeding let's recap the definition of backpressure. Backpressure (or back pressure) is resistance or force opposing the desired flow of data through software. In simple words, if a producer sends more events than a consumer can handle in a given period of time, the consumer should be able to regulate the frequency with which the producer sends events. Let's consider the following test example.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class SampleSpringWebFluxTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleSpringWebFluxTest.class);
    final WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();

    @Test
    public void testFindPersonsStreamBackPressure() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Flux<Person> persons = client.get().uri("/persons/stream/back-pressure").retrieve().bodyToFlux(Person.class);
        persons.map(this::doSomeSlowWork).subscribe(person -> {
            waiter.assertNotNull(person);
            LOGGER.info("Client subscribes: {}", person);
            waiter.resume();
        });
        waiter.await(3000, 9);
    }

    private Person doSomeSlowWork(Person person) {
        try {
            Thread.sleep(90);
        }
        catch (InterruptedException e) { }
        return person;
    }
}

After receiving a stream of elements, our test calls a time-expensive mapping method on each element, so it is not able to handle as many elements as the producer sends. In this case the only way I found to somehow "regulate" backpressure is the delayElements method on the server side. I also tried to use the limitRate method on the server side and to implement my own custom Subscriber on the client side, but I wasn't successful. Here's the current implementation of our API method returning a stream of Person objects.

@RestController
@RequestMapping("/persons")
public class PersonController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);

    @GetMapping(value = "/stream/back-pressure", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Person> findPersonsStreamBackPressure() {
        return Flux.fromStream(this::prepareStream).delayElements(Duration.ofMillis(100))
                .doOnNext(person -> LOGGER.info("Server produces: {}", person));
    }
}

After running the test we can see that every element is delayed by 100 milliseconds, and that the producer uses the whole pool of threads for emitting the stream of objects.

[Image: logs showing delayed elements emitted from multiple threads]
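Stepping back from WebFlux for a moment, the core idea of backpressure, a consumer regulating a producer's pace, can be sketched with plain JDK primitives: a bounded buffer blocks a fast producer until the slow consumer catches up. This is only an analogy (the class and method names are made up), not how Reactor implements it:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BackpressureDemo {

    // Runs a fast producer against a slow consumer through a buffer of the
    // given capacity; put() blocks the producer whenever the buffer is full,
    // so the producer can never be more than `capacity` elements ahead.
    static List<Integer> run(int elements, int capacity) throws InterruptedException {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        List<Integer> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            for (int i = 1; i <= elements; i++) {
                try {
                    buffer.put(i); // blocks until the consumer makes room
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        producer.start();

        for (int i = 0; i < elements; i++) {
            Thread.sleep(10); // simulate time-expensive per-element work
            consumed.add(buffer.take());
        }
        producer.join();
        return consumed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(6, 2));
    }
}
```

Over a TCP connection there is no such shared buffer between client and server, which is why the client-side subscriber cannot throttle the remote producer the same way.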

Source Code

The source code of the sample application and the JUnit tests is, as always, available on GitHub. The repository address is https://github.com/piomin/sample-spring-webflux.git.


Reactive Elasticsearch With Spring Boot
https://piotrminkowski.com/2019/10/25/reactive-elasticsearch-with-spring-boot/
Fri, 25 Oct 2019 08:35:40 +0000

The post Reactive Elasticsearch With Spring Boot appeared first on Piotr's TechBlog.

One of the more notable features introduced in the latest release of Spring Data is reactive support for Elasticsearch. Since Spring Data Moore we can take advantage of reactive templates and repositories. It is built on top of a fully reactive Elasticsearch REST client, which is based on Spring WebClient. It is also worth mentioning support for reactive Querydsl, which can be included in your application through ReactiveQueryPredicateExecutor.
I have already shown you how to use Spring Data repositories for synchronous integration with the Elasticsearch API in one of my previous articles, Elasticsearch with Spring Boot. There is no big difference between using standard and reactive Spring Data repositories. I'll focus on showing you those differences in the sample application also used in the previous article, so it is worth reading that article first. Let's proceed to build the Spring Boot reactive Elasticsearch example.

1. Dependencies

I’m using the latest stable version of Spring Boot with JDK 11.

<parent>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-parent</artifactId>
   <version>2.2.0.RELEASE</version>
   <relativePath/>
</parent>
<properties>
   <java.version>11</java.version>
</properties>

We need to include Spring WebFlux and Spring Data Elasticsearch starters. We will also use Actuator for exposing health checks, and some libraries for automated testing like Spring Test and Testcontainers.

<dependencies>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-webflux</artifactId>
   </dependency>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
   </dependency>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
   </dependency>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
   </dependency>
   <dependency>
      <groupId>org.testcontainers</groupId>
      <artifactId>elasticsearch</artifactId>
      <version>1.12.2</version>
      <scope>test</scope>
   </dependency>
</dependencies>

2. Enabling Reactive Repositories

Before we start working with reactive Spring Data repositories, we should enable them by annotating the main or a configuration class with @EnableReactiveElasticsearchRepositories.

@SpringBootApplication
@EnableReactiveElasticsearchRepositories
public class SampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SampleApplication.class, args);
    }

    @Bean
    @ConditionalOnProperty("initial-import.enabled")
    public SampleDataSet dataSet() {
        return new SampleDataSet();
    }

}

3. Building reactive Elasticsearch repositories

Spring Data Elasticsearch comes with three interfaces that support reactive operations: ReactiveRepository, ReactiveCrudRepository, which adds save/update operations, and ReactiveSortingRepository, which offers some methods with sorting. The usage is the same as before: we just need to create our own repository that extends one of the interfaces listed above. We can also add custom find methods following the Spring Data query naming convention. Like all other Spring reactive projects, Spring Data Elasticsearch repository support is built on top of Project Reactor.

@Repository
public interface EmployeeRepository extends ReactiveCrudRepository<Employee, Long> {

    Flux<Employee> findByOrganizationName(String name);
    Flux<Employee> findByName(String name);

}

Here’s our model class:

@Document(indexName = "sample", type = "employee")
public class Employee {

    @Id
    private Long id;
    @Field(type = FieldType.Object)
    private Organization organization;
    @Field(type = FieldType.Object)
    private Department department;
    private String name;
    private int age;
    private String position;
   
}

4. Building a Controller

We will expose some reactive CRUD methods outside the application using Spring WebFlux. Here's our controller class implementation:

@RestController
@RequestMapping("/employees")
public class EmployeeController {

   @Autowired
   EmployeeRepository repository;

   @PostMapping
   public Mono<Employee> add(@RequestBody Employee employee) {
      return repository.save(employee);
   }

   @GetMapping("/{name}")
   public Flux<Employee> findByName(@PathVariable("name") String name) {
      return repository.findByName(name);
   }

   @GetMapping
   public Flux<Employee> findAll() {
      return repository.findAll();
   }

   @GetMapping("/organization/{organizationName}")
   public Flux<Employee> findByOrganizationName(@PathVariable("organizationName") String organizationName) {
      return repository.findByOrganizationName(organizationName);
   }

}

5. Running the Spring Boot application

For testing purposes we need a single-node Elasticsearch instance running in development mode. As usual, we will use a Docker container. Here's the command that starts the container and exposes it on port 9200.

$ docker run -d --name elasticsearch -p 9200:9200 -e "discovery.type=single-node" elasticsearch:6.6.2

My Docker Machine is available at the virtual address 192.168.99.100, so I had to override the Elasticsearch address in the Spring Boot configuration file. Because the reactive Elasticsearch repositories use ReactiveElasticsearchClient, we have to set the property spring.data.elasticsearch.client.reactive.endpoints to 192.168.99.100:9200. Actuator still uses a synchronous REST client for detecting Elasticsearch status in its health check, so we also need to override the default address in the spring.elasticsearch.rest.uris property.

spring:
  application:
    name: sample-spring-elasticsearch
  data:
    elasticsearch:
      client:
        reactive:
          endpoints: 192.168.99.100:9200
  elasticsearch:
    rest:
      uris: http://192.168.99.100:9200

6. Testing Spring Boot reactive Elasticsearch support

As with synchronous repositories, we use Testcontainers for JUnit tests. The only difference is that we need to block on the repository method's result when verifying the outcome of the test.

@RunWith(SpringRunner.class)
@SpringBootTest
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class EmployeeRepositoryTest {

    @ClassRule
    public static ElasticsearchContainer container = new ElasticsearchContainer();
    @Autowired
    EmployeeRepository repository;

    @BeforeClass
    public static void before() {
        System.setProperty("spring.data.elasticsearch.client.reactive.endpoints", container.getContainerIpAddress() + ":" + container.getMappedPort(9200));
    }

    @Test
    public void testAdd() {
        Employee employee = new Employee();
        employee.setId(1L);
        employee.setName("John Smith");
        employee.setAge(33);
        employee.setPosition("Developer");
        employee.setDepartment(new Department(1L, "TestD"));
        employee.setOrganization(new Organization(1L, "TestO", "Test Street No. 1"));
        Mono<Employee> employeeSaved = repository.save(employee);
        Assert.assertNotNull(employeeSaved.block());
    }

    @Test
    public void testFindAll() {
        Flux<Employee> employees = repository.findAll();
        Assert.assertTrue(employees.count().block() > 0);
    }

    @Test
    public void testFindByOrganization() {
        Flux<Employee> employees = repository.findByOrganizationName("TestO");
        Assert.assertTrue(employees.count().block() > 0);
    }

    @Test
    public void testFindByName() {
        Flux<Employee> employees = repository.findByName("John Smith");
        Assert.assertTrue(employees.count().block() > 0);
    }

}

Source Code

For the current sample I'm using the same GitHub repository as for the sample with synchronous repositories, with a new reactive branch: https://github.com/piomin/sample-spring-elasticsearch/tree/reactive. It illustrates how to build a Spring Boot reactive Elasticsearch application.

The post Reactive Elasticsearch With Spring Boot appeared first on Piotr's TechBlog.

Reactive Logging With Spring WebFlux and Logstash https://piotrminkowski.com/2019/10/15/reactive-logging-with-spring-webflux-and-logstash/ https://piotrminkowski.com/2019/10/15/reactive-logging-with-spring-webflux-and-logstash/#comments Tue, 15 Oct 2019 11:12:57 +0000 https://piotrminkowski.wordpress.com/?p=7346 I have already introduced my Spring Boot library for synchronous HTTP request/response logging in one of my previous articles Logging with Spring Boot and Elastic Stack. This library is dedicated to synchronous REST applications built with Spring MVC and Spring Web. Since version 5.0 Spring Framework also offers support for reactive REST API through the […]

I have already introduced my Spring Boot library for synchronous HTTP request/response logging in one of my previous articles, Logging with Spring Boot and Elastic Stack. That library is dedicated to synchronous REST applications built with Spring MVC and Spring Web. Since version 5.0 Spring Framework also offers support for reactive REST APIs through the Spring WebFlux project. I decided to extend my library's logging support to reactive Spring WebFlux.
The repository with source code is available on GitHub: https://github.com/piomin/spring-boot-logging.git. It consists of two Maven modules: logstash-logging-spring-boot-starter for synchronous logging and reactive-logstash-logging-spring-boot-starter for reactive Spring WebFlux applications. The library is available on Maven Central:

<dependency>
  <groupId>com.github.piomin</groupId>
  <artifactId>reactive-logstash-logging-spring-boot-starter</artifactId>
  <version>1.0.0.RELEASE</version>
</dependency>

Motivations

Although we are working with reactive APIs and streams, the requirement to log every incoming request and outgoing response is still relevant. Today we usually run many small applications communicating with each other, so we focus on storing the logs in a single, central place. This is where Logstash and the Elastic Stack come in. Spring Boot and Spring WebFlux allow you to build reactive microservices fast. My library takes care of gathering HTTP request/response logs and sending them to ELK with proper tags and correlation. Using it in your application does not require any additional source code; you just need to include the library.
However, some things need to be discussed when talking about reactive logging. Because we are logging full requests with payloads, we need to buffer them. This somewhat goes against reactive programming, where we try to use the available resources efficiently. Also, the integration with Logstash is performed synchronously. It is worth keeping those two things in mind when using reactive-logstash-logging-spring-boot-starter in your application.

Implementation Details

Spring WebFlux Dependencies

Since the library is used for logging Spring Boot reactive APIs, it needs Spring WebFlux among its dependencies. In turn, Spring WebFlux is built on top of Project Reactor, so the reactor-core artifact also has to be on the dependency list. We also need some standard Spring libraries, used for example to provide auto-configuration.

<dependency>
   <groupId>org.springframework</groupId>
   <artifactId>spring-context</artifactId>
   <version>${spring.version}</version>
   <scope>provided</scope>
</dependency>
<dependency>
   <groupId>org.springframework</groupId>
   <artifactId>spring-webflux</artifactId>
   <version>${spring.version}</version>
   <scope>provided</scope>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-autoconfigure</artifactId>
   <version>${spring.boot.version}</version>
   <scope>provided</scope>
</dependency>
<dependency>
   <groupId>io.projectreactor</groupId>
   <artifactId>reactor-core</artifactId>
   <version>3.3.0.RELEASE</version>
   <scope>provided</scope>
</dependency>

Spring WebFlux Reactive Interceptor

With Spring WebFlux we don't have the popular Spring MVC components for caching request/response bodies: ContentCachingRequestWrapper and ContentCachingResponseWrapper. However, the approach is pretty similar to the one applied when building the library for synchronous logging. We need to access the request and response bodies by wrapping and buffering them without consuming the stream. To do that, we first create classes extending ServerHttpRequestDecorator and ServerHttpResponseDecorator. They give us access to the message body while Spring WebFlux is reading from and writing to the stream.
When extending ServerHttpRequestDecorator we need to override getBody. Keep in mind that we cannot block a reactive stream, so one of the doOn* callbacks is suitable for accessing it. The body is published as a Flux containing DataBuffer objects. Inside the asynchronous doOnNext callback we write each buffer to a temporary byte array.

public class RequestLoggingInterceptor extends ServerHttpRequestDecorator {

   private static final Logger LOGGER = LoggerFactory.getLogger(RequestLoggingInterceptor.class);

   // the constructor accepts logHeaders to match the call site in the filter below
   private final boolean logHeaders;

   public RequestLoggingInterceptor(ServerHttpRequest delegate, boolean logHeaders) {
      super(delegate);
      this.logHeaders = logHeaders;
   }

   @Override
   public Flux<DataBuffer> getBody() {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      return super.getBody().doOnNext(dataBuffer -> {
         try {
            Channels.newChannel(baos).write(dataBuffer.asByteBuffer().asReadOnlyBuffer());
            String body = IOUtils.toString(baos.toByteArray(), "UTF-8");
            LOGGER.info("Request: method={}, uri={}, payload={}, audit={}", getDelegate().getMethod(),
                  getDelegate().getPath(), body, value("audit", true));
         } catch (IOException e) {
            e.printStackTrace();
         } finally {
            try {
               baos.close();
            } catch (IOException e) {
               e.printStackTrace();
            }
         }
      });
   }

}

When extending ServerHttpResponseDecorator we need to override the writeWith method, which is responsible for writing the body to the output reactive stream. We listen for body write events in doOnNext, access each DataBuffer, and buffer it in a ByteArrayOutputStream.

public class ResponseLoggingInterceptor extends ServerHttpResponseDecorator {

   private static final Logger LOGGER = LoggerFactory.getLogger(ResponseLoggingInterceptor.class);

   private long startTime;
   private boolean logHeaders;

   public ResponseLoggingInterceptor(ServerHttpResponse delegate, long startTime, boolean logHeaders) {
      super(delegate);
      this.startTime = startTime;
      this.logHeaders = logHeaders;
   }

   @Override
   public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
      Flux<DataBuffer> buffer = Flux.from(body);
      return super.writeWith(buffer.doOnNext(dataBuffer -> {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         try {
            Channels.newChannel(baos).write(dataBuffer.asByteBuffer().asReadOnlyBuffer());
            String bodyRes = IOUtils.toString(baos.toByteArray(), "UTF-8");
            if (logHeaders)
               LOGGER.info("Response({} ms): status={}, headers={}, payload={}, audit={}", value("X-Response-Time", System.currentTimeMillis() - startTime),
                     value("X-Response-Status", getStatusCode().value()), getHeaders(), bodyRes, value("audit", true));
            else
               LOGGER.info("Response({} ms): status={}, payload={}, audit={}", value("X-Response-Time", System.currentTimeMillis() - startTime),
                     value("X-Response-Status", getStatusCode().value()), bodyRes, value("audit", true));
         } catch (IOException e) {
            e.printStackTrace();
         } finally {
            try {
               baos.close();
            } catch (IOException e) {
               e.printStackTrace();
            }
         }
      }));
   }
}

Spring WebFlux Logging Filter

To be able to decorate requests and responses, we first need to declare a filter intercepting incoming requests. To do that, we declare a bean that implements WebFilter and its filter(...) method. The filter method gives access to the exchange object, which contains the objects representing the request and the response. So, if we would like to decorate the request and response objects, we first need to decorate ServerWebExchange. We can easily do that by defining an instance of the ServerWebExchangeDecorator object with overridden getRequest and getResponse methods. Our decorators are responsible only for listening to events related to message body processing, so the important caveat here is that if a message has an empty body, the listening methods won't be fired. That's why I decided to add code that analyzes the content length, in order to log a request or response message with an empty body. It is based on the HTTP header Content-Length.

public class ReactiveSpringLoggingFilter implements WebFilter {

   private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveSpringLoggingFilter.class);
   private UniqueIDGenerator generator;
   private String ignorePatterns;
   private boolean logHeaders;
   private boolean useContentLength;

   public ReactiveSpringLoggingFilter(UniqueIDGenerator generator, String ignorePatterns, boolean logHeaders, boolean useContentLength) {
      this.generator = generator;
      this.ignorePatterns = ignorePatterns;
      this.logHeaders = logHeaders;
      this.useContentLength = useContentLength;
   }

   @Override
   public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
      if (ignorePatterns != null && exchange.getRequest().getURI().getPath().matches(ignorePatterns)) {
         return chain.filter(exchange);
      } else {
         generator.generateAndSetMDC(exchange.getRequest());
         final long startTime = System.currentTimeMillis();
         List<String> header = exchange.getRequest().getHeaders().get("Content-Length");
         if (useContentLength && (header == null || header.get(0).equals("0"))) {
            if (logHeaders)
               LOGGER.info("Request: method={}, uri={}, headers={}, audit={}", exchange.getRequest().getMethod(),
                     exchange.getRequest().getURI().getPath(), exchange.getRequest().getHeaders(), value("audit", true));
            else
               LOGGER.info("Request: method={}, uri={}, audit={}", exchange.getRequest().getMethod(),
                     exchange.getRequest().getURI().getPath(), value("audit", true));
         }
         ServerWebExchangeDecorator exchangeDecorator = new ServerWebExchangeDecorator(exchange) {
            @Override
            public ServerHttpRequest getRequest() {
               return new RequestLoggingInterceptor(super.getRequest(), logHeaders);
            }

            @Override
            public ServerHttpResponse getResponse() {
               return new ResponseLoggingInterceptor(super.getResponse(), startTime, logHeaders);
            }
         };
         return chain.filter(exchangeDecorator)
            .doOnSuccess(aVoid -> {
               logResponse(startTime, exchangeDecorator.getResponse(), exchangeDecorator.getResponse().getStatusCode().value());
            })
            .doOnError(throwable -> {
               logResponse(startTime, exchangeDecorator.getResponse(), 500);
            });
      }
   }
}

The last step of implementation is auto-configuration. Here’s the class responsible for it.

@Configuration
@ConfigurationProperties(prefix = "logging.logstash")
public class ReactiveSpringLoggingAutoConfiguration {

   private static final String LOGSTASH_APPENDER_NAME = "LOGSTASH";

   private String url = "localhost:8500";
   private String ignorePatterns;
   private boolean logHeaders;
   private boolean useContentLength;
   private String trustStoreLocation;
   private String trustStorePassword;
   @Value("${spring.application.name:-}")
   String name;

   @Bean
   public UniqueIDGenerator generator() {
      return new UniqueIDGenerator();
   }

   @Bean
   public ReactiveSpringLoggingFilter reactiveSpringLoggingFilter() {
      return new ReactiveSpringLoggingFilter(generator(), ignorePatterns, logHeaders, useContentLength);
   }

   @Bean
   @ConditionalOnProperty("logging.logstash.enabled")
   public LogstashTcpSocketAppender logstashAppender() {
      ...
   }
}

Usage of Spring WebFlux Logging

To create reactive APIs with Spring Boot, we first need to include the Spring WebFlux starter in the Maven dependencies.

<parent>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-parent</artifactId>
   <version>2.1.9.RELEASE</version>
   <relativePath/>
</parent>
<groupId>pl.piomin.test</groupId>
<artifactId>sample-webflux-app</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
   <java.version>11</java.version>
</properties>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
   <groupId>com.github.piomin</groupId>
   <artifactId>reactive-logstash-logging-spring-boot-starter</artifactId>
   <version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>

I have already described how to build a microservices architecture with Spring WebFlux and Spring Cloud in one of my previous articles, Reactive Microservices with Spring WebFlux and Spring Cloud, so for more information about advanced use cases you can refer to that article. Here's a typical controller implementation using Spring WebFlux Mono and Flux objects.

@RestController
public class AccountController {

   private static final Logger LOGGER = LoggerFactory.getLogger(AccountController.class);
   
   @Autowired
   private AccountRepository repository;

   @GetMapping("/customer/{customer}")
   public Flux<Account> findByCustomer(@PathVariable("customer") String customerId) {
      LOGGER.info("findByCustomer: customerId={}", customerId);
      return repository.findByCustomerId(customerId);
   }

   @GetMapping
   public Flux<Account> findAll() {
      LOGGER.info("findAll");
      return repository.findAll();
   }

   @GetMapping("/{id}")
   public Mono<Account> findById(@PathVariable("id") String id) {
      LOGGER.info("findById: id={}", id);
      return repository.findById(id);
   }

   @PostMapping
   public Mono<Account> create(@RequestBody Account account) {
      LOGGER.info("create: {}", account);
      return repository.save(account);
   }
   
}

Here are the log entries for GET (empty body) and POST requests.

[image: reactive-logging]

We can customize the library's behaviour by overriding the default values of the logging.logstash.* configuration properties. Here's a typical configuration that enables the embedded Logstash appender, changes the default Logstash URL, includes the headers in the log entries, and disables logging of /favicon.ico requests.


logging.logstash:
  enabled: true
  url: 192.168.99.100:8500
  ignorePatterns: .*(favicon).*
  logHeaders: true

With the settings shown above, the logs are sent to the Logstash instance available at 192.168.99.100:8500.

[image: logstash-1]
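A note on how ignorePatterns works: the filter matches the whole request path against the configured regular expression using String.matches (as in the ReactiveSpringLoggingFilter shown earlier), so a pattern such as .*(favicon).* needs the surrounding .* parts. A minimal sketch of that check, using the hypothetical class name IgnorePatternDemo:

```java
public class IgnorePatternDemo {
    public static void main(String[] args) {
        // Same check the filter performs: the request path matched against the pattern
        String ignorePatterns = ".*(favicon).*";
        System.out.println("/favicon.ico".matches(ignorePatterns)); // true  -> request not logged
        System.out.println("/customer/1".matches(ignorePatterns)); // false -> request logged
    }
}
```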

Summary

The Spring Boot Logging library now supports logging for synchronous HTTP APIs with Spring MVC and reactive HTTP APIs with Spring WebFlux. A detailed description of the library's configuration features can be found in my article Using logstash-logging-spring-boot-starter for logging with Spring Boot and Logstash. You can report bugs or propose new enhancements here: https://github.com/piomin/spring-boot-logging/issues. Any feedback would be very welcome.

The post Reactive Logging With Spring WebFlux and Logstash appeared first on Piotr's TechBlog.

Reactive programming with Project Reactor https://piotrminkowski.com/2018/10/22/reactive-programming-with-project-reactor/ https://piotrminkowski.com/2018/10/22/reactive-programming-with-project-reactor/#respond Mon, 22 Oct 2018 14:51:15 +0000 https://piotrminkowski.wordpress.com/?p=6872 If you are building reactive microservices you would probably have to merge data streams from different source APIs into a single result stream. It inspired me to create this article containing some most common scenarios of using reactive streams in microservice-based architecture during inter-service communication. I have already described some aspects related to reactive programming […]

If you are building reactive microservices, you will probably have to merge data streams from different source APIs into a single result stream. That inspired me to create this article, which covers some of the most common scenarios of using reactive streams for inter-service communication in a microservice-based architecture.

I have already described some aspects related to reactive programming with Spring based on Spring WebFlux and Spring Data JDBC projects in the following articles:

Spring Framework has supported reactive programming since version 5. That support is built on top of Project Reactor (https://projectreactor.io). Reactor is a fourth-generation reactive programming library for building non-blocking applications on the JVM, based on the Reactive Streams Specification. Working with this library can be difficult at first, especially if you don't have any experience with reactive streams. Reactor Core gives us two data types for producing a stream of data: Mono and Flux. With Flux we can emit 0..n elements, while with Mono we can create a stream of 0..1 elements. Both types implement the Publisher interface, and both are lazy, which means they won't be executed until you consume them. Therefore, when building reactive APIs, it is important not to block the stream; Spring WebFlux doesn't allow that.
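Laziness is worth internalizing before the scenarios below. As an illustrative analogy (plain Java streams rather than Reactor, but the principle is the same), nothing in a declared pipeline executes until a terminal operation consumes it, just as a Mono or Flux does nothing until something subscribes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class LazinessDemo {
    public static void main(String[] args) {
        List<String> sideEffects = new ArrayList<>();
        // Like a Flux, this pipeline only describes work; nothing has run yet
        Stream<Integer> pipeline = Stream.of(1, 2, 3)
                .peek(i -> sideEffects.add("processed " + i))
                .map(i -> i * 2);
        System.out.println(sideEffects.size()); // 0 - no element processed yet
        // The terminal operation plays the role of subscribe()
        int sum = pipeline.mapToInt(Integer::intValue).sum();
        System.out.println(sideEffects.size()); // 3
        System.out.println(sum); // 12
    }
}
```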

Introduction

The sample project is available on GitHub in the reactive-playground repository: https://github.com/piomin/reactive-playground.git. It is written in Kotlin. Apart from some Kotlin libraries, the only dependency that needs to be added in order to use Project Reactor is reactor-core.

<dependency>
   <groupId>io.projectreactor</groupId>
   <artifactId>reactor-core</artifactId>
   <version>3.2.1.RELEASE</version>
</dependency>

I don't want to show you the features of Project Reactor using simple String objects, like many other articles do. Therefore, I have created the following class hierarchy for our tests, which allows us to simulate APIs built around three different domain objects.

[image: reactive-programming-4]

The Organization class contains lists of Employee and Department objects. Each department contains a list of the employees assigned to that department inside the organization. The Employee class has two relationship properties: organizationId, which assigns it to an organization, and departmentId, which assigns it to a department.

data class Employee(var id: Int, var name: String, var salary: Int) {
    var organizationId: Int? = null
    var departmentId: Int? = null

    constructor(id: Int, name: String, salary: Int, organizationId: Int, departmentId: Int) : this(id, name, salary) {
        this.organizationId = organizationId
        this.departmentId = departmentId
    }

    constructor(id: Int, name: String, salary: Int, organizationId: Int) : this(id, name, salary) {
        this.organizationId = organizationId
    }
}

Here’s the implementation of Department class.

class Department(var id: Int, var name: String, var organizationId: Int) {
    var employees: MutableList<Employee> = ArrayList()

    constructor(id: Int, name: String, organizationId: Int, employees: MutableList<Employee>) : this(id, name, organizationId) {
        this.employees.addAll(employees)
    }

    fun addEmployees(employees: MutableList<Employee>) : Department {
        this.employees.addAll(employees)
        return this
    }

    fun addEmployee(employee: Employee) : Department {
        this.employees.add(employee)
        return this
    }

}

Here’s the implementation of Organization class.

class Organization(var id: Int, var name: String) {
    var employees: MutableList<Employee> = ArrayList()
    var departments: MutableList<Department> = ArrayList()

    constructor(id: Int, name: String, employees: MutableList<Employee>, departments: MutableList<Department>) : this(id, name){
        this.employees.addAll(employees)
        this.departments.addAll(departments)
    }

    constructor(id: Int, name: String, employees: MutableList<Employee>) : this(id, name){
        this.employees.addAll(employees)
    }
}

Scenario 1

We have two API methods that return data streams. The first returns a Flux emitting the employees assigned to the given organization. The second just returns a Mono with the current organization.

private fun getOrganizationByName(name: String) : Mono<Organization> {
   return Mono.just(Organization(1, name))
}

private fun getEmployeesByOrganization(id: Int) : Flux<Employee> {
   return Flux.just(Employee(1, "Employee1", 1000, id),
                Employee(2, "Employee2", 2000, id))
}

We would like to return a single stream emitting the organization containing its list of employees, as shown below.

[image: reactor-scenario-1]

Here's the solution. We use the zipWhen method, which waits for the result from the source Mono and then calls the second Mono. Because we can only zip streams of the same type (in this case Mono), we need to convert the Flux<Employee> returned by the getEmployeesByOrganization method into a Mono<MutableList<Employee>> using the collectList function. Thanks to zipWhen we can then combine the two Mono streams and create new objects inside the map function.

@Test
fun testScenario1() {
   val organization : Mono<Organization> = getOrganizationByName("test")
      .zipWhen { organization ->
         getEmployeesByOrganization(organization.id!!).collectList()
      }
      .map { tuple -> 
         Organization(tuple.t1.id, tuple.t1.name, tuple.t2)
      }
}

Scenario 2

Let's consider another scenario. Now we have two Flux streams that emit employees and departments. Every employee has a departmentId property responsible for its assignment to a department.

private fun getDepartments() : Flux<Department> {
    return Flux.just(Department(1, "X", 1),
                     Department(2, "Y", 1))
}

private fun getEmployees() : Flux<Employee> {
    return Flux.just(Employee(1, "Employee1", 1000, 1, 1),
            Employee(2, "Employee2", 2000, 1, 1),
            Employee(3, "Employee3", 1000, 1, 2),
            Employee(4, "Employee4", 2000, 1, 2))
}

The goal is to merge those two streams and return a single Flux stream emitting departments, each containing all the employees assigned to it. Here's a picture that illustrates the transformation described above.

[image: reactive-programming-reactor-5]

We can do that in two ways, as shown below. The first calls the flatMap function on the stream of departments. Inside flatMap we zip every single Department with the stream of employees, which is filtered by departmentId and collected into a Mono. Finally, the map function emits the department containing its list of employees.
The second way groups the Flux of employees by departmentId and then invokes zipping and mapping functions similar to the first way.

@Test
fun testScenario2() {
   val departments: Flux<Department> = getDepartments()
      .flatMap { department ->
         Mono.just(department)
            .zipWith(getEmployees().filter { it.departmentId == department.id }.collectList())
            .map { t -> t.t1.addEmployees(t.t2) }
      }

   val departments2: Flux<Department> = getEmployees()
      .groupBy { it.departmentId }
      .flatMap { t -> getDepartments().filter { it.id == t.key() }.elementAt(0)
         .zipWith(t.collectList())
         .map { it.t1.addEmployees(it.t2) }
      }
}
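The groupBy step in the second variant can be sketched with plain Java collections; the Emp class below is a hypothetical stand-in for the Employee class, and Collectors.groupingBy plays the role of Reactor's groupBy (an analogy, not the Reactor implementation):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByDemo {

    // Hypothetical stand-in for the Employee class: only id and departmentId matter here
    static class Emp {
        final int id;
        final int departmentId;
        Emp(int id, int departmentId) { this.id = id; this.departmentId = departmentId; }
    }

    public static void main(String[] args) {
        List<Emp> employees = List.of(
                new Emp(1, 1), new Emp(2, 1), new Emp(3, 2), new Emp(4, 2));
        // Grouping analogous to getEmployees().groupBy { it.departmentId }
        Map<Integer, List<Emp>> byDepartment = employees.stream()
                .collect(Collectors.groupingBy(e -> e.departmentId));
        System.out.println(byDepartment.get(1).size()); // 2 employees in department 1
        System.out.println(byDepartment.get(2).size()); // 2 employees in department 2
    }
}
```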

Scenario 3

This scenario is simpler than the two previous ones. We have two API methods that emit Flux streams of the same object type. The first emits employees with the id, name and salary properties set, while the second emits employees with the id, organizationId and departmentId properties set.

private fun getEmployeesBasic() : Flux<Employee> {
   return Flux.just(Employee(1, "AA", 1000),
                        Employee(2, "BB", 2000))
}

private fun getEmployeesRelationships() : Flux<Employee> {
   // only id, organizationId and departmentId are meaningful here;
   // name and salary are placeholders, since Employee has no (id, organizationId, departmentId) constructor
   return Flux.just(Employee(1, "", 0, 1, 1),
              Employee(2, "", 0, 1, 2))
}

We want to convert them into a single stream emitting employees with a full set of properties. The following picture illustrates the described transformation.

[image: reactive-programming-reactor-scenario-3]

In this case the solution is pretty simple. We zip the two Flux streams using the zipWith function, and then map each pair of zipped objects into a single object containing the full set of properties.

@Test
fun testScenario3() {
   val employees : Flux<Employee> = getEmployeesBasic()
      .zipWith(getEmployeesRelationships())
      .map { t -> Employee(t.t1.id, t.t1.name, t.t1.salary, t.t2.organizationId!!, t.t2.departmentId!!) }
}

Scenario 4

In this scenario we have two independent Flux streams that emit the same type of objects: Employee.

private fun getEmployeesFirstPart() : Flux<Employee> {
   return Flux.just(Employee(1, "AA", 1000), Employee(3, "BB", 3000))
}

private fun getEmployeesSecondPart() : Flux<Employee> {
   return Flux.just(Employee(2, "CC", 2000), Employee(4, "DD", 4000))
}

We would like to merge those two streams into a single stream ordered by id. The following picture shows that transformation.

[image: reactor-scenario-4]

Here's the solution. We use the mergeOrderedWith function with a comparator that compares ids. Then we can perform some transformation on every object, but that part is optional; it only demonstrates the usage of the map function.

@Test
fun testScenario4() {
   val persons: Flux<Employee> = getEmployeesFirstPart()
      .mergeOrderedWith(getEmployeesSecondPart(), Comparator { o1, o2 -> o1.id.compareTo(o2.id) })
      .map {
         Employee(it.id, it.name, it.salary, 1, 1)
      }
}
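The streaming semantics of mergeOrderedWith can be sketched with a classic two-pointer merge over plain lists (an illustrative stand-in, not the Reactor implementation): both sources are assumed to be already ordered, and the comparator decides which element to emit next.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class OrderedMergeDemo {

    // Two-pointer merge of two already-ordered sources, mirroring mergeOrderedWith
    static <T> List<T> mergeOrdered(List<T> a, List<T> b, Comparator<T> cmp) {
        List<T> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < a.size() && j < b.size()) {
            out.add(cmp.compare(a.get(i), b.get(j)) <= 0 ? a.get(i++) : b.get(j++));
        }
        while (i < a.size()) out.add(a.get(i++));
        while (j < b.size()) out.add(b.get(j++));
        return out;
    }

    public static void main(String[] args) {
        // ids emitted by the two employee streams in the scenario: (1, 3) and (2, 4)
        List<Integer> merged = mergeOrdered(List.of(1, 3), List.of(2, 4), Comparator.naturalOrder());
        System.out.println(merged); // [1, 2, 3, 4]
    }
}
```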

Scenario 5

And now the last scenario in this article. We have a single input Flux stream with Department objects. Each department in that stream contains the list of all employees assigned to it. Here's our API method implementation.

private fun getDepartmentsByOrganization(id: Int) : Flux<Department> {
   val dep1 = Department(1, "A", id, mutableListOf(
         Employee(1, "Employee1", 1000, id, 1),
         Employee(2, "Employee2", 2000, id, 1)
      )
   )
   val dep2 = Department(2, "B", id, mutableListOf(
         Employee(3, "Employee3", 1000, id, 2),
         Employee(4, "Employee4", 2000, id, 2)
      )
   )
   return Flux.just(dep1, dep2)
}

The goal is to convert that stream into a single Mono with Organization containing the list of all employees across departments. The following picture visualizes the described transformation.

reactor-scenario-5

Here’s the solution. We invoke the flatMapIterable function, which converts Flux<Department> into Flux<Employee> by returning the list of employees from each department. Then we collect it into a Mono with collectList and pass the resulting list to a newly created Organization object inside the map function.

@Test
fun testScenario5() {
   val organization: Mono<Organization> = getDepartmentsByOrganization(1)
      .flatMapIterable { department -> department.employees }
      .collectList()
      .map { t -> Organization(1, "X", t) }
}
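
For intuition, flatMapIterable followed by collectList behaves like flattening nested lists into one combined list. Here is a plain-Java sketch of that transformation (FlattenDemo and its nested records are hypothetical names used only for illustration):

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-streams model of flatMapIterable + collectList: flatten each
// department's employee list into one combined list.
class FlattenDemo {

    record Employee(int id, String name) {}
    record Department(int id, String name, List<Employee> employees) {}

    static List<Employee> allEmployees(List<Department> departments) {
        return departments.stream()
                .flatMap(d -> d.employees().stream()) // one inner list per department
                .collect(Collectors.toList());
    }
}
```

The reactive pipeline does the same flattening lazily, emitting inner elements as each department arrives and only materializing the list at the final collectList step.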

The post Reactive programming with Project Reactor appeared first on Piotr's TechBlog.

Introduction to Reactive APIs with Postgres, R2DBC, Spring Data JDBC and Spring WebFlux https://piotrminkowski.com/2018/10/18/introduction-to-reactive-apis-with-postgres-r2dbc-spring-data-jdbc-and-spring-webflux/ https://piotrminkowski.com/2018/10/18/introduction-to-reactive-apis-with-postgres-r2dbc-spring-data-jdbc-and-spring-webflux/#comments Thu, 18 Oct 2018 07:42:10 +0000 https://piotrminkowski.wordpress.com/?p=6863 There are pretty many technologies listed in the title of this article. Spring WebFlux has been introduced with Spring 5 and Spring Boot 2 as a project for building reactive-stack web applications. I have already described how to use it together with Spring Boot and Spring Cloud for building reactive microservices in that article: Reactive […]

The post Introduction to Reactive APIs with Postgres, R2DBC, Spring Data JDBC and Spring WebFlux appeared first on Piotr's TechBlog.

There are pretty many technologies listed in the title of this article. Spring WebFlux has been introduced with Spring 5 and Spring Boot 2 as a project for building reactive-stack web applications. I have already described how to use it together with Spring Boot and Spring Cloud for building reactive microservices in the article Reactive Microservices with Spring WebFlux and Spring Cloud. Spring 5 has also introduced some projects supporting reactive access to NoSQL databases like Cassandra, MongoDB or Couchbase. But there was still a lack of support for reactive access to relational databases. That is changing with the R2DBC (Reactive Relational Database Connectivity) project, which is also being developed by Pivotal members. It seems to be a very interesting initiative; however, it is still at the beginning of the road. Anyway, there is a module for integration with Postgres, and we will use it for our demo application. R2DBC is not the only new interesting solution described in this article. I will also show you how to use Spring Data JDBC – another really interesting project released recently.
It is worth saying a few words about Spring Data JDBC. This project has already been released and is available in version 1.0. It is a part of the bigger Spring Data framework. It offers a repository abstraction based on JDBC. The main reason for creating this library was to allow access to relational databases in the Spring Data way (through CrudRepository interfaces) without including the JPA library in the application dependencies. Of course, JPA is still certainly the main persistence API used in Java applications. Spring Data JDBC aims to be conceptually much simpler than JPA by not implementing popular patterns like lazy loading, caching, dirty tracking or sessions. It also provides only very limited support for annotation-based mapping. Finally, it provides an implementation of reactive repositories that uses R2DBC for accessing a relational database. Although that module is still under development (only a SNAPSHOT version is available), we will try to use it in our demo application. Let’s proceed to the implementation.

Including dependencies

We use Kotlin for implementation. So first, we include some required Kotlin dependencies.

<dependency>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-stdlib</artifactId>
   <version>${kotlin.version}</version>
</dependency>
<dependency>
   <groupId>com.fasterxml.jackson.module</groupId>
   <artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-test-junit</artifactId>
   <version>${kotlin.version}</version>
   <scope>test</scope>
</dependency>

We should also add kotlin-maven-plugin with support for Spring.


<plugin>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-maven-plugin</artifactId>
   <version>${kotlin.version}</version>
   <executions>
      <execution>
         <id>compile</id>
         <phase>compile</phase>
         <goals>
            <goal>compile</goal>
         </goals>
      </execution>
      <execution>
         <id>test-compile</id>
         <phase>test-compile</phase>
         <goals>
            <goal>test-compile</goal>
         </goals>
      </execution>
   </executions>
   <configuration>
      <args>
         <arg>-Xjsr305=strict</arg>
      </args>
      <compilerPlugins>
         <plugin>spring</plugin>
      </compilerPlugins>
   </configuration>
</plugin>

Then, we may proceed to including the frameworks required for the demo implementation. We need to include the special SNAPSHOT version of Spring Data JDBC dedicated to accessing databases through R2DBC. We also have to add some R2DBC libraries and Spring WebFlux. As you may see below, only Spring WebFlux is available in a stable version (as a part of a Spring Boot RELEASE).

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.data</groupId>
   <artifactId>spring-data-jdbc</artifactId>
   <version>1.0.0.r2dbc-SNAPSHOT</version>
</dependency>
<dependency>
   <groupId>io.r2dbc</groupId>
   <artifactId>r2dbc-spi</artifactId>
   <version>1.0.0.M5</version>
</dependency>
<dependency>
   <groupId>io.r2dbc</groupId>
   <artifactId>r2dbc-postgresql</artifactId>
   <version>1.0.0.M5</version>
</dependency>

It is also important to set dependency management for Spring Data project.


<dependencyManagement>
   <dependencies>
      <dependency>
         <groupId>org.springframework.data</groupId>
         <artifactId>spring-data-releasetrain</artifactId>
         <version>Lovelace-RELEASE</version>
         <scope>import</scope>
         <type>pom</type>
      </dependency>
   </dependencies>
</dependencyManagement>

Repositories

We are using the well-known Spring Data style of CRUD repository implementation. In that case we need to create an interface that extends the ReactiveCrudRepository interface.
Here’s the implementation of a repository for managing Employee objects.

interface EmployeeRepository : ReactiveCrudRepository<Employee, Int> {
    @Query("select id, name, salary, organization_id from employee e where e.organization_id = $1")
    fun findByOrganizationId(organizationId: Int) : Flux<Employee>
}

Here’s another implementation of repository – this time for managing Organization objects.

interface OrganizationRepository : ReactiveCrudRepository<Organization, Int> 

Implementing Entities and DTOs

Kotlin provides a convenient way of creating entity classes by declaring them as data classes. When using Spring Data JDBC, we have to mark the primary key of the entity by annotating the field with @Id. It assumes the key is automatically incremented by the database. If you are not using auto-increment columns, you have to use a BeforeSaveEvent listener, which sets the ID of the entity. However, when I tried to set such a listener for my entity, it just didn’t work with the reactive version of Spring Data JDBC.
Here’s an implementation of the Employee entity class. It is worth mentioning that Spring Data JDBC automatically maps the class field organizationId to the database column organization_id.

data class Employee(val name: String, val salary: Int, val organizationId: Int) {
    @Id 
    var id: Int? = null
}

Here’s an implementation of Organization entity class.

data class Organization(var name: String) {
    @Id 
    var id: Int? = null
}

R2DBC does not support any lists or sets. Because I’d like to return a list of employees inside the Organization object from one of the API endpoints, I have created a DTO containing such a list, as shown below.

data class OrganizationDTO(var id: Int?, var name: String) {
    var employees : MutableList<Employee> = ArrayList()
    constructor(employees: MutableList<Employee>) : this(null, "") {
        this.employees = employees
    }
}

The SQL scripts corresponding to the created entities are visible below. The serial field type will automatically create a sequence and attach it to the id field.

CREATE TABLE employee (
    name character varying NOT NULL,
    salary integer NOT NULL,
    id serial PRIMARY KEY,
    organization_id integer
);
CREATE TABLE organization (
    name character varying NOT NULL,
    id serial PRIMARY KEY
);

Building sample web applications

For demo purposes we will build two independent applications: employee-service and organization-service. The organization-service application communicates with employee-service using the WebFlux WebClient. It fetches the list of employees assigned to the organization, and includes them in the response together with the Organization object. The sample applications’ source code is available on GitHub in the sample-spring-data-webflux repository: https://github.com/piomin/sample-spring-data-webflux.
Ok, let’s begin by declaring the Spring Boot main class. We need to enable Spring Data JDBC repositories by annotating the main class with @EnableJdbcRepositories.

@SpringBootApplication
@EnableJdbcRepositories
class EmployeeApplication

fun main(args: Array<String>) {
    runApplication<EmployeeApplication>(*args)
}

Working with R2DBC and Postgres requires some configuration. Probably due to the early stage of development of Spring Data JDBC and R2DBC, there is no Spring Boot auto-configuration for Postgres. We need to declare a connection factory, a client, and a repository inside a @Configuration bean.

@Configuration
class EmployeeConfiguration {

    @Bean
    fun repository(factory: R2dbcRepositoryFactory): EmployeeRepository {
        return factory.getRepository(EmployeeRepository::class.java)
    }

    @Bean
    fun factory(client: DatabaseClient): R2dbcRepositoryFactory {
        val context = RelationalMappingContext()
        context.afterPropertiesSet()
        return R2dbcRepositoryFactory(client, context)
    }

    @Bean
    fun databaseClient(factory: ConnectionFactory): DatabaseClient {
        return DatabaseClient.builder().connectionFactory(factory).build()
    }

    @Bean
    fun connectionFactory(): PostgresqlConnectionFactory {
        val config = PostgresqlConnectionConfiguration.builder() //
                .host("192.168.99.100") //
                .port(5432) //
                .database("reactive") //
                .username("reactive") //
                .password("reactive123") //
                .build()

        return PostgresqlConnectionFactory(config)
    }

}

Finally, we can create REST controllers that contain the definitions of our reactive API methods. With Kotlin it does not take much space. The following controller definition contains three GET methods that allow you to find all employees, all employees assigned to a given organization, or a single employee by id.

@RestController
@RequestMapping("/employees")
class EmployeeController {

    @Autowired
    lateinit var repository : EmployeeRepository

    @GetMapping
    fun findAll() : Flux<Employee> = repository.findAll()

    @GetMapping("/{id}")
    fun findById(@PathVariable id : Int) : Mono<Employee> = repository.findById(id)

    @GetMapping("/organization/{organizationId}")
    fun findByOrganizationId(@PathVariable organizationId : Int) : Flux<Employee> = repository.findByOrganizationId(organizationId)

    @PostMapping
    fun add(@RequestBody employee: Employee) : Mono<Employee> = repository.save(employee)

}

Inter-service Communication

For OrganizationController the implementation is a little bit more complicated, because organization-service communicates with employee-service. First, we need to declare a reactive WebFlux WebClient builder.

@Bean
fun clientBuilder() : WebClient.Builder {
   return WebClient.builder()
}

Then, similarly to the repository bean, the builder is injected into the controller. It is used inside the findByIdWithEmployees method for calling the GET /employees/organization/{organizationId} method exposed by employee-service. As you can see in the code fragment below, it provides a reactive API and returns a Flux object containing the list of found employees. This list is injected into the OrganizationDTO object using the zipWith Reactor method.

@RestController
@RequestMapping("/organizations")
class OrganizationController {

    @Autowired
    lateinit var repository : OrganizationRepository
    @Autowired
    lateinit var clientBuilder : WebClient.Builder

    @GetMapping
    fun findAll() : Flux<Organization> = repository.findAll()

    @GetMapping("/{id}")
    fun findById(@PathVariable id : Int) : Mono<Organization> = repository.findById(id)

    @GetMapping("/{id}/withEmployees")
    fun findByIdWithEmployees(@PathVariable id : Int) : Mono<OrganizationDTO> {
        val employees : Flux<Employee> = clientBuilder.build().get().uri("http://localhost:8090/employees/organization/$id")
                .retrieve().bodyToFlux(Employee::class.java)
        val org : Mono<Organization> = repository.findById(id)
        return org.zipWith(employees.collectList())
                .map { tuple -> OrganizationDTO(tuple.t1.id as Int, tuple.t1.name, tuple.t2) }
    }

    @PostMapping
    fun add(@RequestBody employee: Organization) : Mono<Organization> = repository.save(employee)

}

How does it work?

Before running the tests we need to start a Postgres database. Here’s the Docker command used for running a Postgres container. It creates a user with a password and sets up a default database.

$ docker run -d --name postgres -p 5432:5432 -e POSTGRES_USER=reactive -e POSTGRES_PASSWORD=reactive123 -e POSTGRES_DB=reactive postgres

Then we need to create some test tables, so you have to run the SQL script from the section Implementing Entities and DTOs. After that, you can start our test applications. If you do not override the default settings provided inside the application.yml files, employee-service listens on port 8090 and organization-service on port 8095. The following picture illustrates the architecture of our sample system.
spring-data-1
Now, let’s add some test data using the reactive API exposed by the applications.

$ curl -d '{"name":"Test1"}' -H "Content-Type: application/json" -X POST http://localhost:8095/organizations
$ curl -d '{"name":"Name1", "salary":5000, "organizationId":1}' -H "Content-Type: application/json" -X POST http://localhost:8090/employees
$ curl -d '{"name":"Name2", "salary":10000, "organizationId":1}' -H "Content-Type: application/json" -X POST http://localhost:8090/employees

Finally, you can call the GET /organizations/{id}/withEmployees method, for example using your web browser. The result should be similar to that visible in the following picture.

spring-data-2

Reactive Microservices with Spring WebFlux and Spring Cloud https://piotrminkowski.com/2018/05/04/reactive-microservices-with-spring-webflux-and-spring-cloud/ https://piotrminkowski.com/2018/05/04/reactive-microservices-with-spring-webflux-and-spring-cloud/#comments Fri, 04 May 2018 09:32:45 +0000 https://piotrminkowski.wordpress.com/?p=6475 I have already described Spring reactive support about one year ago in the article Reactive microservices with Spring 5. At that time the project Spring WebFlux was under active development. Now after the official release of Spring 5 it is worth to take a look at the current version of it. Moreover, we will try […]

The post Reactive Microservices with Spring WebFlux and Spring Cloud appeared first on Piotr's TechBlog.

I have already described Spring’s reactive support about one year ago in the article Reactive microservices with Spring 5. At that time the Spring WebFlux project was under active development. Now, after the official release of Spring 5, it is worth taking a look at its current version. Moreover, we will try to put our reactive microservices inside the Spring Cloud ecosystem, which contains such elements as service discovery with Eureka, load balancing with Spring Cloud Commons @LoadBalanced, and an API gateway using Spring Cloud Gateway (also based on WebFlux and Netty). We will also check out Spring’s reactive support for NoSQL databases using the example of the Spring Data Reactive Mongo project.

Here’s the figure that illustrates the architecture of our sample system, consisting of two microservices, a discovery server, a gateway and MongoDB databases. The source code is, as usual, available on GitHub in the sample-spring-cloud-webflux repository.

reactive-1

Let’s describe the further steps on the way to create the system illustrated above.

Step 1. Building reactive application using Spring WebFlux

To enable the Spring WebFlux library for the project we should include the starter spring-boot-starter-webflux in the dependencies. It brings in some dependent libraries like Reactor and the Netty server.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

The REST controller looks pretty similar to a controller defined for synchronous web services. The only difference is in the type of the returned objects. Instead of a single object we return an instance of Mono, and instead of a list we return an instance of Flux. Thanks to Spring Data Reactive Mongo, we don’t have to do anything more than call the needed method on the repository bean.


@RestController
public class AccountController {

   private static final Logger LOGGER = LoggerFactory.getLogger(AccountController.class);

   @Autowired
   private AccountRepository repository;

   @GetMapping("/customer/{customer}")
   public Flux<Account> findByCustomer(@PathVariable("customer") String customerId) {
      LOGGER.info("findByCustomer: customerId={}", customerId);
      return repository.findByCustomerId(customerId);
   }

   @GetMapping
   public Flux<Account> findAll() {
      LOGGER.info("findAll");
      return repository.findAll();
   }

   @GetMapping("/{id}")
   public Mono<Account> findById(@PathVariable("id") String id) {
      LOGGER.info("findById: id={}", id);
      return repository.findById(id);
   }

   @PostMapping
   public Mono<Account> create(@RequestBody Account account) {
      LOGGER.info("create: {}", account);
      return repository.save(account);
   }

}

Step 2. Integrate an application with database using Spring Data Reactive Mongo

The implementation of integration between application and database is also very simple. First, we need to include starter spring-boot-starter-data-mongodb-reactive to the project dependencies.

<dependency>
  <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>

The support for reactive Mongo repositories is automatically enabled after including the starter. The next step is to declare an entity with ORM mappings. The following class is also returned as a response by AccountController.

@Document
public class Account {

   @Id
   private String id;
   private String number;
   private String customerId;
   private int amount;

   ...

}

Finally, we may create a repository interface that extends ReactiveCrudRepository. It follows the pattern implemented by Spring Data JPA and provides some basic methods for CRUD operations. It also allows you to define query methods whose names are automatically mapped to queries. The only difference in comparison with standard Spring Data JPA repositories is in the method signatures – the objects are wrapped in Mono and Flux.

public interface AccountRepository extends ReactiveCrudRepository<Account, String> {

   Flux<Account> findByCustomerId(String customerId);

}

In this example I used a Docker container for running MongoDB locally. Because I run Docker on Windows using Docker Toolbox, the default address of the Docker machine is 192.168.99.100. Here’s the data source configuration in the application.yml file.

spring:
  data:
    mongodb:
      uri: mongodb://192.168.99.100/test

Step 3. Enabling service discovery using Eureka

Integration with Spring Cloud Eureka is pretty much the same as for synchronous REST microservices. To enable the discovery client we should first include the starter spring-cloud-starter-netflix-eureka-client in the project dependencies.

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

Then we have to enable it using @EnableDiscoveryClient annotation.

@SpringBootApplication
@EnableDiscoveryClient
public class AccountApplication {

   public static void main(String[] args) {
      SpringApplication.run(AccountApplication.class, args);
   }

}

The microservice will automatically register itself in Eureka. Of course, we may run more than one instance of every service. Here’s a screen illustrating the Eureka Dashboard (http://localhost:8761) after running two instances of account-service and a single instance of customer-service. I won’t go into the details of running the application with an embedded Eureka server – you may refer to my previous article for details: Quick Guide to Microservices with Spring Boot 2.0, Eureka and Spring Cloud. The Eureka server is available as the discovery-service module.

spring-reactive

Step 4. Inter-service communication between reactive microservices with WebClient

Inter-service communication is realized by the WebClient from the Spring WebFlux project. The same as for RestTemplate, you should annotate it with Spring Cloud Commons’ @LoadBalanced. It enables integration with service discovery and load balancing using the Netflix OSS Ribbon client. So, the first step is to declare a client builder bean with the @LoadBalanced annotation.

@Bean
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() {
   return WebClient.builder();
}

Then we may inject WebClient.Builder into the REST controller. Communication with account-service is implemented inside GET /{id}/with-accounts, where we first search for the customer entity using the reactive Spring Data repository. It returns a Mono, while the WebClient returns a Flux. Now, our main goal is to merge those two publishers and return a single Mono object with the list of accounts taken from the Flux, without blocking the stream. The following fragment of code illustrates how I used WebClient to communicate with the other microservice, and then merge the response and the result from the repository into a single Mono object. This merge can probably be done in a more “elegant” way, so feel free to create a pull request with your proposal.

@Autowired
private WebClient.Builder webClientBuilder;

@GetMapping("/{id}/with-accounts")
public Mono<Customer> findByIdWithAccounts(@PathVariable("id") String id) {
   LOGGER.info("findByIdWithAccounts: id={}", id);
   Flux<Account> accounts = webClientBuilder.build().get().uri("http://account-service/customer/{customer}", id).retrieve().bodyToFlux(Account.class);
   return accounts
      .collectList()
      .map(a -> new Customer(a))
      .mergeWith(repository.findById(id))
      .collectList()
      .map(CustomerMapper::map);
}

Step 5. Building API gateway using Spring Cloud Gateway

Spring Cloud Gateway is one of the newest Spring Cloud projects. It is built on top of Spring WebFlux, and thanks to that we may use it as a gateway to our sample system based on reactive microservices. Similar to Spring WebFlux applications, it runs on an embedded Netty server. To enable it for a Spring Boot application, just include the following dependency in your project.

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

We should also enable a discovery client in order to allow the gateway to fetch the list of registered microservices. However, there is no need to register the gateway application itself in Eureka. To disable registration, you may set the property eureka.client.registerWithEureka to false inside the application.yml file.

@SpringBootApplication
@EnableDiscoveryClient
public class GatewayApplication {

   public static void main(String[] args) {
      SpringApplication.run(GatewayApplication.class, args);
   }

}

By default, Spring Cloud Gateway does not enable integration with service discovery. To enable it, we should set the property spring.cloud.gateway.discovery.locator.enabled to true. Now, the last thing to be done is the configuration of the routes. Spring Cloud Gateway provides two types of components that may be configured inside routes: filters and predicates. Predicates are used for matching HTTP requests with a route, while filters can be used to modify requests and responses before or after sending the downstream request. Here’s the full configuration of the gateway. It enables the discovery locator, and defines two routes based on the entries in the service registry. We use the Path route predicate factory for matching the incoming requests, and the RewritePath gateway filter factory for modifying the requested path to adapt it to the format exposed by the downstream services (endpoints are exposed under path /, while the gateway exposes them under paths /account and /customer).

spring:
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true
      routes:
      - id: account-service
        uri: lb://account-service
        predicates:
        - Path=/account/**
        filters:
        - RewritePath=/account/(?<path>.*), /$\{path}
      - id: customer-service
        uri: lb://customer-service
        predicates:
        - Path=/customer/**
        filters:
        - RewritePath=/customer/(?<path>.*), /$\{path}

Step 6. Testing reactive microservices with Spring Boot

Before doing some tests, let’s just recap our sample system. We have two microservices, account-service and customer-service, that use MongoDB as a database. The microservice customer-service calls the endpoint GET /customer/{customer} exposed by account-service. The URL of account-service is taken from Eureka. The whole sample system is hidden behind the gateway, which is available under the address localhost:8090.
Now, the first step is to run MongoDB in a Docker container. After executing the following command, Mongo is available under the address 192.168.99.100:27017.

$ docker run -d --name mongo -p 27017:27017 mongo

Then we may proceed to running discovery-service. Eureka is available under its default address localhost:8761. You may run it using your IDE, or just by executing the command java -jar target/discovery-service-1.0-SNAPSHOT.jar. The same rule applies to our sample microservices. However, account-service needs to be run in two instances, so you need to override the default HTTP port when running the second instance using the -Dserver.port VM argument, for example java -jar -Dserver.port=2223 target/account-service-1.0-SNAPSHOT.jar. Finally, after running gateway-service, we may add some test data.

$ curl --header "Content-Type: application/json" --request POST --data '{"firstName": "John","lastName": "Scott","age": 30}' http://localhost:8090/customer
{"id": "5aec1debfa656c0b38b952b4","firstName": "John","lastName": "Scott","age": 30,"accounts": null}
$ curl --header "Content-Type: application/json" --request POST --data '{"number": "1234567890","amount": 5000,"customerId": "5aec1debfa656c0b38b952b4"}' http://localhost:8090/account
{"id": "5aec1e86fa656c11d4c655fb","number": "1234567890","customerId": "5aec1debfa656c0b38b952b4","amount": 5000}
$ curl --header "Content-Type: application/json" --request POST --data '{"number": "1234567891","amount": 12000,"customerId": "5aec1debfa656c0b38b952b4"}' http://localhost:8090/account
{"id": "5aec1e91fa656c11d4c655fc","number": "1234567891","customerId": "5aec1debfa656c0b38b952b4","amount": 12000}
$ curl --header "Content-Type: application/json" --request POST --data '{"number": "1234567892","amount": 2000,"customerId": "5aec1debfa656c0b38b952b4"}' http://localhost:8090/account
{"id": "5aec1e99fa656c11d4c655fd","number": "1234567892","customerId": "5aec1debfa656c0b38b952b4","amount": 2000}

To test inter-service communication, just call the endpoint GET /customer/{id}/with-accounts on gateway-service. It forwards the request to customer-service, which then calls the endpoint exposed by account-service using the reactive WebClient. The result is visible below.

reactive-2

Conclusion

Since Spring 5 and Spring Boot 2.0 there is a full range of available ways to build a microservices-based architecture. We can build a standard synchronous system using one-to-one communication with the Spring Cloud Netflix project, messaging microservices based on a message broker and the publish/subscribe communication model with Spring Cloud Stream, and finally asynchronous, reactive microservices with Spring WebFlux. The main goal of this article was to show you how to use Spring WebFlux together with Spring Cloud projects in order to provide mechanisms like service discovery, load balancing and an API gateway for reactive microservices built on top of Spring Boot. Before Spring 5, the lack of support for reactive microservices was one of the drawbacks of the Spring framework, but with Spring WebFlux it is no longer the case. Not only that – we may leverage Spring’s reactive support for the most popular NoSQL databases like MongoDB or Cassandra, and easily place our reactive microservices inside one system together with synchronous REST microservices.

Reactive microservices with Spring 5 https://piotrminkowski.com/2017/02/16/reactive-microservices-with-spring-5/ https://piotrminkowski.com/2017/02/16/reactive-microservices-with-spring-5/#comments Thu, 16 Feb 2017 15:51:34 +0000 https://piotrminkowski.wordpress.com/?p=855 Spring team has announced support for reactive programming model from 5.0 release. New Spring version will probably be released on March. Fortunately, milestone and snapshot versions with these changes are now available on public spring repositories. There is new Spring Web Reactive project with support for reactive @Controller and also new WebClient with client-side reactive support. […]

The post Reactive microservices with Spring 5 appeared first on Piotr's TechBlog.

The Spring team has announced support for the reactive programming model in the 5.0 release. The new Spring version will probably be released in March. Fortunately, milestone and snapshot versions with these changes are already available in the public Spring repositories. There is a new Spring Web Reactive project with support for reactive @Controller, and also a new WebClient with client-side reactive support. Today I’m going to take a closer look at the solutions suggested by the Spring team.

According to the Spring WebFlux documentation, the Spring Framework uses Reactor internally for its own reactive support. Reactor is a Reactive Streams implementation that extends the basic Reactive Streams Publisher contract with the Flux and Mono composable API types to provide declarative operations on data sequences of 0..N and 0..1 elements. On the server side Spring supports annotation-based and functional programming models. The annotation model uses @Controller and the other annotations also supported by Spring MVC. A reactive controller is very similar to a standard REST controller for synchronous services, except that it operates on Flux, Mono, and Publisher objects. Today I'm going to show you how to develop simple reactive microservices using the annotation model and the MongoDB reactive module. The sample application source code is available on GitHub.
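As a side note, the Reactive Streams interfaces that Flux and Mono build on are also available in the JDK itself as java.util.concurrent.Flow (since Java 9). The toy sketch below is not part of the sample application; it just illustrates the Publisher/Subscriber contract by publishing a 0..N sequence and mapping each element by hand:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class FlowDemo {

    // Publishes 1..3, maps each item to a string, and collects the results
    static List<String> publishAndCollect() throws InterruptedException {
        List<String> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(Long.MAX_VALUE); // unbounded demand: no backpressure in this toy
                }
                @Override
                public void onNext(Integer item) {
                    received.add("account-" + item); // a "map" step done by hand
                }
                @Override
                public void onError(Throwable throwable) {
                    done.countDown();
                }
                @Override
                public void onComplete() {
                    done.countDown();
                }
            });
            for (int i = 1; i <= 3; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete after the submitted items are delivered
        done.await();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(publishAndCollect()); // prints [account-1, account-2, account-3]
    }
}
```

Flux and Mono add a rich set of declarative operators (map, flatMap, collectList, and so on) on top of this contract, so in application code you never implement Subscriber by hand like this.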

For our example we need to use snapshots of Spring Boot 2.0.0 and Spring Web Reactive 0.1.0. Below you can find the relevant fragment of the main pom.xml and the pom.xml of a single microservice. In our microservices we use Netty instead of the default Tomcat server.

[code language=”xml”]
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.BUILD-SNAPSHOT</version>
</parent>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot.experimental</groupId>
            <artifactId>spring-boot-dependencies-web-reactive</artifactId>
            <version>0.1.0.BUILD-SNAPSHOT</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
[/code]

[code language=”xml”]
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot.experimental</groupId>
        <artifactId>spring-boot-starter-web-reactive</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-tomcat</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>io.projectreactor.ipc</groupId>
        <artifactId>reactor-netty</artifactId>
    </dependency>
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
    </dependency>
    <dependency>
        <groupId>pl.piomin.services</groupId>
        <artifactId>common</artifactId>
        <version>${project.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>io.projectreactor.addons</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
[/code]

We have two microservices: account-service and customer-service. Each of them has its own MongoDB database and exposes a simple reactive API for searching and saving data. Additionally, customer-service interacts with account-service to fetch all accounts of a given customer and return them together with the customer data. Here's our account controller code.

[code language=”java”]
@RestController
public class AccountController {

    @Autowired
    private AccountRepository repository;

    @GetMapping(value = "/account/customer/{customer}")
    public Flux<Account> findByCustomer(@PathVariable("customer") Integer customerId) {
        return repository.findByCustomerId(customerId)
            .map(a -> new Account(a.getId(), a.getCustomerId(), a.getNumber(), a.getAmount()));
    }

    @GetMapping(value = "/account")
    public Flux<Account> findAll() {
        return repository.findAll()
            .map(a -> new Account(a.getId(), a.getCustomerId(), a.getNumber(), a.getAmount()));
    }

    @GetMapping(value = "/account/{id}")
    public Mono<Account> findById(@PathVariable("id") Integer id) {
        return repository.findById(id)
            .map(a -> new Account(a.getId(), a.getCustomerId(), a.getNumber(), a.getAmount()));
    }

    @PostMapping("/account")
    public Mono<Account> create(@RequestBody Publisher<Account> accountStream) {
        return repository
            .save(Mono.from(accountStream)
                .map(a -> new pl.piomin.services.account.model.Account(a.getNumber(), a.getCustomerId(), a.getAmount())))
            .map(a -> new Account(a.getId(), a.getCustomerId(), a.getNumber(), a.getAmount()));
    }

}
[/code]

In all API methods we also map the Account entity (a MongoDB @Document) to the Account DTO available in our common module. Here's the account repository class. It uses ReactiveMongoTemplate to interact with Mongo collections (the query and where methods used below are statically imported from Spring Data's Query and Criteria classes).

[code language=”java”]
@Repository
public class AccountRepository {

    @Autowired
    private ReactiveMongoTemplate template;

    public Mono<Account> findById(Integer id) {
        return template.findById(id, Account.class);
    }

    public Flux<Account> findAll() {
        return template.findAll(Account.class);
    }

    public Flux<Account> findByCustomerId(Integer customerId) {
        return template.find(query(where("customerId").is(customerId)), Account.class);
    }

    public Mono<Account> save(Mono<Account> account) {
        return template.insert(account);
    }

}
[/code]

In our Spring Boot main class or a @Configuration class we should declare Spring beans for MongoDB with connection settings.

[code language=”java”]
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    public @Bean MongoClient mongoClient() {
        return MongoClients.create("mongodb://192.168.99.100");
    }

    public @Bean ReactiveMongoTemplate reactiveMongoTemplate() {
        return new ReactiveMongoTemplate(mongoClient(), "account");
    }

}
[/code]

I used a MongoDB Docker container while working on this sample.

docker run -d --name mongo -p 27017:27017 mongo

In customer-service we call the /account/customer/{customer} endpoint from account-service. I declared a WebClient @Bean in our main class.

[code language=”java”]
public @Bean WebClient webClient() {
    return WebClient.builder()
        .clientConnector(new ReactorClientHttpConnector())
        .baseUrl("http://localhost:2222")
        .build();
}
[/code]

Here's a fragment of the customer controller. The autowired WebClient calls account-service after fetching the customer from MongoDB.

[code language=”java”]
@Autowired
private WebClient webClient;

@GetMapping(value = "/customer/accounts/{pesel}")
public Mono<Customer> findByPeselWithAccounts(@PathVariable("pesel") String pesel) {
    return repository.findByPesel(pesel)
        .flatMap(customer -> webClient.get()
            .uri("/account/customer/{customer}", customer.getId())
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .flatMap(response -> response.bodyToFlux(Account.class)))
        .collectList()
        .map(l -> new Customer(pesel, l));
}
[/code]
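For comparison, the same fetch-then-combine shape can be sketched with a plain CompletableFuture from the JDK; thenCompose plays the role of flatMap and thenApply the role of map. The findCustomerIdByPesel and findAccountsByCustomerId methods below are hypothetical stubs standing in for the MongoDB query and the WebClient call:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CustomerCompositionSketch {

    record Account(String number, int amount) {}
    record Customer(String pesel, List<Account> accounts) {}

    // Hypothetical async stub standing in for repository.findByPesel(...)
    static CompletableFuture<String> findCustomerIdByPesel(String pesel) {
        return CompletableFuture.completedFuture("1");
    }

    // Hypothetical async stub standing in for the WebClient call to account-service
    static CompletableFuture<List<Account>> findAccountsByCustomerId(String customerId) {
        return CompletableFuture.completedFuture(List.of(new Account("111-222", 1000)));
    }

    // Same shape as the WebFlux chain: resolve the customer, then fetch accounts, then combine
    static CompletableFuture<Customer> findByPeselWithAccounts(String pesel) {
        return findCustomerIdByPesel(pesel)
            .thenCompose(id -> findAccountsByCustomerId(id))       // ~ flatMap
            .thenApply(accounts -> new Customer(pesel, accounts)); // ~ map
    }

    public static void main(String[] args) {
        System.out.println(findByPeselWithAccounts("234543647565").join());
    }
}
```

The key difference is that Flux/Mono pipelines stay lazy and support backpressure and 0..N cardinality, while CompletableFuture is eager and always resolves to exactly one value.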

We can test GET calls using a web browser or any REST client. With POST it's not so simple. Here are two simple test cases: adding a new customer and getting a customer with accounts. The getCustomerAccounts test needs account-service running on port 2222.

[code language=”java”]
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class CustomerTest {

    private static final Logger logger = Logger.getLogger("CustomerTest");

    private WebClient webClient;

    @LocalServerPort
    private int port;

    @Before
    public void setup() {
        this.webClient = WebClient.create("http://localhost:" + this.port);
    }

    @Test
    public void getCustomerAccounts() {
        Customer customer = this.webClient.get().uri("/customer/accounts/234543647565")
            .accept(MediaType.APPLICATION_JSON)
            .exchange()
            .then(response -> response.bodyToMono(Customer.class))
            .block();
        logger.info("Customer: " + customer);
    }

    @Test
    public void addCustomer() {
        Customer customer = new Customer(null, "Adam", "Kowalski", "123456787654");
        customer = webClient.post().uri("/customer")
            .accept(MediaType.APPLICATION_JSON)
            .exchange(BodyInserters.fromObject(customer))
            .then(response -> response.bodyToMono(Customer.class))
            .block();
        logger.info("Customer: " + customer);
    }

}
[/code]

Conclusion

Spring's initiative to support reactive programming seems promising, but it is still at an early stage of development. It cannot yet be used together with popular Spring Cloud projects like Eureka, Ribbon, or Hystrix; when I tried to add these dependencies to pom.xml, my service failed to start. I hope that in the near future functionalities like service discovery and load balancing will be available for reactive microservices the same as for synchronous REST microservices. Spring also supports the reactive model in the Spring Cloud Stream project, which is more stable than the WebFlux framework. I'll try to use it in the future.

The post Reactive microservices with Spring 5 appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2017/02/16/reactive-microservices-with-spring-5/feed/ 2 855