reactive streams Archives - Piotr's TechBlog https://piotrminkowski.com/tag/reactive-streams/ Java, Spring, Kotlin, microservices, Kubernetes, containers Sat, 19 Dec 2020 14:40:35 +0000 en-US hourly 1 https://wordpress.org/?v=6.9.1 https://i0.wp.com/piotrminkowski.com/wp-content/uploads/2020/08/cropped-me-2-tr-x-1.png?fit=32%2C32&ssl=1 reactive streams Archives - Piotr's TechBlog https://piotrminkowski.com/tag/reactive-streams/ 32 32 181738725 Using Reactive WebClient with Spring WebFlux https://piotrminkowski.com/2019/11/04/using-reactive-webclient-with-spring-webflux/ https://piotrminkowski.com/2019/11/04/using-reactive-webclient-with-spring-webflux/#comments Mon, 04 Nov 2019 10:12:52 +0000 https://piotrminkowski.wordpress.com/?p=7407 Reactive APIs and generally reactive programming become increasingly popular lately. You have a chance to observe more and more new frameworks and toolkits supporting reactive programming, or just dedicated for this. Today, in the era of microservices architecture, network communication through APIs becomes critical for applications. Therefore, reactive APIs seem to be an attractive alternative […]

The post Using Reactive WebClient with Spring WebFlux appeared first on Piotr's TechBlog.

]]>
Reactive APIs and generally reactive programming become increasingly popular lately. You have a chance to observe more and more new frameworks and toolkits supporting reactive programming, or just dedicated for this. Today, in the era of microservices architecture, network communication through APIs becomes critical for applications. Therefore, reactive APIs seem to be an attractive alternative to a traditional, synchronous approach. It should be definitely considered as a primary approach if you are working with large streams of data exchanged via network communication.

Spring supports reactive programming and reactive APIs too. You could have a chance to read about it in some of my previous articles. I focused on introducing that support. In the article Reactive Microservices with Spring WebFlux and Spring Cloud you can read more about building microservices architecture using Spring WebFlux and Spring Cloud. In turn, in the articles Introduction to Reactive APIs with Postgres, R2DBC, Spring Data JDBC and Spring WebFlux and Reactive Elasticsearch with Spring Boot I have introduced reactive Spring Data repositories on an example of PostgreSQL and Elasticsearch. Those articles should be treated as an introduction to reactive programming with Spring. Today, I would like to go deeply into that topic and discuss some aspects related to the network communication between service that exposes reactive stream via API and service that consumes this API using Spring WebClient.

1. Access reactive stream using Spring WebClient

First, let’s consider the typical scenario of reading reactive API on the consumer side. We have the following implementation of reactive stream containing Person objects on the producer side:

@RestController
@RequestMapping("/persons")
public class PersonController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);

    @GetMapping("/json")
    public Flux<Person> findPersonsJson() {
        return Flux.fromStream(this::prepareStream)
                .doOnNext(person -> LOGGER.info("Server produces: {}", person));
    }
}

We can easily access it with non-blocking Spring WebFlux WebClient. The following test starts with a sample Spring WebFlux application, defines WebClient instance, and subscribes to the response stream.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class SampleSpringWebFluxTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleSpringWebFluxTest.class);
    final WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();

    @Test
    public void testFindPersonsJson() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Flux<Person> persons = client.get().uri("/persons/json").retrieve().bodyToFlux(Person.class);
        persons.subscribe(person -> {
            waiter.assertNotNull(person);
            LOGGER.info("Client subscribes: {}", person);
            waiter.resume();
        });
        waiter.await(3000, 9);
    }
}

In reactive programming with Reactor and Spring WebFlux you first need to subscribe to the stream in order to be able to access emitted objects. Assuming that our test stream has 9 Person elements you will receive the following log output:

spring-webflux-webclient-1

Let’s think about what happened here. Our reactive stream on the server side has been returned as a JSON array response to the consumer. It means that our reactive client is able to start reading the stream only after completion of the elements emission on the server side. That’s not exactly what we wanted to achieve, because we would like to take advantage of reactive streams also on the client side, and be able to read every element on stream just after it has been emitted by the producer. But with application/json content type it is just not possible, what is perfectly seen on the fragment of response below.

spring-webflux-webclient-3

2. Expose event stream using Spring WebFlux

The problem defined in the previous section lies obviously on the server side. Spring WebFlux WebClient is able to read reactive streams continuously, but the producer needs to “properly” emit it. To achieve it we should set content type to application/json+stream or text/event-stream.

@RestController
@RequestMapping("/persons")
public class PersonController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);

    @GetMapping(value = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Person> findPersonsStream() {
        return Flux.fromStream(this::prepareStream).delaySequence(Duration.ofMillis(100))
                .doOnNext(person -> LOGGER.info("Server produces: {}", person));
    }
}

Now, our response looks slightly different than before as shown below.

spring-webflux-webclient-4

To see the effect during the test I delayed the whole stream a little (around 100 milliseconds), because it takes a time before subscribes start to receive elements from the stream after subscribing to it. The current test is the same as the previous test, we are subscribing to the response stream and printing all the elements.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class SampleSpringWebFluxTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleSpringWebFluxTest.class);
    final WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();

    @Test
    public void testFindPersonsStream() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Flux<Person> persons = client.get().uri("/persons/stream").retrieve().bodyToFlux(Person.class);
        persons.subscribe(person -> {
            LOGGER.info("Client subscribes: {}", person);
            waiter.assertNotNull(person);
            waiter.resume();
        });
        waiter.await(3000, 9);
    }
}

Here are the results. As you see the elements are processed on the client-side just after being emitted by the producer. What is worth to note – all the elements are sending within the same thread.

webclient-2

3. Implementing backpressure

Backpressure is one of the most important reasons you would decide to use reactive programming. Following Spring WebFlux documentation it supports backpressure, since Project Reactor is a Reactive Streams library and, therefore, all of its operators support non-blocking back pressure. The whole sentence is of course conforming to the truth, but only on the server-side. Maybe the next fragment of documentation shall shed some light on things: Reactor has a strong focus on server-side Java.. We should remember that Spring WebClient and WebFlux uses TCP transport for communication between a client and the server. And therefore, a client is not able to regulate the frequency of elements emission on the server-side. I don’t want to go into the details right now, for the explanation of that situation you may refer to the following post https://stackoverflow.com/questions/52244808/backpressure-mechanism-in-spring-web-flux.
Ok, before proceeding let’s recap the definition of backpressure term. Backpressure (or backpressure) is resistance or force opposing the desired flow of data through software. In simple words, if a producer sends more events than a consumer is able to handle in a specific period of time, the consumer should be able to regulate the frequency of sending events on the producer side. Let’s consider the following test example.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class SampleSpringWebFluxTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleSpringWebFluxTest.class);
    final WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();

    @Test
    public void testFindPersonsStreamBackPressure() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Flux<Person> persons = client.get().uri("/persons/stream/back-pressure").retrieve().bodyToFlux(Person.class);
        persons.map(this::doSomeSlowWork).subscribe(person -> {
            waiter.assertNotNull(person);
            LOGGER.info("Client subscribes: {}", person);
            waiter.resume();
        });
        waiter.await(3000, 9);
    }

    private Person doSomeSlowWork(Person person) {
        try {
            Thread.sleep(90);
        }
        catch (InterruptedException e) { }
        return person;
    }
}

After receiving a stream of elements our test calls a time-expensive mapping method on each element. So, it is not able to handle so many elements as has been sent by the producer. In this case, the only way to somehow “regulate” backpressure is through the delayElements method on the server-side. I also tried to use the limitRate method on the service side and implement my own custom Subscriber on the client-side, but I wasn’t successful. Here’s the current implementation of our API method for returning streams of Person objects.

@RestController
@RequestMapping("/persons")
public class PersonController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);

    @GetMapping(value = "/stream/back-pressure", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Person> findPersonsStreamBackPressure() {
        return Flux.fromStream(this::prepareStream).delayElements(Duration.ofMillis(100))
                .doOnNext(person -> LOGGER.info("Server produces: {}", person));
    }
}

After running the test we can see that every element is 100 milliseconds delayed, and also the producer uses the whole pool of threads for emitting a stream of objects.

webclient-3

Source Code

The source code of sample application and JUnit tests is as always available on GitHub. The repository address is https://github.com/piomin/sample-spring-webflux.git.

The post Using Reactive WebClient with Spring WebFlux appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2019/11/04/using-reactive-webclient-with-spring-webflux/feed/ 2 7407
Reactive programming with Project Reactor https://piotrminkowski.com/2018/10/22/reactive-programming-with-project-reactor/ https://piotrminkowski.com/2018/10/22/reactive-programming-with-project-reactor/#respond Mon, 22 Oct 2018 14:51:15 +0000 https://piotrminkowski.wordpress.com/?p=6872 If you are building reactive microservices you would probably have to merge data streams from different source APIs into a single result stream. It inspired me to create this article containing some most common scenarios of using reactive streams in microservice-based architecture during inter-service communication. I have already described some aspects related to reactive programming […]

The post Reactive programming with Project Reactor appeared first on Piotr's TechBlog.

]]>
If you are building reactive microservices you would probably have to merge data streams from different source APIs into a single result stream. It inspired me to create this article containing some most common scenarios of using reactive streams in microservice-based architecture during inter-service communication.

I have already described some aspects related to reactive programming with Spring based on Spring WebFlux and Spring Data JDBC projects in the following articles:

Spring Framework supports reactive programming since version 5. That support is built on top of Project Reactor – https://projectreactor.io. Reactor is a fourth-generation Reactive programming library for building non-blocking applications on the JVM based on the Reactive Streams Specification. Working with this library can be difficult at first, especially if you don’t have any experience with reactive streams. Reactive Core gives us two data types that enable us to produce a stream of data: Mono and Flux. With Flux we can emit 0..nelements, while with Mono we can create a stream of 0..1elements. Both those types implement Publisher interface. Both these types are lazy, which means they won’t be executed until you consume it. Therefore, when building reactive APIs it is important not to block the stream. Spring WebFlux doesn’t allow that.

Introduction

The sample project is available on GitHub in repository reactive-playground https://github.com/piomin/reactive-playground.git. It is written in Kotlin. In addition to some Kotlin libraries, only a single dependency that needs to be added in order to use Project Reactor is reactor-core.

<dependency>
   <groupId>io.projectreactor</groupId>
   <artifactId>reactor-core</artifactId>
   <version>3.2.1.RELEASE</version>
</dependency>

I would not like to show you the features of Project Reactor based on simple String objects like in many other articles. Therefore, I have created the following class hierarchy for our tests, that allows us to simulate APIs built for three different domain objects.

reactive-programming-4

Class Organization contains a list of Employee and Department. Each department contains a list of Employee assigned only to the given department inside the organization. Class Employee has properties: organizationId that assigns it to the organization and departmentId that assigns it to the department.

data class Employee(var id: Int, var name: String, var salary: Int) {
    var organizationId: Int? = null
    var departmentId: Int? = null

    constructor(id: Int, name: String, salary: Int, organizationId: Int, departmentId: Int) : this(id, name, salary) {
        this.organizationId = organizationId
        this.departmentId = departmentId
    }

    constructor(id: Int, name: String, salary: Int, organizationId: Int) : this(id, name, salary) {
        this.organizationId = organizationId
    }
}

Here’s the implementation of Department class.

class Department(var id: Int, var name: String, var organizationId: Int) {
    var employees: MutableList<Employee> = ArrayList()

    constructor(id: Int, name: String, organizationId: Int, employees: MutableList<Employee>) : this(id, name, organizationId) {
        this.employees.addAll(employees)
    }

    fun addEmployees(employees: MutableList<Employee>) : Department {
        this.employees.addAll(employees)
        return this
    }

    fun addEmployee(employee: Employee) : Department {
        this.employees.add(employee)
        return this
    }

}

Here’s the implementation of Organization class.

class Organization(var id: Int, var name: String) {
    var employees: MutableList<Employee> = ArrayList()
    var departments: MutableList<Department> = ArrayList()

    constructor(id: Int, name: String, employees: MutableList<Employee>, departments: MutableList<Department>) : this(id, name){
        this.employees.addAll(employees)
        this.departments.addAll(departments)
    }

    constructor(id: Int, name: String, employees: MutableList<Employee>) : this(id, name){
        this.employees.addAll(employees)
    }
}

Scenario 1

We have API methods that return data streams. First of them return Flux emitting employees assigned to the given organization. Second of them just returns Mono with the current organization.

private fun getOrganizationByName(name: String) : Mono<Organization> {
   return Mono.just(Organization(1, name))
}

private fun getEmployeesByOrganization(id: Int) : Flux<Employee> {
   return Flux.just(Employee(1, "Employee1", 1000, id),
                Employee(2, "Employee2", 2000, id))
}

We would like to return the single stream emitting organization that contains a list of employees as shown below.

reactor-scenario-1

Here’s the solution. We use the zipWhen method that waits for the result from source Mono, and then calls the second Mono. Because we can zip only the same stream types (in that case these are Mono) we need to convert Flux<Employee> returned by getEmployeesByOrganization method into Mono<MutableList<Employee>> using collectList function. Thanks to zipWhen we can then combine two Mono streams and create new objects inside map function.

@Test
fun testScenario1() {
   val organization : Mono<Organization> = getOrganizationByName("test")
      .zipWhen { organization ->
         getEmployeesByOrganization(organization.id!!).collectList()
      }
      .map { tuple -> 
         Organization(tuple.t1.id, tuple.t1.name, tuple.t2)
      }
}

Scenario 2

Let’s consider another scenario. Now, we have Flux streams that emit employees and departments. Every employee has property departmentId responsible for assignment to the department.

private fun getDepartments() : Flux<Department> {
    return Flux.just(Department(1, "X", 1),
                     Department(2, "Y", 1))
}

private fun getEmployees() : Flux<Employee> {
    return Flux.just(Employee(1, "Employee1", 1000, 1, 1),
            Employee(2, "Employee2", 2000, 1, 1),
            Employee(3, "Employee3", 1000, 1, 2),
            Employee(4, "Employee4", 2000, 1, 2))
}

The goal is to merge those two streams and return the single Flux stream emitting departments that contains all employees assigned to the given department. Here’s the picture that illustrates the transformation described above.

reactive-programming-reactor-5

We can do that in two ways as shown below. First calls flatMap function on stream with departments. Inside flatMap we zip every single Department with a stream of employees. That stream is then filtered by departmentId and converted into Mono type. Finally, we are creating a Mono type using map function that emits a department containing a list of employees.
The second way groups Flux with employees by departmentId. Then it invokes zipping and mapping functions similar to the previous way.

@Test
fun testScenario2() {
   val departments: Flux<Department> = getDepartments()
      .flatMap { department ->
         Mono.just(department)
            .zipWith(getEmployees().filter { it.departmentId == department.id }.collectList())
            .map { t -> t.t1.addEmployees(t.t2) }
      }

   val departments2: Flux<Department> = getEmployees()
      .groupBy { it.departmentId }
      .flatMap { t -> getDepartments().filter { it.id == t.key() }.elementAt(0)
         .zipWith(t.collectList())
         .map { it.t1.addEmployees(it.t2) }
      }
}

Scenario 3

This scenario is simpler than two previous scenarios. We have two API methods that emit Flux with the same object types. First of them contains list of employees having id, name, salary properties, while the second id, organizationId, departmentId properties.

private fun getEmployeesBasic() : Flux<Employee> {
   return Flux.just(Employee(1, "AA", 1000),
                        Employee(2, "BB", 2000))
}

private fun getEmployeesRelationships() : Flux<Employee> {
   return Flux.just(Employee(1, 1, 1),
              Employee(2, 1, 2))
}

We want to convert it into a single stream emitting employees with a full set of properties. The following picture illustrates the described transformation.

reactive-programming-reactor-scenario-3

In that case the solution is pretty simple. We are zipping two Flux streams using zipWith function, and then map two zipped objects into a single containing the full set of properties.

@Test
fun testScenario3() {
   val employees : Flux<Employee> = getEmployeesBasic()
      .zipWith(getEmployeesRelationships())
      .map { t -> Employee(t.t1.id, t.t1.name, t.t1.salary, t.t2.organizationId!!, t.t2.departmentId!!) }
}

Scenario 4

In this scenario we have two independent Flux streams that emit the same type of objects – Employee.

private fun getEmployeesFirstPart() : Flux<Employee> {
   return Flux.just(Employee(1, "AA", 1000), Employee(3, "BB", 3000))
}

private fun getEmployeesSecondPart() : Flux<Employee> {
   return Flux.just(Employee(2, "CC", 2000), Employee(4, "DD", 4000))
}

We would like to merge those two streams into a single stream ordered by id. The following picture shows that transformation.

reactor-scenario-4

Here’s the solution. We use mergeOrderedWith function with a comparator that compares id. Then we can perform some transformations on every object, but it is only an option that shows the usage on map function.

@Test
fun testScenario4() {
   val persons: Flux<Employee> = getEmployeesFirstPart()
      .mergeOrderedWith(getEmployeesSecondPart(), Comparator { o1, o2 -> o1.id.compareTo(o2.id) })
      .map {
         Employee(it.id, it.name, it.salary, 1, 1)
      }
}

Scenario 5

And the last scenario in this article. We have a single input stream Mono with Organization that contains a list of departments. Each of department inside that list also contains the list of all employees assigned to the given department. Here’s our API method implementation.

private fun getDepartmentsByOrganization(id: Int) : Flux<Department> {
   val dep1 = Department(1, "A", id, mutableListOf(
         Employee(1, "Employee1", 1000, id, 1),
         Employee(2, "Employee2", 2000, id, 1)
      )
   )
   val dep2 = Department(2, "B", id, mutableListOf(
         Employee(3, "Employee3", 1000, id, 2),
         Employee(4, "Employee4", 2000, id, 2)
      )
   )
   return Flux.just(dep1, dep2)
}

The goal is to convert the stream to the same stream Flux with Department, but containing a list of all employees in the department. The following picture visualizes the described transformation.

reactor-scenario-5

Here’s the solution. We invoke flatMapIterable function that converts Flux with Department> into Flux with Employees by returning List of Employee. Then we convert it to Mono and add to the newly created Organization object inside map function.

@Test
fun testScenario5() {
   var organization: Mono<Organization> = getDepartmentsByOrganization(1)
      .flatMapIterable { department -> department.employees }
      .collectList()
      .map { t -> Organization(1, "X", t) }
}

The post Reactive programming with Project Reactor appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2018/10/22/reactive-programming-with-project-reactor/feed/ 0 6872